这一节,我们继续之前的话题。Subscription
究竟是什么呢?它又是如何影响 Combine 的事件发布订阅模型的呢?为了搞清楚这个问题,我们可以从两个方向入手。
- 首先,通过文档和已经公开的代码,整理和补充
Subscription
在事件发布订阅模型中承担的功能,当然这里也需要有一些脑补的环节; - 其次,用代码自定义一个
Subscription
对象并把它用于 Combine 的单元测试,验证我们的理解;
一个完整的订阅发布模型
如果直接去翻翻 Subscription
的代码,就会发现,它只约束了一个 request
方法:
public protocol Subscription
: Cancellable, CustomCombineIdentifierConvertible {
/// Tells a publisher that it may send more values to the subscriber.
func request(_ demand: Subscribers.Demand)
}
再想想我们上一节整理出来的内容,Subscriber
约束了一个接受 Subscription
参数的 receive
方法,而 Subscription
只约束了一个接受事件订阅数量的 request
方法。它们到底在事件发布订阅模型中,承担了什么角色呢?
为了搞清楚这个问题,在不考虑额外调度的前提下,我们先把一个完整的事件发布订阅模型陈列出来:

- 首先,订阅者调用发布者的
subscribe
方法,把自己作为参数传递给发布者表示要订阅事件; - 其次,在
subscribe
方法的实现里,会调用Publisher
约束的receive<S>(subscriber: S)
方法处理传入的订阅者; - 第三,在
receive
的实现里,会把传入的订阅者封装成一个Subscription
对象,这个对象会持有原始订阅者的引用; - 第四,如果被订阅的发布者存在上游发布者,它就会把封装的
Subscription
作为订阅者,继续调用上游发布者的subscribe
方法。为了让这一步成立,Subscription
也应该是一个实现了Subscriber
的类型; - 第五,上游发布者会把下游传入的
Subscription
对象进一步封装成表示订阅其自身的Subscription
对象,并让它持有指向传入的Subscription
对象的引用; - 第六,事件流源头的发布者,会调用下级订阅者的
receive<S>(subscription:)
方法,传入它封装好的Subscription
对象;
至此,整个发布订阅模型的前半部分就说完了。在整个事件流发布和订阅的链条里,只有最末端的订阅者,是通过 Subscriber
表示的,而其余的环节里,维持链条完整的“中间订阅者”都是通过 Subscription
表示的。尽管从公开的代码和文档中没有并没有说明 Subscription
是一个 Subscriber
,仅仅说了 Subscription
is a protocol representing the connection of a subscriber to a publisher。但从要实现的结果,以及 Publisher
和 Subscriber
各自的接口声明来看,要想维系这种关系,Subscription
的确需要有 Subscriber
的行为特征才行。
为了进一步巩固这个想法,我们继续来看发布订阅模型的后半部分,把这部分用一张图表示,是这样的:

这次,我们仅仅列出了原始的订阅者,直接订阅的 Publisher
封装的 Subscription
,以及上游 Publisher
封装的 Subscription
。这个过程紧接者刚才说过的“第六”。
- 第七,上游
Publisher
封装了自己的Subscription
对象之后,把下游Publisher
封装的Subscription
对象当成了一个订阅者,调用了它的receive<S>(subscription:)
方法; - 第八,下游
Publisher
再调用原始订阅者的receive<S>(subscription:)
,传递它自己的Subscription
对象; - 第九,在原始订阅者的
receive<S>(subscription:)
方法里,调用传入的Subscription
对象的request(_ demand:)
方法,表明自己期望订阅到的事件数量; - 最后,
Publisher
继续调用自己上游Publisher
传入的Subscription
的request(_ demand:)
方法,这样在整条事件传递链条上,最终到订阅者的,就只有特定数量的事件了;
以上,就是一个完整的 Combine 事件发送订阅模型。通过第二个图片可以看出,在 Combine 里,通过代码呈现的发布者和订阅者关系的背后,真正发挥作用的,其实是一个 Subscription
链条。说到这,也就不难理解为什么上一节最后一个测试用例中,sinkC
和 sinkD
不能共享 scanB
中的事件了,因为对于这两个订阅者来说,scanB
的上游事件并不相同,把它们的订阅关系转换成 Subscription
对象的关系是这样的:

自定义 Subscription 实现
看到这,你可能会想,虽然从道理上整个逻辑能说得通,但 Combine 中真就如此么?毕竟这里有我们根据文档和方法签名推测出来的东西。而验证这个推测最好的方式,就是自定义一个 Publisher
和 Subscription
对象,并把它们用在上一节的测试用例中,如果可以通过测试,关于整个模型的推测就得到印证了。
定义 Subscription
“应有”的行为
首先,我们要自定义一个协议,让它拥有之前我们推测的各种行为。为此,在上一节创建的 framework 项目里,新建个 CustomSubscription.swift。在这里,按照我们对 Subscription
“需求”定义一个协议:
public protocol SubscriptionBehavior
: class, Cancellable, CustomCombineIdentifierConvertible {
associatedtype Input
associatedtype Failure: Error
associatedtype Output
associatedtype OutputFailure: Error
var demand: Subscribers.Demand { get set }
var upstream: Subscription? { get set }
var downstream: AnySubscriber<Output, OutputFailure> { get }
func request(_ d: Subscribers.Demand)
func receive(_ input: Input) -> Subscribers.Demand
func receive(completion: Subscribers.Completion<Failure>)
}
这里,之所以让 SubscriptionBehavior
是一个 class
才能遵守的协议,是因为 Swift 会自动为类生成 CustomCombineIdentifierConvertible
要求的 combineIdentifier
属性,省得我们自己去实现它了,用起来方便一些。
其次,来看关联类型和属性的部分:
Input
和Failure
是上游Publisher
中的普通和错误事件类型;Output
和OutputFailure
是下游Subscriber
的普通和错误事件类型;demand
是下游Subscriber
向上传递的要订阅的事件数量;upstream
是上游Publisher
下发的Subscription
对象,由于不一定存在上游Publisher
,因此这是一个 optional;downstream
是下游订阅者的引用;
最后,是接口的部分:
request
就是 Combine 中Subscription
约束的request
接口,它收集订阅者期望订阅的事件数量;- 剩下的两个
receive
方法,让SubscriptionBehavior
具有Subscriber
的行为特征,既可以让它接收到Publisher
的普通和完成事件;
以上,就是我们推测的 Subscription
应该具备的全部行为特征了。然后,根据之前整理的事件发布订阅模型中 Subscription
的功能,我们来给这些接口添加默认实现。
首先是 request
,它累计下游订阅者的订阅需求,并把这个需求传到上游发布者:
public extension SubscriptionBehavior {
func request(_ d: Subscribers.Demand) {
demand += d
upstream?.request(demand)
}
}
其次,是 Cancellable
约束的 cancel
方法,我们这个用于演示的 SubscriptionBehavior
无需做额外的操作,因此直接通知上游发布者就好了:
public extension SubscriptionBehavior {
func cancel() {
upstream?.cancel()
}
}
第三,是把正常事件转发到订阅者的 receive
方法。它执行的逻辑就是 demand
有配额就调用订阅者的 receive
方法,并更新 demand
,这里要特别注意 demand
的计算方法。它是和下游订阅者的 receive
返回值累计的。否则,就返回 Demand.none
:
public extension SubscriptionBehavior
where Input == Output, Failure == OutputFailure {
func receive(_ input: Input) -> Subscribers.Demand {
if demand > 0 {
let newDemand = downstream.receive(input)
demand = newDemand + (demand - 1)
return demand
}
return Subscribers.Demand.none
}
}
最后,是接受完成事件的 receive
方法。由于我们不需要做额外的处理,这次,就直接把事件转发给下游订阅者就好了:
public extension SubscriptionBehavior
where Failure == OutputFailure {
func receive(completion: Subscribers.Completion<Failure>) {
downstream.receive(completion: completion)
}
}
至此,按照我们的推断,一个 Subscription
对象应有的行为,就都实现完了。这里,我们只是让 SubscriptionBehavior
看上去像一个 Subscription
而已,并没有给它一个正式的“名分”,至于为什么这样做,接着往下看就好了。
自定义 Subscription
接下来,利用 SubscriptionBehavior
,我们自定义一个实现了 Subscription
的类型。为了线程安全,我们把 SubscriptionBehavior
的所有操作,都用一个互斥锁保护起来。还是在 CustomSubscription.swift 里,先来看这个类型的定义:
public struct CustomSubscription<Content: SubscriptionBehavior>
: Subscriber, Subscription {
public typealias Input = Content.Input
public typealias Failure = Content.Failure
public var combineIdentifier: CombineIdentifier {
return content.combineIdentifier
}
let recursiveMutext = NSRecursiveLock()
let content: Content
init(behavior: Content) {
self.content = behavior
}
}
按照之前的推测,一个实现了 Subscription
的类型也应该是一个实现了 Subscriber
的类型。因此,CustomSubscription
是在我们的框架中,有正式名分的 Subscription
对象。而之所以要把 SubscriptionBehavior
放在泛型参数里,是为了让它专注于处理事件发布和订阅的具体逻辑。而 CustomSubscription
则专注于让这些方法保持线程安全性。如果你还有点迷糊,没关系,继续往下看就好了。
首先,我们来实现 CustomSubscription
中 Subscription
的部分:
public func request(_ demand: Subscribers.Demand) {
recursiveMutex.lock()
defer { recursiveMutex.unlock() }
content.request(demand)
}
public func cancel() {
recursiveMutex.lock()
content.cancel()
recursiveMutex.unlock()
}
看到了吧,这就是我们说过的区分开执行逻辑和线程安全性的方式。接下来,我们再来实现 Subscriber
的部分。
public func receive(subscription upstream: Subscription) {
recursiveMutext.lock()
defer { recursiveMutext.unlock() }
content.upstream = upstream
content.downstream.receive(subscription: self)
}
通过这个 receive(subscription:)
的实现,我们就知道 Subscription
是如何持有上级 Subscription
引用,以及如何把自身对象传到下级订阅者的了。
最后,是接受普通和完成事件的 receive
方法。它们的实现都很简单,就是在 recursiveMutex
的保护下接收事件而已:
public func receive(_ input: Input) -> Subscribers.Demand {
recursiveMutex.lock()
defer { recursiveMutex.unlock() }
return content.receive(input)
}
public func receive(completion: Subscribers.Completion<Failure>) {
recursiveMutex.lock()
defer { recursiveMutex.unlock() }
content.receive(completion: completion)
}
What's next?
至此,按照我们的推测,一个实现了 Subscription
的类型该有的样子,就完成了。在继续之前,回顾下我们对 Combine 事件发布订阅模型的描述,以及 CustomSubscription
是如何与这个模型中对应的环节关联起来的。如果你确定已经没问题了,就可以进入下一节的内容了。