接着上一节提出的问题,我们来实现一个叫做 BufferSubject
的类型,一方面,它发送事件的行为类似 PassthroughSubject
,另一方面,它还支持在多个订阅者之间缓存事件。整体的实现套路,和之前实现 CustomSubject
基本上是一样的,分成四个部分:
- 首先,实现表达缓存行为的类
Buffer
; - 其次,为
BufferSubject
定义它自己的SubscriptionBehavior
; - 第三,为
BufferSubject
实现Subject
约束的方法; - 第四,用单元测试验证预期的结果;
这一节,我们先来完成前两部分。
定义 Buffer
在项目目录中,新建一个 BufferSubject.swift。在这里,创建一个表达缓存行为的类型:
public struct Buffer<Output, Failure: Error> {
var values: [Output] = []
var completion: Subscribers.Completion<Failure>? = nil
let limit: Int
let strategy: Publishers.BufferingStrategy<Failure>
public var isEmpty: Bool {
return values.isEmpty && completion == nil
}
}
其中:
values
表示缓存的普通事件数组;completion
表示缓存的完成事件。由于Buffer
只应该缓存一个完成事件,因此,我们把values
和completion
分开定义,而没有统一定义成一个Event<Output, Failure>
数组;limit
是缓存的上限,它应该是一个大于等于 0 的整数;strategy
是 Combine 提供的缓存策略,我们一会再说;isEmpty
用于判断缓存是否为空,它的依据除了value
为空之外,还要判断completion
是否为nil
;
接下来,就是它的支持的接口了。我们先来看向缓存中添加事件的 push
:
public mutating func push(_ value: Output) {
guard completion == nil else { return }
guard values.count < limit else {
switch strategy {
case .dropNewest:
values.removeLast()
values.append(value)
case .dropOldest:
values.removeFirst()
values.append(value)
case .customError(let errFn):
completion = .failure(errFn())
@unknown default:
fatalError()
}
return
}
values.append(value)
}
如果 completion
不为 nil
说明事件序列已经结束了,这时就不应该再缓存任何内容,直接返回。否则,判断下当前是否还可以缓存事件,如果可以,就直接把事件加入 values
末尾。否则,就说明缓存已经满了,这时,根据 strategy
支持的三种策略,我们要分别定义对应的行为:
.dropNewest
表示放弃掉队列中最新的,因此,我们就用removeLast
去掉values
中最后一个,再把新事件放到数组末尾;.dropOldest
表示放弃掉队列中最旧的,因此,我们就用removeFirst
去掉values
中第一个,再把新事件放到数组末尾;.customError
表示用户提供了一个当发生这种情况时生成错误的方法,我们就用这个方法封装一个Completion<Failure>
事件,并把它作为缓冲区的完成事件;
在 Swift 5 之后,由于 BufferingStrategy<Failure>
是一个 non-frozen enum,为了更好的处理未来新增的缓存策略,我们使用了 @unknown default
处理未知的情况。它和 default
唯一的区别就是,@unknown default
在我们没有使用 case
过完所有已知的情况之前,会显示一个警告信息:

但如果只用 default
处理默认情况的话,就没有这个“福利”了。完成后,我们来看向 Buffer
中添加完成事件的方法:
public mutating func push(
completion: Subscribers.Completion<Failure>) {
guard self.completion == nil else { return }
self.completion = completion
}
它的实现就简单多了,只要 Buffer
中还没有完成事件,就设置 self.completion
。最后,是从 Buffer
中取出事件的方法 fetch
:
public mutating func fetch()
-> Subscribers.Event<Output, Failure>? {
if values.count > 0 {
return .value(values.removeFirst())
}
else if let completion = self.completion {
values = []
self.completion = nil
return .complete(completion)
}
return nil
}
如果缓冲区中还有事件,我们就把它包装在 Event.value
里返回。这里要注意的是,取出事件的顺序是从 values
的头元素开始的,因为向新的订阅者“回放”历史事件也要按照它们加入 Buffer
的顺序执行。
如果 values
空了,就检查 completion
,如果不为 nil
,就把 values
清空,completion
设置成 nil
,并把之前缓存的完成事件封装成 Event.completion
返回。
最后,如果不是上述两种情况,Buffer
就没有任何内容可提供,我们就直接返回 nil
。至此,这个提供事件缓存功能的 Buffer
就定义好了。
自定义 SubscriptionBehavior
接下来,我们基于 Buffer
实现 SubscriptionBehavior
。继续在 BufferSubject.swift 里添加下面的代码:
public class BufferSubject<Output, Failure: Error>: Subject {
class Behavior: SubscriptionBehavior {
typealias Input = Output
var upstream: Subscription? = nil
let downstream: AnySubscriber<Input, Failure>
let subject: BufferSubject<Output, Failure>
var demand = Subscribers.Demand.none
var buffer: Buffer<Output, Failure>
init(subject: BufferSubject<Output, Failure>,
downstream: AnySubscriber<Input, Failure>,
buffer: Buffer<Output, Failure>){
self.subject = subject
self.downstream = downstream
self.buffer = buffer
}
}
/// ...
}
这里,BufferSubject
是下一节我们最终要实现的具备缓存功能的 Publisher
。在 Behavior
的定义里:
- 首先,缓存功能并不对事件值进行类型转换,因此,
Input
和Ouput
的类型仍旧是相同的; - 其次,在这个例子中,我们用不到
upstream
,直接把它设置成nil
就好; - 第三,
downstream
是下游订阅者的引用; - 第四,
subject
是这个Behavior
服务的Subject
对象,也就是我们将要定义的BufferSubject
; - 第五,是可订阅的事件数量;
- 最后,
buffer
是要执行的缓存策略;
了解了它的属性之后,我们先来看 Behavior
实现的接口。首先,是 request(_ d: Subscribers.Demand)
:
func request(_ d: Subscribers.Demand) {
demand += d
while demand > 0, let next = buffer.fetch() {
demand -= 1
switch next {
case .value(let value):
let newDemand = downstream.receive(value)
demand += newDemand
case .complete(let completion):
downstream.receive(completion: completion)
}
}
}
它的逻辑,就是只要缓冲区中还有事件,并且数量没超过订阅者发起的订阅请求数量,就不断向下游的订阅者发送缓存的事件。如果普通事件发送完了,就发送完成事件。通过它的实现,我们可以进一步理解 demand
的计算方法,它会根据每一次订阅者返回的 receive
方法的返回值进行调整。
其次,是 Behavior
自身作为订阅者,接收到普通事件的 receive
方法:
func receive(_ input: Output) -> Subscribers.Demand {
if demand > 0 && buffer.isEmpty {
let newDemand = downstream.receive(input)
demand = newDemand + demand - 1
}
else {
buffer.push(input)
}
return .unlimited
}
在它的实现里,如果,demand
大于 0,并且当前没有缓存任何事件。那我们也无需缓存收到的 input
事件,直接把它发送给下游订阅者,并更新 demand
就好了。否则,就表示我们还有历史事件没有发送完,这时,就通过 push
把事件保存进 buffer
。另外,从它返回的 .unlimited
我们就知道,Behavior
会一直向上游 Publisher
订阅事件。
第三,是 Behavior
自身作为订阅者,接收到完成事件的 receive
方法。它的实现则简单一些,如果缓存为空就直接向下游订阅者发送完成事件,否则就让这个事件进缓存:
func receive(completion: Subscribers.Completion<Failure>) {
if buffer.isEmpty {
downstream.receive(completion: completion)
}
else {
buffer.push(completion: completion)
}
}
最后,是执行取消订阅的 cancel
方法:
func cancel() {
subject.subscribers.mutate {
$0.removeValue(forKey: self.combineIdentifier)
}
}
这和之前我们实现 CustomSubject
时实现的 cancel
是一样的,通过 self.combineIdentifier
,从 subscribers
中删掉申请发起 cancel
请求的订阅者就好了。
What's next?
至此,这个为 BufferSubject
服务的 Behavior
就完成了。现在,稍微休息一会儿,下一节,基于这个实现,我们完成 BufferSubject
的定义,并编写单元测试来验证它的功能。