为了研究 Combine 中的共享订阅,我们要从 Publisher 生成事件的两种模式说起。也就是在 RxSwift 中经常被讨论的冷和热。并且,RxSwift 作者在GitHub上的这篇文档中的描述,也基本上适用于 Combine。

所谓“冷”的 Publisher,指的是在有人订阅的时候才生成事件的那一类。每个订阅都会生成一个独立的 Publisher 对象,事件发生了也就随之被处理了。这类 Publisher 和它们的订阅者的关系是静态的,因此,缓存它们的历史数据是没意义的。

而所谓“热”的 Publisher,指的是可以在任何时候,以任何速率主动生成事件的那一类,事件的生成过程与下游订阅者无关。所有订阅者,共享同一个 Publisher 对象。这类 Publisher 在生命周期内,和它们的订阅者的关系是可以动态变化的。也正因为如此,当新订阅者加入的时候,才有所谓历史事件的处理策略问题。

共享 Subject

在之前 testDeferredSubjects 中,我们使用了 Deferred 演示了“冷” Publisher 的行为特性:即每当有订阅者的时候,就创建一个新的 PassthroughSubject 生成事件。现在,把这个测试用例改成下面这样,让所有的订阅者共享同一个 PassthroughSubject

func testSharedSubject() {
  let subjectA = PassthroughSubject<Int, Never>()
  let scanB = subjectA.scan(10, +)

  var receivedC = [Subscribers.Event<Int, Never>]()
  let sinkC = scanB.sink(event: { receivedC.append($0) })
  subjectA.send(sequence: 1...2, completion: nil)

  var receivedD = [Subscribers.Event<Int, Never>]()
  let sinkD = scanB.sink(event: { receivedD.append($0) })
  subjectA.send(sequence: 3...4, completion: .finished)

  XCTAssertEqual(receivedC, [11, 13, 16, 20].asEvents(completion: .finished))
  XCTAssertEqual(receivedD, [13, 17].asEvents(completion: .finished))

  sinkC.cancel()
  sinkD.cancel()
}

这次,sinkD 同样是在 subjectA 开始生成事件之后才开始订阅。但从 XCTAssertEqual 的判断条件可以看到,receivedC 中有 4 个事件,receivedD 中只有 2 个。因此,不难推断,sinkCsinkD 共享了同一个 subjectA 事件序列。但如果再仔细看下 receivedD 中的内容,就会发现和我们直觉中的并不相同。sinkD 订阅到的事件为什么不是后续的 16 和 20,而是 13 和 17 呢?

问题就出在了 scan(10, +) 调用上,它返回的 Scan 是一个“冷” Publisher。于是事情就变得拧巴了,尽管 sinkCsinkD 订阅的是同一个事件源头 subjectA,但 Scan 会分别为这两个订阅者生成不同的 Subscription 对象。因此,当 subjectA 生成事件 3 时,给 sinkDSubscription 对象的初始状态仍旧是 10,因此,sinkD 订阅到的第一个事件就是 13 了。简单来说,就是它们共享了同一个 Subject,但是没有共享为它们生成订阅事件的过程。理解了这个道理,你也就对第二个事件是 17 不觉得奇怪了吧 :)

怎么样,是不是和我们想象的特别不一样。因此,这种“冷热”的 Publisher 几乎无法直接搭配在一起工作,这是我们特别要注意的事情。为了让 sinkD 得到想象中的结果,我们可以使用 multicast

func testMulticastSubject() {
  let subjectA = PassthroughSubject<Int, Never>()
  let multicastB = subjectA.scan(10, +)
    .multicast { PassthroughSubject() }
    .autoconnect()

  var receivedC = [Subscribers.Event<Int, Never>]()
  let sinkC = multicastB.sink(event: { receivedC.append($0) })
  subjectA.send(sequence: 1...2, completion: nil)

  var receivedD = [Subscribers.Event<Int, Never>]()
  let sinkD = multicastB.sink(event: { receivedD.append($0) })
  subjectA.send(sequence: 3...4, completion: .finished)

  XCTAssertEqual(receivedC, [11, 13, 16, 20].asEvents(completion: .finished))
  XCTAssertEqual(receivedD, [16, 20].asEvents(completion: .finished))

  sinkC.cancel()
  sinkD.cancel()
}

这样,就又把 scan 计算的事件值,通过一个共享的 PassthroughSubject 发送了出来。sinkD 收到的,就是我们预期中的 16 和 20 了。这里别忘了对 multicast 的返回结果使用 autoconnect() 方法,这样才可以让 multicastB 开始生成事件。

缓存计算出来的事件

除了共享 Subject 之外,我们也可以共享事件的计算结果。当前版本的 Combine 提供了一个 CurrentValueSubject 完成这个任务。为了了解它的工作方式,我们可以把上一个测试用例中创建 multicastB 的部分改成这样:

func testMulticastLatest() {
  let subjectA = PassthroughSubject<Int, Never>()
  let multicastB = subjectA
    .scan(10) { state, next in state + next }
    .multicast { CurrentValueSubject(0) }
    .autoconnect()

  /// ...

  XCTAssertEqual(receivedC, [11, 13, 16, 20].asEvents(completion: .finished))
  XCTAssertEqual(receivedD, [13, 16, 20].asEvents(completion: .finished))
}

可以看到,这次 sinkD 接收到的事件中,就包含了订阅 multicastB 时,它当前的最新事件 13。这个值是缓存在 multicastB 里的,并不需要 scan 重新计算。

What's next?

CurrentValueSubject 只能缓存一个事件,如果想要缓存多个怎么办呢?你第一个想到的可能是 Publishers.Buffer,但它只能在“热” Publisher 以及它的订阅者之间缓存事件,并不能在它的多个下游订阅者之间缓存事件。也就是说,在我们这种情况里,Buffer 并不能为 sinkD 缓存特定数量的 sinkC 订阅到事件。因此,如果真的需要这样的缓存行为,我们就只能自己实现一个了,好在有了之前实现 CustomSubject 的经验,这并不太困难。

所有订阅均支持 12 期免息分期

¥ 59

按月订阅

一个月,观看并下载所有视频内容。初来泊学,这可能是个最好的开始。

开始订阅

¥ 512

按年订阅

一年的时间,让我们一起疯狂地狩猎知识吧。比按月订阅优惠 28%

开始订阅

¥ 1280

泊学终身会员

永久观看和下载所有泊学网站视频,并赠送 100 元商店优惠券。

我要加入
如需帮助,欢迎通过以下方式联系我们