在上一节末尾,我们提出了一些 Combine 中看似“有违常理”的现象。为了搞清楚这些问题,我们要从 Publisher
和 Subscriber
之间事件的传递方式说起。
之前,实现 SubscriptionBehavior
的时候,接收普通事件的 receive(_ input:)
方法是这样的:
func receive(_ input: Input) -> Subscribers.Demand {
if demand > 0 {
let newDemand = downstream.receive(input)
demand = newDemand + (demand - 1)
return demand
}
return Subscribers.Demand.none
}
也就是说,只有在 demand
大于 0 的时候,Publisher
才会把 input
递交给订阅者,否则就会丢弃 input
。通常,在自定义 Subscriber
的时候,receive(_ input:)
最后都会直接返回 .unlimited
表示一直订阅到 Publisher
结束。特别是那些订阅用户交互事件的代码,更是如此。因此,依靠 demand
决定是否递交事件的模式在绝大多数情况下,并不会产生问题。
CustomDemandSink
接下来,为了更具象地观察到 demand
对订阅事件的影响。在项目中,我们创建了一个 CustomDemandSink.swift。在这里定义一个可以动态调整 demand
的订阅者类型:CustomDemandSink
。先来看它的属性:
public struct CustomDemandSink<Input, Failure: Error>
: Subscriber, Cancellable {
public let combineIdentifier: CombineIdentifier = CombineIdentifier()
let activeSubscription =
AtomicBox<(demand: Int, subscription: Subscription?)>((0, nil))
let value: (Input) -> Void
let completion: (Subscribers.Completion<Failure>) -> Void
public init(
demand: Int,
receiveValue: @escaping ((Input) -> Void),
receiveCompletion: @escaping ((Subscribers.Completion<Failure>) -> Void)) {
activeSubscription.mutate { $0.demand = demand }
self.value = receiveValue
self.completion = receiveCompletion
}
}
其中:
combineIdentifier
是来自Subscriber
的要求。由于CustomDemandSink
是一个结构,Swift 无法自动为它合成这个属性,因此我们直接用 Combine API 定义了一个;activeSubscription
是一个用AtomicBox
保护的 tuple。demand
部分表示允许我们动态进行调整的订阅需求,subscription
表示当前订阅的Publisher
;value
和completion
是订阅到普通和完成事件后,调用的方法;
接下来,我们定义一个调整订阅数量的方法 increaseDemand(_ value:)
,它把 activeSubscription
内部的 subscription
订阅事件的最大值,调整到 value
。之所以要有这个方法,是为了稍后方便我们观察 activeSubscription
的订阅情况:
public func increaseDemand(_ value: Int) {
activeSubscription.mutate {
$0.subscription?.request(.max(value))
}
}
然后,是 Cancellable
协议的实现。我们调用 subscription
的 cancel
方法取消订阅,并重置 activeSubscription
的状态就好了:
public func cancel() {
activeSubscription.mutate {
$0.subscription?.cancel()
$0 = (0, nil)
}
}
最后,是 Subscriber
的实现,也就是它约束的三个 receive
方法:
public func receive(subscription: Subscription) {
activeSubscription.mutate {
if $0.subscription == nil {
$0.subscription = subscription
if $0.demand > 0 {
$0.demand -= 1
subscription.request(.max(1))
}
}
}
}
public func receive(_ input: Input) -> Subscribers.Demand {
value(input)
var demand = Subscribers.Demand.none
activeSubscription.mutate {
if $0.demand > 0 {
$0.demand -= 1
demand = .max(1)
}
}
return demand
}
public func receive(completion c: Subscribers.Completion<Failure>) {
completion(c)
activeSubscription.mutate {
$0 = (0, nil)
}
}
先来看 receive(subscription:)
。如果 activeSubscription.subscription
为 nil
,就把接收到的参数设置成当前的 Subscription
对象,并向这个对象表示的 Publisher
订阅 1 个事件。
再来看 receive(_ input:)
。它先调用我们指定的 value
处理事件 input
。后半部分,则和 receive(subscription:)
是基本相同的。只是当 activeSubscription.demand
为 0 的时候,它会向上游 Publisher
返回 .none
表示不再接收事件。
最后,是 receive(completion c:)
。它先调用我们指定的 completion
方法处理完成事件。然后重置 activeSubscription
的值。至此,这个可以在外部手工调整订阅需求的订阅者就完成了。
编写测试用例
接下来,我们编写一个测试用例,观察下 demand
对订阅事件的影响:
func testDemand() {
var received = [Subscribers.Event<Int, Never>]()
let subject = PassthroughSubject<Int, Never>()
let sink = CustomDemandSink<Int, Never>(
demand: 2,
receiveValue: { received.append(.value($0)) },
receiveCompletion: { received.append(.complete($0)) }
)
subject.subscribe(sink)
subject.send(sequence: 1...3, completion: nil)
sink.increaseDemand(2)
subject.send(sequence: 4...6, completion: .finished)
sink.increaseDemand(2)
XCTAssertEqual(received, [1, 2, 4, 5].asEvents(completion: .finished))
}
首先,我们创建了一个可订阅 2 个事件的 CustomDemandSink
对象。当 subject
产生 1...3
事件的时候,显然 sink
只应该订阅到 1 和 2,3 会被 subject
丢弃掉。
之后,我们把 sink
的 demand
又加了 2。于是,sink
可以订阅到接下来的 4 和 5,6 会被 subject
丢弃掉。因此,在最后的测试条件里,我们判断的事件,就是 [1, 2, 4, 5]
。
通过这个例子,我们至少已经呈现了上一节提出的一个现象:订阅了一个 subject
,并不一定可以收到它接下来的所有事件。
What's next?
看过了这个例子,你可能会觉得,尽管 demand
是一种控制订阅事件数量的机制,但理解它的前前后后要付出的努力,可能真的要比它实际的作用大的多得多。特别是,当事件的发送,和订阅不在同一个线程的时候,这个功能简直就是个 Bug 一样的存在了。下一节,我们就来看下这种情况以及对应的应对方法。