如果让一个订阅者同时订阅多个 Publisher,它可以同时收到来自这些 Publisher 的事件么?这一节,我们就来研究这个问题。
首先,我们直接创建一个表达这个想法的测试用例:
func testMultipleSubscribe() {
let subject1 = PassthroughSubject<Int, Never>()
let subject2 = PassthroughSubject<Int, Never>()
var received = [Subscribers.Event<Int, Never>]()
let sink = Subscribers.Sink<Int, Never>(receiveCompletion: {
received.append(.complete($0))
}, receiveValue: {
received.append(.value($0))
})
subject1.subscribe(sink)
subject2.subscribe(sink)
subject1.send(sequence: 1...2, completion: .finished)
subject2.send(sequence: 3...4, completion: .finished)
XCTAssertEqual(received, (1...2).asEvents(completion: .finished))
}
最后的判断条件,就完全说明问题了。首先,一个订阅者可以订阅多个 Publisher,上面的代码可以运行;其次,一个订阅者只能订阅到第一个 Publisher 生成的事件,而忽略其余的 Publisher。按照之前我们对 Combine 事件发布订阅模型的整理,一个 Subscriber 只能持有一个来自 Publisher 的 Subscription 对象,并从中接收事件。
但如果我们就是要同时订阅多个 Publisher 该怎么办呢?在 Combine 里,目前唯一具有这个功能的,就是 Subject。为了演示这个特性,我们把刚才的测试用例改成这样:
func testMultipleSubjectSubscribe() {
let subject1 = PassthroughSubject<Int, Never>()
let subject2 = PassthroughSubject<Int, Never>()
let multiSubject = PassthroughSubject<Int, Never>()
let cancellable1 = subject1.subscribe(multiSubject)
let cancellable2 = subject2.subscribe(multiSubject)
var received = [Subscribers.Event<Int, Never>]()
let sink = multiSubject.sink(receiveCompletion: {
received.append(.complete($0))
}, receiveValue: {
received.append(.value($0))
})
subject1.send(sequence: 1...2, completion: nil)
subject2.send(sequence: 3...4, completion: .finished)
XCTAssertEqual(received, [1, 2, 3, 4].asEvents(completion: .finished))
cancellable1.cancel()
cancellable2.cancel()
sink.cancel()
}
这次,我们先把 multiSubject 当成了一个订阅者,订阅了上游的 subject1 和 subject2。然后,再把它当成了一个 Publisher,并订阅了其中的事件。从测试条件就能看到,这样,就可以订阅到多个上游 Publisher 产生的事件了。
但这个结果也是有条件的,首先,我们必须小心构建 subject1 和 subject2,让它们产生的事件逻辑上,可以形成一个流。因此,我们让 subject1 的完成事件是 nil。好让 multiSubject 可以继续从 subject2 接受事件。如果我们让 subject1 的完成事件也是 .finsihed,multiSubject 就订阅不到 subject2 的事件了。
另外,我们需要手工维护所有用于取消上游订阅的 Cancellable 对象。因为,即便 subject1 和 subject2 离开了作用域,这个订阅也不会默认被取消掉。所以,尽管上面的代码“可以工作”,但附加的条条框框足以让我们对这样的代码敬而远之。
MergeSink
为了解决这些不便之处,我们可以给 PassthroughSubject 写一个包装类改进上面提出的问题。一方面,保留它订阅多个上游 Publisher 的能力;另一方面,让这个包装类自我管理处于中间环节的 Cancellable。为此,我们创建了一个 MergeSink.swift,在其中定义了一个叫做 MergeInput 的类用来合并多个 Publisher 的事件,先来看 MergeSink 的属性:
public class MergeInput<I>: Publisher, Cancellable {
public typealias Output = I
public typealias Failure = Never
private let subscriptions = AtomicBox(
Dictionary<CombineIdentifier, Subscribers.Sink<I, Never>>())
private let subject = PassthroughSubject<I, Never>()
public init() {}
}
其中:
subscriptions用来管理所有订阅上游Publisher的订阅者;subject是为下游订阅者服务的Publisher;
然后,来看它的 Publisher 接口部分的实现。作为一个普通的 Publisher,它的实现也就是把订阅者传递给实际提供订阅功能的 subject 的 receive(subscriber:) 方法就好了。
public func receive<S>(subscriber: S)
where S: Subscriber, S.Failure == Never, S.Input == I {
subject.receive(subscriber: subscriber)
}
接下来,是 Cancellable 接口部分的实现。这里我们需要做的,是调用每一个订阅者的 cancel() 方法取消订阅,然后清空 subscribers 字典:
public func cancel() {
subscriptions.mutate {
$0.values.forEach { $0.cancel() }
$0.removeAll()
}
}
为了让 MergeInput 可以在释放的时候,自动取消所有订阅,我们只要在 deinit 中调用 cancel 就好了:
deinit { cancel() }
最后,是让 MergeInput 接受多个 Publisher 事件的实现:
public func subscribe<P>(_ publisher: P)
where P: Publisher, P.Output == I, P.Failure == Never {
var identifier: CombineIdentifier?
let sink = Subscribers.Sink<I, P.Failure>(
receiveCompletion: { _ in
self.subscriptions.mutate {
_ = $0.removeValue(forKey: identifier!)
}
}, receiveValue: {
self.subject.send($0)
})
identifier = sink.combineIdentifier
subscriptions.mutate { $0[sink.combineIdentifier] = sink }
publisher.subscribe(sink)
}
它的参数 publisher 表示要订阅的上游 Publisher,这个 Publisher 的普通事件类型和 MergeInput 相同,错误事件类型也是 Never。至于实现的方法,就是在内部创建一个 publisher 的订阅者,当收到普通事件的时候,就让它通过 subject 转发给下游订阅者。收到完成事件的时候,就把它从 subscriptions 中删除。创建完成后,把它纳入到 subscriptions 统一管理,并用它订阅 publisher 就好了。
有了这个 MergeInput 之后,我们给 Publisher 添加一个扩展,方便让它接受更多 Publisher:
public extension Publisher where Failure == Never {
func merge(into mergeInput: MergeInput<Output>) {
mergeInput.subscribe(self)
}
}
What's next?
这样,我们就可以用这个扩展,先让 subject 合并进多个 Publisher,再让这个 subject 为下游订阅者提供服务了。下一节,我们将为 MergeInput 编写测试用例,验证这个应用场景。并继续讨论另外一种多次订阅的情况。