欢迎回来,基于上一节实现的 Behavior,我们完成 BufferSubject 剩余的部分,也就是 Subject 接口的实现。我们从 BufferSubject 的属性部分说起:
public class BufferSubject<Output, Failure: Error>: Subject {
/// class Behavior ...
typealias SubscriberRecords =
Dictionary<CombineIdentifier, CustomSubscription<Behavior>>
let subscribers = AtomicBox<SubscriberRecords>([:])
let buffer: AtomicBox<Buffer<Output, Failure>>
public init(limit: Int = 1,
whenFull strategy: Publishers.BufferingStrategy<Failure> = .dropOldest) {
precondition(limit >= 0)
buffer = AtomicBox(Buffer(limit: limit, strategy: strategy))
}
}
其中,subscribers 的定义,和 CustomSubject 是一样的。通过它的定义,我们可以进一步体会在 CustomSubscription 的实现里,把 Subscription 行为和线程安全性的代码隔离开的好处。另外一个属性 buffer 则是 BufferSubject 使用的缓冲区,对它的访问也需要是线程安全的。
接下来,是我们还不太清楚的 send(subscription:) 方法。老规矩,我们只是保持让 subscription 订阅事件:
public func send(subscription: Subscription) {
subscription.request(.unlimited)
}
然后,是向订阅者发送普通事件的 send(_ value:) 方法。在它的实现里,我们做了两件事情。一个是把事件缓存,以便将来可以回放给后来的订阅者;另一个,是把事件发送给当前所有的订阅者:
public func send(_ value: Output) {
buffer.mutate { b in
b.push(value)
}
for (_, sub) in subscribers.value {
_ = sub.receive(value)
}
}
接下来,是向订阅者发送完成事件的 send(completion:) 方法。它的实现逻辑,和 send(_ value:) 几乎是一样的,只是发送完事件后,要清空 subscribers:
public func send(completion: Subscribers.Completion<Failure>) {
buffer.mutate { b in
b.push(completion: completion)
}
for (_, sub) in subscribers.value {
sub.receive(completion: completion)
}
subscribers.mutate { $0.removeAll() }
}
这样,BufferSubject 中 Subject 部分的接口就完成了。最后,我们来实现 Publisher 的部分:
public func receive<S>(subscriber: S)
where S: Subscriber, Failure == S.Failure, Output == S.Input {
let behavior = Behavior(
subject: self,
downstream: AnySubscriber(subscriber),
buffer: buffer.value)
let subscription = CustomSubscription(behavior: behavior)
subscribers.mutate {
$0[subscription.combineIdentifier] = subscription
}
subscription.receive(subscription: Subscriptions.empty)
}
通过它的实现,我们就能理解当订阅 BufferSubject 的时候,究竟发生了什么。首先,创建 Behavior 的时候,可以看到所有订阅者共享的都是同一个 buffer,这也就意味着,在缓冲区未满的情况下,先来的订阅者和后来的订阅者,应该可以收到同样的消息,这也就达到了之前我们期望的给新订阅者回放所有历史事件的目的。至于 receive<S>(subscriber: S) 后半部分的实现,则和 CustomSubject 是完全一样的,我们就不重复了。
至此,BufferSubject 也就全部完成了。
用单元测试验证结果
接下来,我们把测试 multicast 的测试用例改成下面这样,来测试下 BufferSubject:
func testMulticastBuffer() {
let subjectA = PassthroughSubject<Int, Never>()
let multicastB = subjectA.scan(10, +)
.multicast { BufferSubject(limit: Int.max) }
.autoconnect()
/// `sinkC`
/// `sinkD`
XCTAssertEqual(receivedC, [11, 13, 16, 20].asEvents(completion: .finished))
XCTAssertEqual(receivedD, [11, 13, 16, 20].asEvents(completion: .finished))
sinkC.cancel()
sinkD.cancel()
}
我们只是把 multicast closure 中返回的 PassthroughSubject 替换成了 BufferSubject。订阅的过程以及事件发送的顺序,则和之前是完全一样的。现在,当 sinkD 订阅 multicastB 的时候,它就可以订阅到之前 sinkC 订阅到的所有事件了。从最后 XCTAssertEqual 判断的条件也可以看到,receivedC 和 receivedD 是相等的。
What's next?
说到这里,如果你回头看看之前所有使用了 multicast 方法的测试用例就不难发现,为了让 multicastB 生成事件,我们都使用了 autoconnect() 方法。如果换成手动连接,也可以写成这样:
func testMulticastLatest() {
let subjectA = PassthroughSubject<Int, Never>()
let multicastB = subjectA.scan(10, +)
.multicast { CurrentValueSubject(0) }
let cancelB = multicastB.connect()
/// The same as before...
cancelB.cancel()
}
当然,这里是否调用 cancel 并不影响我们的单元测试结果。但如果忽略 connect() 的返回值:
_ = multicastB.connect()
sinkC 和 sinkD 就无法订阅到任何事件了。这也和我们预期中的行为有所差别。凭着直觉,connect 之后,multicastB 就应该开始产生了事件了。究竟是什么导致了这个现象呢?为了搞清楚这个问题,下一节,我们来聊聊和 Combine 中 Subject 生命周期相关的话题。