欢迎回来,基于上一节实现的 Behavior
,我们完成 BufferSubject
剩余的部分,也就是 Subject
接口的实现。我们从 BufferSubject
的属性部分说起:
public class BufferSubject<Output, Failure: Error>: Subject {
/// class Behavior ...
typealias SubscriberRecords =
Dictionary<CombineIdentifier, CustomSubscription<Behavior>>
let subscribers = AtomicBox<SubscriberRecords>([:])
let buffer: AtomicBox<Buffer<Output, Failure>>
public init(limit: Int = 1,
whenFull strategy: Publishers.BufferingStrategy<Failure> = .dropOldest) {
precondition(limit >= 0)
buffer = AtomicBox(Buffer(limit: limit, strategy: strategy))
}
}
其中,subscribers
的定义,和 CustomSubject
是一样的。通过它的定义,我们可以进一步体会在 CustomSubscription
的实现里,把 Subscription
行为和线程安全性的代码隔离开的好处。另外一个属性 buffer
则是 BufferSubject
使用的缓冲区,对它的访问也需要是线程安全的。
接下来,是我们还不太清楚的 send(subscription:)
方法。老规矩,我们只是保持让 subscription
订阅事件:
public func send(subscription: Subscription) {
subscription.request(.unlimited)
}
然后,是向订阅者发送普通事件的 send(_ value:)
方法。在它的实现里,我们做了两件事情。一个是把事件缓存,以便将来可以回放给后来的订阅者;另一个,是把事件发送给当前所有的订阅者:
public func send(_ value: Output) {
buffer.mutate { b in
b.push(value)
}
for (_, sub) in subscribers.value {
_ = sub.receive(value)
}
}
接下来,是向订阅者发送完成事件的 send(completion:)
方法。它的实现逻辑,和 send(_ value:)
几乎是一样的,只是发送完事件后,要清空 subscribers
:
public func send(completion: Subscribers.Completion<Failure>) {
buffer.mutate { b in
b.push(completion: completion)
}
for (_, sub) in subscribers.value {
sub.receive(completion: completion)
}
subscribers.mutate { $0.removeAll() }
}
这样,BufferSubject
中 Subject
部分的接口就完成了。最后,我们来实现 Publisher
的部分:
public func receive<S>(subscriber: S)
where S: Subscriber, Failure == S.Failure, Output == S.Input {
let behavior = Behavior(
subject: self,
downstream: AnySubscriber(subscriber),
buffer: buffer.value)
let subscription = CustomSubscription(behavior: behavior)
subscribers.mutate {
$0[subscription.combineIdentifier] = subscription
}
subscription.receive(subscription: Subscriptions.empty)
}
通过它的实现,我们就能理解当订阅 BufferSubject
的时候,究竟发生了什么。首先,创建 Behavior
的时候,可以看到所有订阅者共享的都是同一个 buffer
,这也就意味着,在缓冲区未满的情况下,先来的订阅者和后来的订阅者,应该可以收到同样的消息,这也就达到了之前我们期望的给新订阅者回放所有历史事件的目的。至于 receive<S>(subscriber: S)
后半部分的实现,则和 CustomSubject
是完全一样的,我们就不重复了。
至此,BufferSubject
也就全部完成了。
用单元测试验证结果
接下来,我们把测试 multicast
的测试用例改成下面这样,来测试下 BufferSubject
:
func testMulticastBuffer() {
let subjectA = PassthroughSubject<Int, Never>()
let multicastB = subjectA.scan(10, +)
.multicast { BufferSubject(limit: Int.max) }
.autoconnect()
/// `sinkC`
/// `sinkD`
XCTAssertEqual(receivedC, [11, 13, 16, 20].asEvents(completion: .finished))
XCTAssertEqual(receivedD, [11, 13, 16, 20].asEvents(completion: .finished))
sinkC.cancel()
sinkD.cancel()
}
我们只是把 multicast
closure 中返回的 PassthroughSubject
替换成了 BufferSubject
。订阅的过程以及事件发送的顺序,则和之前是完全一样的。现在,当 sinkD
订阅 multicastB
的时候,它就可以订阅到之前 sinkC
订阅到的所有事件了。从最后 XCTAssertEqual
判断的条件也可以看到,receivedC
和 receivedD
是相等的。
What's next?
说到这里,如果你回头看看之前所有使用了 multicast
方法的测试用例就不难发现,为了让 multicastB
生成事件,我们都使用了 autoconnect()
方法。如果换成手动连接,也可以写成这样:
func testMulticastLatest() {
let subjectA = PassthroughSubject<Int, Never>()
let multicastB = subjectA.scan(10, +)
.multicast { CurrentValueSubject(0) }
let cancelB = multicastB.connect()
/// The same as before...
cancelB.cancel()
}
当然,这里是否调用 cancel
并不影响我们的单元测试结果。但如果忽略 connect()
的返回值:
_ = multicastB.connect()
sinkC
和 sinkD
就无法订阅到任何事件了。这也和我们预期中的行为有所差别。凭着直觉,connect
之后,multicastB
就应该开始产生了事件了。究竟是什么导致了这个现象呢?为了搞清楚这个问题,下一节,我们来聊聊和 Combine 中 Subject
生命周期相关的话题。