继续之前的工作,这一节,我们自定义一个 Publisher
,并把它用于之前的单元测试,如果一切顺利,测试应该可以通过,这样,我们对 Combine 中事件发布和订阅模型的理解,也就得到了验证。
自定义 Publisher
在之前的项目中,新建一个 CombineSubject.swift。在这里,为了方便稍后测试,我们创建一个类似 PassthroughSubject
行为的类:
public class CustomSubject<Output, Failure: Error>: Subject {
var subscribers: Dictionary<
CombineIdentifier, CustomSubscription<Behavior>> = [:]
}
由于 Subject
是一个从 Publisher
派生而来的协议,因此,我们就不用再声明 CustomSubject
实现 Publisher
了。CustomSubject
有一个 subscribers
属性,它用一个字典保存了所有的订阅者。其中,Key 是当前 Publisher
事件流的标识,Value 是封装了订阅者的 Subscription
对象。
定制 SubscriptionBehavior
那么,上面代码中的 Behavior
是什么呢?按照上一节的描述,它定义的是把 CustomSubject
的订阅者封装成 Subscription
时完成的操作。这里唯一需要定制的,就是当有订阅者取消订阅时,我们要从 CustomSubject.subscribers
删掉对应的 CustomSubscription
对象。可能光看文字有点晕,我们直接来看代码:
public class CustomSubject<Output, Failure: Error>: Subject {
class Behavior: SubscriptionBehavior {
// 1
typealias Input = Output
// 2
var demand: Subscribers.Demand = .none
var upstream: Subscription? = nil
let downstream: AnySubscriber<Output, Failure>
// 3
let subject: CustomSubject<Output, Failure>
init(subject: CustomSubject<Output, Failure>,
downstream: AnySubscriber<Output, Failure>) {
self.subject = subject
self.downstream = downstream
}
// 4
func request(_ d: Subscribers.Demand) {
demand += d
upstream?.request(d)
}
// 5
func cancel() {
subject.subscribers.removeValue(forKey: combineIdentifier)
}
}
}
按照代码注释中数字的顺序,我们逐个看一下这些段代码。
第一部分,按照我们的想法,CustomSubject
不会对事件进行变化,因此,Input
和 Output
的类型是相同的,我们把 Output
定义成 Input
的别名,就不用再额外指定 Input
的类型了。
第二部分,是 SubscriptionBehavior
约束的属性。把它们定义成属性,也就有各自的 get
和 set
方法了。
第三部分,是 CustomSubject
对象的引用。通过这个属性,Subscription
才能完成修改 CustomSubject
的订阅需求,或者取消订阅的操作。
第四部分,按说,这个 request
方法是不需要在这里重新定义一遍的。因为它和 SubscriptionBehavior
中的默认实现一样。但在 Xcode 11.2.1 这个版本自带的 Swift 版本上,不定义这个方法会导致 Swift 编译器发生段错误,因此,它和我们要实现的逻辑没什么关系,纯粹是一个临时的解决方案。
第五部分,就是我们要为 CustomSubject
自定义 Behavior
的根本原因。不同于 SubscriptionBehavior
中的默认实现,这里,有订阅者要取消订阅时,我们要把它从 subject.subscribers
中删除。
实现 CustomSubject
定义好了 Behavior
之后,就可以实现 CustomSubject
了。我们先来实现 Publisher
的部分:
public func receive<S>(subscriber: S)
where S : Subscriber, Failure == S.Failure, Output == S.Input {
let behavior = Behavior(subject: self, downstream: AnySubscriber(subscriber))
let subscription = CustomSubscription(behavior: behavior)
subscribers[subscription.combineIdentifier] = subscription
subscription.receive(subscription: Subscriptions.empty)
}
通过这段代码,就能了解当 Publisher
被订阅的时候,究竟发生什么了:
- 首先,通过用
subscriber
创建Behavior
对象,我们就定义了如何向订阅者“递交”发生的普通和完成事件; - 其次,通过创建
CustomSubscription
对象,我们演示了如何用Subscription
封装下有订阅者; - 最后,调用
receive(subscription:)
方法。由于CustomSubject.upstream
被我们设置成了nil
,这里,我们无需再向上游事件发布者封装自己的Subscription
对象。因此,直接传递了Subscriptions.empty
。进而会导致订阅者的request(_ demand:)
被调用,从而完成整个订阅过程;
完成后,我们来实现 Subject
的部分,首先是一个我们不太常见的 send
方法:
public func send(subscription: Subscription) {
subscription.request(.unlimited)
}
坦白讲,我并不清楚这个方法究竟用在什么场景里,我们在之前推测的事件发布订阅模型中也没有提到它,Apple 在官方文档中对这个方法的描述也很模糊。大致的意思,感觉像和 Subscriber.receive(subscription)
是类似的。大家如果有对这个方法更确切的理解,欢迎写邮件给我。既然不了解它,我们就一直订阅 subscription
生成的事件就好了。
其次,是发送普通事件的 send
方法:
public func send(_ value: Output) {
for (_, sub) in subscribers {
_ = sub.receive(value)
}
}
这个实现很好理解,就是遍历 subscribers
数组,然后通过 receive
方法让订阅者们接收事件就好了。而这种形式,也就是在之前我们猜测的 Publisher
中的事件是“点对点”发送到订阅者的表现形式。
最后,是发送完成事件的 send
方法。大体上,和发送普通事件的 send
逻辑是一样的,只不过,给所有订阅者发送了完成事件之后,要清空 subscribers
,这样,就不会再向这些订阅者发送任何消息了:
public func send(completion: Subscribers.Completion<Failure>) {
for (_, sub) in subscribers {
_ = sub.receive(completion: completion)
}
subscribers.removeAll()
}
What's next?
至此,CustomSubject
也就完成了,它当前的问题,就是对 subscribers
的访问并不是线程安全的,因此,还不能在多线程环境里使用共享的 CustomSubject
对象。不过,这个问题并不影响当前的研究,为了保持精力集中,我们稍后再花点时间专门处理线程安全性的问题。
现在,稍微让自己休息一会儿,回顾一下这一节我们为 CustomSubject
自定义 Behavior
的目的,以及 CustomSubject
自身的实现是如何对应到 Combine 事件发布订阅模型的。下一节,我们将仿照这个模式,自定义 Publishers.Scan
的实现。这是在使用单元测试验证整个模型之前,我们要实现的最后一个 Combine 组件。