在开始接下来的泊学 App 开发之前,我决定先插一段关于 Combine 的研究。对于我们绝大多数人而言,可能都是在遵循着 RxSwift 的经验在理解和使用 Combine。但通过修改泊学 App 的一个 bug,让我发现了当前版本的 Combine 和 RxSwift 一些明显的不同。由于接下来我们会频繁和 Combine 打交道,因此还是非常有必要深入了解下这个 Apple 新发布的响应式编程框架。
Combine 中的发布者和订阅者
而我们的探索过程,就从理解事件发布和订阅,这个最基本的模型开始。在 Combine 里,表达事件发布概念的是一个叫做 Publisher
的协议,经验上,这应该就是一个可以用不同方式不断产生值的对象。但事实上,并不如此。因为它并没有约束任何和“发送事件”这个概念有关的接口。它唯一约束的,就是一个接受“订阅者”的方法:
protocol Publisher {
associatedtype Output
associatedtype Failure : Error
func receive<S>(subscriber: S) where
S : Subscriber,
Self.Failure == S.Failure, Self.Output == S.Input
}
这是 Combine 中第一个和我们响应式编程过往经验不同的地方,Publisher
仅仅是某种可以接受订阅者的对象。那么,在 Publisher
的定义里,那个表达订阅者的 Subscriber
又是什么呢?它约束了一组接收各种不同类型事件的接口:
protocol Subscriber : CustomCombineIdentifierConvertible {
associatedtype Input
associatedtype Failure : Error
func receive(_ input: Self.Input) -> Subscribers.Demand
func receive(completion: Subscribers.Completion<Self.Failure>)
func receive(subscription: Subscription)
}
从功能上说,receive(_ input:)
负责从 Publisher
接收事件值。receive(completion:)
负责从 Publisher
接收完成事件。至于 receive(subscription:)
我们一会儿再说。
看到这,我们不难猜想,既然 Publisher
只有接受订阅者的接口,而 Subscriber
又约束了接收事件的接口,Combine 中的事件流最有可能的实现方式,就是发布者直接以“点对点”的形式把事件发送到订阅者。而这个过程,应该和最后的 receive(subscription:)
方法有着密切的联系。
观察 Combine 的事件订阅模型
了解了这两个类型之后,我们再从订阅 Publisher
的结果进一步观察下 Combine 中发布和订阅模型。
准备工作
首先,做一些准备工作。稍后,我们会自己实现一些概念版的 Combine 基础组件。为此,我们可以创建一个 Mac OS framework 项目,叫做 CustomCombine,并让它包含单元测试。在这个项目里,添加一个 Subject+Send.swift,在这里给 Subject
添加一个扩展,它可以把一个序列的元素,变成 Subject
发送的一系列事件。这个方法主要用于方便我们在单元测试中生成测试事件:
public extension Subject {
func send<S: Sequence>(
sequence: S,
completion: Subscribers.Completion<Self.Failure>? = nil)
where S.Element == Self.Output {
for v in sequence {
send(v)
}
if completion != nil {
send(completion: completion!)
}
}
}
接下来,再创建一个 Subscribers+Event.swift。在这里,添加一个 enum Event
,它是 Combine 中“事件”这个概念的封装,把订阅者可能接收的事件,统一成了一个类型。之所以要这样做,也是为了稍后在测试用例中,方便观察和比较订阅者接收到的事件。
public extension Subscribers {
enum Event<Value, Failure: Error> {
case value(Value)
case complete(Subscribers.Completion<Failure>)
}
}
extension Subscribers.Event: Equatable
where Value: Equatable, Failure: Equatable {}
然后,在这个文件里,给 Sequence
添加一个扩展方法 asEvents
,让它可以把 Sequence
中的元素类型,转换成 Event
数组,我们用这个方法,生成单元测试的期望结果:
public extension Sequence {
func asEvents<Failure>(
failure: Failure.Type,
completion: Subscribers.Completion<Failure>? = nil)
-> [Subscribers.Event<Element, Failure>] {
let values = map(Subscribers.Event<Element, Failure>.value)
guard let completion = completion else { return values }
return values + [Subscribers.Event<Element, Failure>.complete(completion)]
}
}
这里,一个很有意思的用法就是,我们可以把 enum
中带有关联值的 case
直接作为参数传递给 map
,编译器会把这个 case
认为是 (Value) -> EnumType
这种类型的函数,在我们的例子中,也就是 (Value) -> Event<Value, Failure>
。有了这个方法之后,再重载一个错误事件是 .Never
的版本。因为在单元测试中,我们主要观察的是整个事件流,而错误的类型通常并不重要。
func asEvents(completion: Subscribers.Completion<Never>? = nil)
-> [Subscribers.Event<Element, Never>] {
return asEvents(failure: Never.self, completion: completion)
}
有了这些方法之后,为这一节内容做的准备工作就完成了。
观察一个最基础的发布订阅
接下来,我们从观察一个最基本的发布订阅模型开始:

为了表达这个关系,在 CustomCombineTest.swift 里,创建一个测试用例:
func testSubjectSink() {
let subject = PassthroughSubject<Int, Never>()
var received = [Subscribers.Event<Int, Never>]()
let sink = Subscribers.Sink<Int, Never>(
receiveCompletion: {
received.append(.complete($0))
},
receiveValue: {
received.append(.value($0))
})
subject.subscribe(sink)
subject.send(sequence: 1...3, completion: .finished)
XCTAssertEqual(received, (1...3).asEvents(completion: .finished))
}
这里,事件发布者是 subject
,订阅者是 sink
。在 subject.subscribe(sink)
之后,我们通过 subject
发布了 1,2,3,.finished
事件(这里用的就是之前给 Sequence
添加的扩展方法)。显然,sink
也应该全数收到这些事件,而这,就是我们在 XCTAssertEqual
中判断的依据。
并且,即便我们像下面这样修改上游事件流,整个事件的发布订阅模型也不会受到影响:

这种关系,在 Combine 里可以用一个 transformer 表示:
func testScan() {
let subjectA = PassthroughSubject<Int, Never>()
let scanB = Publishers.Scan(
upstream: subjectA,
initialResult: 10, nextPartialResult: +)
var received = [Subscribers.Event<Int, Never>]()
let sink = Subscribers.Sink<Int, Never>(
receiveCompletion: {
received.append(.complete($0))
},
receiveValue: {
received.append(.value($0))
})
scanB.subscribe(sink)
subjectA.send(sequence: 1...3, completion: .finished)
XCTAssertEqual(received, [11, 13, 16].asEvents(completion: .finished))
}
这里,我们用 Scan
加工了原始数据流,并订阅了加工后的结果。当 subjectA
生成了原始事件之后,我们还是可以全数接受到所有加工后的事件。
至此,“订阅了一个 Publisher
之后,就可以订阅到它的所有事件”这个结论,仍旧是成立的。
半路杀出来个订阅者又如何呢
接下来,我们把订阅模型变成下面这样,在 Pub1 的生命周期里,又多出来一个订阅者会如何呢?

对于这种情况,Sub2 可以订阅到的内容,响应式框架通常会给我们一些选择,例如:
- 订阅到所有缓存的历史事件;
- 订阅到最近的一次事件;
- 只订阅下一次生成的事件;
- 重新触发整个事件订阅流程;
在 Combine 里,默认使用的就是最后一种方式,看似也最不容易理解。我们用下面这个测试用例表达这个模型:
func testSequenceABCD() {
let subjectA = PassthroughSubject<Int, Never>()
let scanB = Publishers.Scan(
upstream: subjectA, initialResult: 10, nextPartialResult: +)
var receivedC = [Subscribers.Event<Int, Never>]()
let sinkC = Subscribers.Sink<Int, Never>(
receiveCompletion: {
receivedC.append(.complete($0))
},
receiveValue: {
receivedC.append(.value($0))
})
var receivedD = [Subscribers.Event<Int, Never>]()
let sinkD = Subscribers.Sink<Int, Never>(
receiveCompletion: {
receivedD.append(.complete($0))
},
receiveValue: {
receivedD.append(.value($0))
})
scanB.subscribe(sinkC)
scanB.subscribe(sinkD)
subjectA.send(sequence: 1...3, completion: .finished)
XCTAssertEqual(receivedC, [11, 13, 16].asEvents(completion: .finished))
XCTAssertEqual(receivedD, [11, 13, 16].asEvents(completion: .finished))
}
这次,我们给 scanB
添加了两个订阅者 sinkC
和 sinkD
。所谓“重新触发整个事件订阅流程”就是指,当 sinkD
订阅 scanB
时,会驱使 scanB
重新对 subjectA
中的所有元素进行一次计算,并把结果发送到 sinkD
,而不是 sinkC
消费了 scanB
的所有事件之后,sinkD
就订阅不到事件了。
因此,“订阅了一个 Publisher
之后,就可以订阅到它的所有事件”这个结论,仍旧是成立的。
考虑上时间维度之后呢
当然你可能会觉得,上面的例子不太有说服力,毕竟,sinkC
和 sinkD
是处在同一个时间点订阅的。因此,我们很难观察到所谓“半路杀出来的订阅者”真实的订阅行为。
emm...没错,我完全认同。而当我们把这两个订阅的时间错开,得到的结论,也让我们可以发现一些有意思的结论。为了演示这个过程,我们创建一个 Deferred
对象,并用 Scan
加工其中的事件:
func testDeferredSubjects() {
var subjects = [PassthroughSubject<Int, Never>]()
let deferred = Deferred { () -> PassthroughSubject<Int, Never> in
let request = PassthroughSubject<Int, Never>()
subjects.append(request)
return request
}
let scanB = Publishers.Scan(
upstream: deferred, initialResult: 10, nextPartialResult: +)
}
这里,我们用 subjects
保存了 Deferred
中生成的 PassthroughSubject
对象的引用。这是为了稍后方便在其中生成事件。接下来,还是创建两个订阅者 sinkC
和 sinkD
,这部分和上个测试是一样的:
func testDeferredSubjects() {
/// ...
var receivedC = [Subscribers.Event<Int, Never>]()
let sinkC = Subscribers.Sink<Int, Never>(
receiveCompletion: {
receivedC.append(.complete($0))
},
receiveValue: {
receivedC.append(.value($0))
})
var receivedD = [Subscribers.Event<Int, Never>]()
let sinkD = Subscribers.Sink<Int, Never>(
receiveCompletion: {
receivedD.append(.complete($0))
},
receiveValue: {
receivedD.append(.value($0))
})
}
最后,我们先让 sinkC
订阅 scanB
,并让 scanB
开始生成事件。然后,再让 sinkD
也加入订阅,并通过 subject[0]
和 subject[1]
继续让 scanB
生成事件:
scanB.subscribe(sinkC)
subjects[0].send(sequence: 1...2, completion: nil)
scanB.subscribe(sinkD)
subjects[0].send(sequence: 3...4, completion: .finished)
subjects[1].send(sequence: 1...4, completion: .finished)
现在,sinkD
订阅了 scanB
之后,它会订阅到通过 subjects[0]
生成的后续事件么?说到这,事情就有点儿矛盾了。按照我们之前说过的“订阅了一个 Publisher
之后,就可以订阅到它的所有事件”这个结论,显然 sinkD
应该可以订阅到 subjects[0]
生成的后续 16,20,.finsihed
事件。但如果这样,它还会订阅到通过 subjects[1]
生成的 11,13,16,20,.finished
事件,一个订阅者可以订阅到两次 .finished
事件显然是不正确的。
而正确的结果是,sinkD
只能订阅到 subject[1]
通过 scanB
生成的事件:
XCTAssertEqual(receivedC, [11, 13, 16, 20].asEvents(completion: .finished))
XCTAssertEqual(receivedD, [11, 13, 16, 20].asEvents(completion: .finished))
What's next?
当然,我们的确是通过一些小伎俩刻意放大了我们想达到的效果,即:订阅同一个 Publisher
不一定可以订阅到它后续的所有事件。而我们之前用图片表达的发布者和订阅者的关系,也不一定真实的反应了事件的传递路径,在它们背后,还有着我们没有发觉的内容。而继续探寻的线索,就是 Subscriber
协议中的 func receive(subscription: Subscription)
方法。除了 Publisher
和 Subscriber
之外,这个 Subscription
是做什么的呢?