如果让一个订阅者同时订阅多个 Publisher,它可以同时收到来自这些 Publisher 的事件么?这一节,我们就来研究这个问题。

首先,我们直接创建一个表达这个想法的测试用例:

func testMultipleSubscribe() {
  let subject1 = PassthroughSubject<Int, Never>()
  let subject2 = PassthroughSubject<Int, Never>()
  var received = [Subscribers.Event<Int, Never>]()

  let sink = Subscribers.Sink<Int, Never>(receiveCompletion: {
      received.append(.complete($0))
    }, receiveValue: {
      received.append(.value($0))
    })

  subject1.subscribe(sink)
  subject2.subscribe(sink)

  subject1.send(sequence: 1...2, completion: .finished)
  subject2.send(sequence: 3...4, completion: .finished)

  XCTAssertEqual(received, (1...2).asEvents(completion: .finished))
}

最后的判断条件,就完全说明问题了。首先,一个订阅者可以订阅多个 Publisher,上面的代码可以运行;其次,一个订阅者只能订阅到第一个 Publisher 生成的事件,而忽略其余的 Publisher。按照之前我们对 Combine 事件发布订阅模型的整理,一个 Subscriber 只能持有一个来自 PublisherSubscription 对象,并从中接收事件。

但如果我们就是要同时订阅多个 Publisher 该怎么办呢?在 Combine 里,目前唯一具有这个功能的,就是 Subject。为了演示这个特性,我们把刚才的测试用例改成这样:

func testMultipleSubjectSubscribe() {
  let subject1 = PassthroughSubject<Int, Never>()
  let subject2 = PassthroughSubject<Int, Never>()
  let multiSubject = PassthroughSubject<Int, Never>()

  let cancellable1 = subject1.subscribe(multiSubject)
  let cancellable2 = subject2.subscribe(multiSubject)

  var received = [Subscribers.Event<Int, Never>]()

  let sink = multiSubject.sink(receiveCompletion: {
    received.append(.complete($0))
  }, receiveValue: {
    received.append(.value($0))
  })

  subject1.send(sequence: 1...2, completion: nil)
  subject2.send(sequence: 3...4, completion: .finished)

  XCTAssertEqual(received, [1, 2, 3, 4].asEvents(completion: .finished))

  cancellable1.cancel()
  cancellable2.cancel()
  sink.cancel()
}

这次,我们先把 multiSubject 当成了一个订阅者,订阅了上游的 subject1subject2。然后,再把它当成了一个 Publisher,并订阅了其中的事件。从测试条件就能看到,这样,就可以订阅到多个上游 Publisher 产生的事件了。

但这个结果也是有条件的,首先,我们必须小心构建 subject1subject2,让它们产生的事件逻辑上,可以形成一个流。因此,我们让 subject1 的完成事件是 nil。好让 multiSubject 可以继续从 subject2 接受事件。如果我们让 subject1 的完成事件也是 .finsihedmultiSubject 就订阅不到 subject2 的事件了。

另外,我们需要手工维护所有用于取消上游订阅的 Cancellable 对象。因为,即便 subject1subject2 离开了作用域,这个订阅也不会默认被取消掉。所以,尽管上面的代码“可以工作”,但附加的条条框框足以让我们对这样的代码敬而远之。

MergeSink

为了解决这些不便之处,我们可以给 PassthroughSubject 写一个包装类改进上面提出的问题。一方面,保留它订阅多个上游 Publisher 的能力;另一方面,让这个包装类自我管理处于中间环节的 Cancellable。为此,我们创建了一个 MergeSink.swift,在其中定义了一个叫做 MergeInput 的类用来合并多个 Publisher 的事件,先来看 MergeSink 的属性:

public class MergeInput<I>: Publisher, Cancellable {
  public typealias Output = I
  public typealias Failure = Never

  private let subscriptions = AtomicBox(
    Dictionary<CombineIdentifier, Subscribers.Sink<I, Never>>())
  private let subject = PassthroughSubject<I, Never>()

  public init() {}
}

其中:

  • subscriptions 用来管理所有订阅上游 Publisher 的订阅者;
  • subject 是为下游订阅者服务的 Publisher

然后,来看它的 Publisher 接口部分的实现。作为一个普通的 Publisher,它的实现也就是把订阅者传递给实际提供订阅功能的 subjectreceive(subscriber:) 方法就好了。

public func receive<S>(subscriber: S)
  where S: Subscriber, S.Failure == Never, S.Input == I {
  subject.receive(subscriber: subscriber)
}

接下来,是 Cancellable 接口部分的实现。这里我们需要做的,是调用每一个订阅者的 cancel() 方法取消订阅,然后清空 subscribers 字典:

public func cancel() {
  subscriptions.mutate {
    $0.values.forEach { $0.cancel() }
    $0.removeAll()
  }
}

为了让 MergeInput 可以在释放的时候,自动取消所有订阅,我们只要在 deinit 中调用 cancel 就好了:

deinit { cancel() }

最后,是让 MergeInput 接受多个 Publisher 事件的实现:

public func subscribe<P>(_ publisher: P)
  where P: Publisher, P.Output == I, P.Failure == Never {
  var identifier: CombineIdentifier?

  let sink = Subscribers.Sink<I, P.Failure>(
    receiveCompletion: { _ in
      self.subscriptions.mutate {
        _ = $0.removeValue(forKey: identifier!)
      }
    }, receiveValue: {
      self.subject.send($0)
    })

  identifier = sink.combineIdentifier
  subscriptions.mutate { $0[sink.combineIdentifier] = sink }

  publisher.subscribe(sink)
}

它的参数 publisher 表示要订阅的上游 Publisher,这个 Publisher 的普通事件类型和 MergeInput 相同,错误事件类型也是 Never。至于实现的方法,就是在内部创建一个 publisher 的订阅者,当收到普通事件的时候,就让它通过 subject 转发给下游订阅者。收到完成事件的时候,就把它从 subscriptions 中删除。创建完成后,把它纳入到 subscriptions 统一管理,并用它订阅 publisher 就好了。

有了这个 MergeInput 之后,我们给 Publisher 添加一个扩展,方便让它接受更多 Publisher

public extension Publisher where Failure == Never {
  func merge(into mergeInput: MergeInput<Output>) {
    mergeInput.subscribe(self)
  }
}

What's next?

这样,我们就可以用这个扩展,先让 subject 合并进多个 Publisher,再让这个 subject 为下游订阅者提供服务了。下一节,我们将为 MergeInput 编写测试用例,验证这个应用场景。并继续讨论另外一种多次订阅的情况。

所有订阅均支持 12 期免息分期

¥ 59

按月订阅

一个月,观看并下载所有视频内容。初来泊学,这可能是个最好的开始。

开始订阅

¥ 512

按年订阅

一年的时间,让我们一起疯狂地狩猎知识吧。比按月订阅优惠 28%

开始订阅

¥ 1280

泊学终身会员

永久观看和下载所有泊学网站视频,并赠送 100 元商店优惠券。

我要加入
如需帮助,欢迎通过以下方式联系我们