如果让一个订阅者同时订阅多个 Publisher
,它可以同时收到来自这些 Publisher
的事件么?这一节,我们就来研究这个问题。
首先,我们直接创建一个表达这个想法的测试用例:
func testMultipleSubscribe() {
let subject1 = PassthroughSubject<Int, Never>()
let subject2 = PassthroughSubject<Int, Never>()
var received = [Subscribers.Event<Int, Never>]()
let sink = Subscribers.Sink<Int, Never>(receiveCompletion: {
received.append(.complete($0))
}, receiveValue: {
received.append(.value($0))
})
subject1.subscribe(sink)
subject2.subscribe(sink)
subject1.send(sequence: 1...2, completion: .finished)
subject2.send(sequence: 3...4, completion: .finished)
XCTAssertEqual(received, (1...2).asEvents(completion: .finished))
}
最后的判断条件,就完全说明问题了。首先,一个订阅者可以订阅多个 Publisher
,上面的代码可以运行;其次,一个订阅者只能订阅到第一个 Publisher
生成的事件,而忽略其余的 Publisher
。按照之前我们对 Combine 事件发布订阅模型的整理,一个 Subscriber
只能持有一个来自 Publisher
的 Subscription
对象,并从中接收事件。
但如果我们就是要同时订阅多个 Publisher
该怎么办呢?在 Combine 里,目前唯一具有这个功能的,就是 Subject
。为了演示这个特性,我们把刚才的测试用例改成这样:
func testMultipleSubjectSubscribe() {
let subject1 = PassthroughSubject<Int, Never>()
let subject2 = PassthroughSubject<Int, Never>()
let multiSubject = PassthroughSubject<Int, Never>()
let cancellable1 = subject1.subscribe(multiSubject)
let cancellable2 = subject2.subscribe(multiSubject)
var received = [Subscribers.Event<Int, Never>]()
let sink = multiSubject.sink(receiveCompletion: {
received.append(.complete($0))
}, receiveValue: {
received.append(.value($0))
})
subject1.send(sequence: 1...2, completion: nil)
subject2.send(sequence: 3...4, completion: .finished)
XCTAssertEqual(received, [1, 2, 3, 4].asEvents(completion: .finished))
cancellable1.cancel()
cancellable2.cancel()
sink.cancel()
}
这次,我们先把 multiSubject
当成了一个订阅者,订阅了上游的 subject1
和 subject2
。然后,再把它当成了一个 Publisher
,并订阅了其中的事件。从测试条件就能看到,这样,就可以订阅到多个上游 Publisher
产生的事件了。
但这个结果也是有条件的,首先,我们必须小心构建 subject1
和 subject2
,让它们产生的事件逻辑上,可以形成一个流。因此,我们让 subject1
的完成事件是 nil
。好让 multiSubject
可以继续从 subject2
接受事件。如果我们让 subject1
的完成事件也是 .finsihed
,multiSubject
就订阅不到 subject2
的事件了。
另外,我们需要手工维护所有用于取消上游订阅的 Cancellable
对象。因为,即便 subject1
和 subject2
离开了作用域,这个订阅也不会默认被取消掉。所以,尽管上面的代码“可以工作”,但附加的条条框框足以让我们对这样的代码敬而远之。
MergeSink
为了解决这些不便之处,我们可以给 PassthroughSubject
写一个包装类改进上面提出的问题。一方面,保留它订阅多个上游 Publisher
的能力;另一方面,让这个包装类自我管理处于中间环节的 Cancellable
。为此,我们创建了一个 MergeSink.swift,在其中定义了一个叫做 MergeInput
的类用来合并多个 Publisher
的事件,先来看 MergeSink
的属性:
public class MergeInput<I>: Publisher, Cancellable {
public typealias Output = I
public typealias Failure = Never
private let subscriptions = AtomicBox(
Dictionary<CombineIdentifier, Subscribers.Sink<I, Never>>())
private let subject = PassthroughSubject<I, Never>()
public init() {}
}
其中:
subscriptions
用来管理所有订阅上游Publisher
的订阅者;subject
是为下游订阅者服务的Publisher
;
然后,来看它的 Publisher
接口部分的实现。作为一个普通的 Publisher
,它的实现也就是把订阅者传递给实际提供订阅功能的 subject
的 receive(subscriber:)
方法就好了。
public func receive<S>(subscriber: S)
where S: Subscriber, S.Failure == Never, S.Input == I {
subject.receive(subscriber: subscriber)
}
接下来,是 Cancellable
接口部分的实现。这里我们需要做的,是调用每一个订阅者的 cancel()
方法取消订阅,然后清空 subscribers
字典:
public func cancel() {
subscriptions.mutate {
$0.values.forEach { $0.cancel() }
$0.removeAll()
}
}
为了让 MergeInput
可以在释放的时候,自动取消所有订阅,我们只要在 deinit
中调用 cancel
就好了:
deinit { cancel() }
最后,是让 MergeInput
接受多个 Publisher
事件的实现:
public func subscribe<P>(_ publisher: P)
where P: Publisher, P.Output == I, P.Failure == Never {
var identifier: CombineIdentifier?
let sink = Subscribers.Sink<I, P.Failure>(
receiveCompletion: { _ in
self.subscriptions.mutate {
_ = $0.removeValue(forKey: identifier!)
}
}, receiveValue: {
self.subject.send($0)
})
identifier = sink.combineIdentifier
subscriptions.mutate { $0[sink.combineIdentifier] = sink }
publisher.subscribe(sink)
}
它的参数 publisher
表示要订阅的上游 Publisher
,这个 Publisher
的普通事件类型和 MergeInput
相同,错误事件类型也是 Never
。至于实现的方法,就是在内部创建一个 publisher
的订阅者,当收到普通事件的时候,就让它通过 subject
转发给下游订阅者。收到完成事件的时候,就把它从 subscriptions
中删除。创建完成后,把它纳入到 subscriptions
统一管理,并用它订阅 publisher
就好了。
有了这个 MergeInput
之后,我们给 Publisher
添加一个扩展,方便让它接受更多 Publisher
:
public extension Publisher where Failure == Never {
func merge(into mergeInput: MergeInput<Output>) {
mergeInput.subscribe(self)
}
}
What's next?
这样,我们就可以用这个扩展,先让 subject
合并进多个 Publisher
,再让这个 subject
为下游订阅者提供服务了。下一节,我们将为 MergeInput
编写测试用例,验证这个应用场景。并继续讨论另外一种多次订阅的情况。