为了研究 Combine 中的共享订阅,我们要从 Publisher
生成事件的两种模式说起。也就是在 RxSwift 中经常被讨论的冷和热。并且,RxSwift 作者在GitHub上的这篇文档中的描述,也基本上适用于 Combine。
所谓“冷”的 Publisher
,指的是在有人订阅的时候才生成事件的那一类。每个订阅都会生成一个独立的 Publisher
对象,事件发生了也就随之被处理了。这类 Publisher
和它们的订阅者的关系是静态的,因此,缓存它们的历史数据是没意义的。
而所谓“热”的 Publisher
,指的是可以在任何时候,以任何速率主动生成事件的那一类,事件的生成过程与下游订阅者无关。所有订阅者,共享同一个 Publisher
对象。这类 Publisher
在生命周期内,和它们的订阅者的关系是可以动态变化的。也正因为如此,当新订阅者加入的时候,才有所谓历史事件的处理策略问题。
共享 Subject
在之前 testDeferredSubjects
中,我们使用了 Deferred
演示了“冷” Publisher
的行为特性:即每当有订阅者的时候,就创建一个新的 PassthroughSubject
生成事件。现在,把这个测试用例改成下面这样,让所有的订阅者共享同一个 PassthroughSubject
:
func testSharedSubject() {
let subjectA = PassthroughSubject<Int, Never>()
let scanB = subjectA.scan(10, +)
var receivedC = [Subscribers.Event<Int, Never>]()
let sinkC = scanB.sink(event: { receivedC.append($0) })
subjectA.send(sequence: 1...2, completion: nil)
var receivedD = [Subscribers.Event<Int, Never>]()
let sinkD = scanB.sink(event: { receivedD.append($0) })
subjectA.send(sequence: 3...4, completion: .finished)
XCTAssertEqual(receivedC, [11, 13, 16, 20].asEvents(completion: .finished))
XCTAssertEqual(receivedD, [13, 17].asEvents(completion: .finished))
sinkC.cancel()
sinkD.cancel()
}
这次,sinkD
同样是在 subjectA
开始生成事件之后才开始订阅。但从 XCTAssertEqual
的判断条件可以看到,receivedC
中有 4 个事件,receivedD
中只有 2 个。因此,不难推断,sinkC
和 sinkD
共享了同一个 subjectA
事件序列。但如果再仔细看下 receivedD
中的内容,就会发现和我们直觉中的并不相同。sinkD
订阅到的事件为什么不是后续的 16 和 20,而是 13 和 17 呢?
问题就出在了 scan(10, +)
调用上,它返回的 Scan
是一个“冷” Publisher
。于是事情就变得拧巴了,尽管 sinkC
和 sinkD
订阅的是同一个事件源头 subjectA
,但 Scan
会分别为这两个订阅者生成不同的 Subscription
对象。因此,当 subjectA
生成事件 3 时,给 sinkD
的 Subscription
对象的初始状态仍旧是 10,因此,sinkD
订阅到的第一个事件就是 13 了。简单来说,就是它们共享了同一个 Subject
,但是没有共享为它们生成订阅事件的过程。理解了这个道理,你也就对第二个事件是 17 不觉得奇怪了吧 :)
怎么样,是不是和我们想象的特别不一样。因此,这种“冷热”的 Publisher
几乎无法直接搭配在一起工作,这是我们特别要注意的事情。为了让 sinkD
得到想象中的结果,我们可以使用 multicast
:
func testMulticastSubject() {
let subjectA = PassthroughSubject<Int, Never>()
let multicastB = subjectA.scan(10, +)
.multicast { PassthroughSubject() }
.autoconnect()
var receivedC = [Subscribers.Event<Int, Never>]()
let sinkC = multicastB.sink(event: { receivedC.append($0) })
subjectA.send(sequence: 1...2, completion: nil)
var receivedD = [Subscribers.Event<Int, Never>]()
let sinkD = multicastB.sink(event: { receivedD.append($0) })
subjectA.send(sequence: 3...4, completion: .finished)
XCTAssertEqual(receivedC, [11, 13, 16, 20].asEvents(completion: .finished))
XCTAssertEqual(receivedD, [16, 20].asEvents(completion: .finished))
sinkC.cancel()
sinkD.cancel()
}
这样,就又把 scan
计算的事件值,通过一个共享的 PassthroughSubject
发送了出来。sinkD
收到的,就是我们预期中的 16 和 20 了。这里别忘了对 multicast
的返回结果使用 autoconnect()
方法,这样才可以让 multicastB
开始生成事件。
缓存计算出来的事件
除了共享 Subject
之外,我们也可以共享事件的计算结果。当前版本的 Combine 提供了一个 CurrentValueSubject
完成这个任务。为了了解它的工作方式,我们可以把上一个测试用例中创建 multicastB
的部分改成这样:
func testMulticastLatest() {
let subjectA = PassthroughSubject<Int, Never>()
let multicastB = subjectA
.scan(10) { state, next in state + next }
.multicast { CurrentValueSubject(0) }
.autoconnect()
/// ...
XCTAssertEqual(receivedC, [11, 13, 16, 20].asEvents(completion: .finished))
XCTAssertEqual(receivedD, [13, 16, 20].asEvents(completion: .finished))
}
可以看到,这次 sinkD
接收到的事件中,就包含了订阅 multicastB
时,它当前的最新事件 13。这个值是缓存在 multicastB
里的,并不需要 scan
重新计算。
What's next?
但 CurrentValueSubject
只能缓存一个事件,如果想要缓存多个怎么办呢?你第一个想到的可能是 Publishers.Buffer
,但它只能在“热” Publisher
以及它的订阅者之间缓存事件,并不能在它的多个下游订阅者之间缓存事件。也就是说,在我们这种情况里,Buffer
并不能为 sinkD
缓存特定数量的 sinkC
订阅到事件。因此,如果真的需要这样的缓存行为,我们就只能自己实现一个了,好在有了之前实现 CustomSubject
的经验,这并不太困难。