希望你还记得,之前我们通过测试用例研究 Combine 事件发布订阅模型的时候,创建过一个叫做 Publishers.Scan
的对象对上游事件序列中的事件值进行累计。这一节,我们就通过自定义一个类似的CustomScan
,来了解这类类型的工作原理。
自定义 CustomScan
同样作为一个 Publisher
,其实实现的过程和上一节的 CustomSubject
是一样的。首先,根据它的订阅者接收事件的方式定义一个实现了 SubscriptionBehavior
的类型;其次,根据这个类型,实现 Publisher
约束的 receive<S>(subscriber: S)
就好了。
为此,在项目中,我们创建了一个 CustomScan.swift,在其中添加 CustomScan
的实现。先来看 CustomScan
的定义:
public class CustomScan<Upstream: Publisher, Output>: Publisher {
public typealias Failure = Upstream.Failure
let reducer: (Output, Upstream.Output) -> Output
let upstream: Upstream
let initialState: Output
public init(upstream: Upstream,
initialResult: Output,
nextPartialResult: @escaping (Output, Upstream.Output) -> Output) {
self.initialState = initialResult
self.reducer = nextPartialResult
self.upstream = upstream
}
}
其中,泛型参数 Upstream
表示上游事件发布者的类型,Output
表示 CustomScan
自身转换出来的新事件类型。由于 CustomScan
不转换错误事件的类型,所以我们一开始,让 Failure
就是上游事件序列中的错误类型。接下来:
reducer
是累计事件值的计算方法;upstream
是上游事件序列的引用,由于CustomScan
要依赖于上游事件进行计算,因此这个属性不是一个 optional;initialState
是计算的初始值;
在 memberwise init 方法里,逐个对这些属性初始化就好了。
自定义 SubscriptionBehavior
接下来,和上一节创建 CustomSubject
一样,我们也要给 CustomScan
自定义一个 CustomBehavior
对象。因为我们要在接收到上游事件之后,进行累加计算。还是先来看它的定义:
public class CustomScan<Upstream: Publisher, Output>: Publisher {
class Behavior: SubscriptionBehavior {
typealias Input = Upstream.Output
var upstream: Subscription? = nil
let downstream: AnySubscriber<Output, Upstream.Failure>
var demand: Subscribers.Demand = .none
let reducer: (Output, Upstream.Output) -> Output
var state: Output
init(downstream: AnySubscriber<Output, Upstream.Failure>,
reducer: @escaping (Output, Upstream.Output) -> Output,
initialState: Output) {
self.downstream = downstream
self.reducer = reducer
self.state = initialState
}
func receive(_ input: Upstream.Output) -> Subscribers.Demand {
state = reducer(state, input)
return downstream.receive(state)
}
}
}
可以看到,除了“沿袭”自 SubscriptionBehavior
的内容之外,Behavior
还持有了 CustomScan
的计算方法和初始状态,通过这个定义,我们可以进一步领会在 Publisher
背后,Subscription
对象的功能。在我们的实现里,上游事件值的累计,是在递交给订阅者的时候,计算出来的。
实现 Publisher 约束的方法
有了 Behavior
之后,就可以实现 CustomScan
的剩余方法了。也就是 Publisher
约束的 receive<S>(subscriber:)
:
public func receive<S>(subscriber: S)
where S: Subscriber, Failure == S.Failure, Output == S.Input {
let downstream = AnySubscriber(subscriber)
let behavior = Behavior(
downstream: downstream, reducer: reducer, initialState: initialState)
let subscription = CustomSubscription(behavior: behavior)
upstream.subscribe(subscription)
}
这里,和 CustomSubject
唯一不同的就是,我们把包含了 CustomScan
事件转换逻辑的 Subscription
对象作为一个订阅者,订阅的上游事件发布者。每一个中间环节都像这样订阅订阅上游发布者,整个事件链就通过 Subscription
连接起来了。
通过单元测试验证模型
说到这里,用于验证 Combine 事件发布订阅模型需要的核心组件,就都完成了。
为了验证这个模型,我们基于之前写过的 testDeferredSubjects
,创建一个新的测试用例:
func testCustomSubjects() {
var subjects = [CustomSubject<Int, Never>]()
let deferred = Deferred { () -> CustomSubject<Int, Never> in
let request = CustomSubject<Int, Never>()
subjects.append(request)
return request
}
let scanB = CustomScan(
upstream: deferred,
initialResult: 10,
nextPartialResult: +)
var receivedC = [Subscribers.Event<Int, Never>]()
let sinkC = Subscribers.Sink<Int, Never>(
receiveCompletion: {
receivedC.append(.complete($0))
},
receiveValue: {
receivedC.append(.value($0))
})
var receivedD = [Subscribers.Event<Int, Never>]()
let sinkD = Subscribers.Sink<Int, Never>(
receiveCompletion: {
receivedD.append(.complete($0))
},
receiveValue: {
receivedD.append(.value($0))
})
scanB.subscribe(sinkC)
subjects[0].send(sequence: 1...2, completion: nil)
scanB.subscribe(sinkD) /// `sinkD` does not receive events in `subjects[0]`
subjects[0].send(sequence: 3...4, completion: .finished)
subjects[1].send(sequence: 1...4, completion: .finished)
XCTAssertEqual(receivedC, [11, 13, 16, 20].asEvents(completion: .finished))
XCTAssertEqual(receivedD, [11, 13, 16, 20].asEvents(completion: .finished))
}
这次,我们让 deferred
包含了两个 CustomSubject
对象。其余的部分,和 testDeferredSubjects
是完全相同的,如果一切顺利,测试的结果也应该和使用 PassthroughSubject
是一样的。不过,在开始自行测试之前,先来想个问题:为什么这个测试用例可以验证我们对模型的推测呢?
首先,CustomSubject
作为一个 Publisher
,我们定义了它的 receive(subscriber:)
方法。但 sinkC
和 sinkD
订阅它的时候,subscribe
方法仍旧是 Combine 提供的,如果订阅之后 receive(subscriber:)
被调用了,就验证了这个方法被调用的时机;
其次,在 receive(subscriber:)
的实现里,receive(subscription: Subscriptions.empty)
应该会导致封装着原始订阅者的 Subscription
中的 request
方法被调用,也就是我们在 CustomSubscription
中定义的 request(_ demand:)
方法;
最后,CustomSubscription
是一个值类型,这也就意味着在 receive<S>(subscriber:)
方法里,每个订阅者获得的,都是一份属于它自己的 CustomSubscription
对象,因此,sinkC
和 sinkD
订阅到的结果,也应该是独立的;
把这些问题想清楚之后,验证是非常简单的,执行一下这个测试用例就好了,如果一切顺利,它应该是可以 Pass 的:

What's next?
以上,就是对 Combine 基础工作原理方面的研究,现在,对于 Combine 中的事件是如何发送的,如何订阅的,以及 Publisher
,Subscriber
和 Subscription
这三个角色在整个模型中的角色,我们应该了解的比较清楚了。
接下来,要探索的,是关于“共享”的话题。就像之前在测试用例分析中说到的,每个订阅者独占的 Subscription
对象会导致事件在传递到不同订阅者的时候发生重复计算。但有些时候,我们并不希望如此。如何避免这种问题呢?如何实现类似“一发多收”的效果呢?如何能缓存要下发的事件呢?在 RxSwift 里,我们可以用 ConnectableObservable<E>
。那么,在 Combine 里,如何实现类似的“共享”功能呢?