在上一节末尾,我们提出了一些 Combine 中看似“有违常理”的现象。为了搞清楚这些问题,我们要从 PublisherSubscriber 之间事件的传递方式说起。

之前,实现 SubscriptionBehavior 的时候,接收普通事件的 receive(_ input:) 方法是这样的:

func receive(_ input: Input) -> Subscribers.Demand {
  if demand > 0 {
    let newDemand = downstream.receive(input)
    demand = newDemand + (demand - 1)

    return demand
  }

  return Subscribers.Demand.none
}

也就是说,只有在 demand 大于 0 的时候,Publisher 才会把 input 递交给订阅者,否则就会丢弃 input。通常,在自定义 Subscriber 的时候,receive(_ input:) 最后都会直接返回 .unlimited 表示一直订阅到 Publisher 结束。特别是那些订阅用户交互事件的代码,更是如此。因此,依靠 demand 决定是否递交事件的模式在绝大多数情况下,并不会产生问题。

CustomDemandSink

接下来,为了更具象地观察到 demand 对订阅事件的影响。在项目中,我们创建了一个 CustomDemandSink.swift。在这里定义一个可以动态调整 demand 的订阅者类型:CustomDemandSink。先来看它的属性:

public struct CustomDemandSink<Input, Failure: Error>
  : Subscriber, Cancellable {
  public let combineIdentifier: CombineIdentifier = CombineIdentifier()
  let activeSubscription =
    AtomicBox<(demand: Int, subscription: Subscription?)>((0, nil))
  let value: (Input) -> Void
  let completion: (Subscribers.Completion<Failure>) -> Void

  public init(
    demand: Int,
    receiveValue: @escaping ((Input) -> Void),
    receiveCompletion: @escaping ((Subscribers.Completion<Failure>) -> Void)) {
    activeSubscription.mutate { $0.demand = demand }
    self.value = receiveValue
    self.completion = receiveCompletion
  }
}

其中:

  • combineIdentifier 是来自 Subscriber 的要求。由于 CustomDemandSink 是一个结构,Swift 无法自动为它合成这个属性,因此我们直接用 Combine API 定义了一个;
  • activeSubscription 是一个用 AtomicBox 保护的 tuple。demand 部分表示允许我们动态进行调整的订阅需求,subscription 表示当前订阅的 Publisher
  • valuecompletion 是订阅到普通和完成事件后,调用的方法;

接下来,我们定义一个调整订阅数量的方法 increaseDemand(_ value:),它把 activeSubscription 内部的 subscription 订阅事件的最大值,调整到 value。之所以要有这个方法,是为了稍后方便我们观察 activeSubscription 的订阅情况:

public func increaseDemand(_ value: Int) {
  activeSubscription.mutate {
    $0.subscription?.request(.max(value))
  }
}

然后,是 Cancellable 协议的实现。我们调用 subscriptioncancel 方法取消订阅,并重置 activeSubscription 的状态就好了:

public func cancel() {
  activeSubscription.mutate {
    $0.subscription?.cancel()
    $0 = (0, nil)
  }
}

最后,是 Subscriber 的实现,也就是它约束的三个 receive 方法:

public func receive(subscription: Subscription) {
  activeSubscription.mutate {
    if $0.subscription == nil {
      $0.subscription = subscription

      if $0.demand > 0 {
        $0.demand -= 1
        subscription.request(.max(1))
      }
    }
  }
}

public func receive(_ input: Input) -> Subscribers.Demand {
  value(input)

  var demand = Subscribers.Demand.none
  activeSubscription.mutate {
    if $0.demand > 0 {
      $0.demand -= 1
      demand = .max(1)
    }
  }

  return demand
}

public func receive(completion c: Subscribers.Completion<Failure>) {
  completion(c)
  activeSubscription.mutate {
    $0 = (0, nil)
  }
}

先来看 receive(subscription:)。如果 activeSubscription.subscriptionnil,就把接收到的参数设置成当前的 Subscription 对象,并向这个对象表示的 Publisher 订阅 1 个事件。

再来看 receive(_ input:)。它先调用我们指定的 value 处理事件 input。后半部分,则和 receive(subscription:) 是基本相同的。只是当 activeSubscription.demand 为 0 的时候,它会向上游 Publisher 返回 .none 表示不再接收事件。

最后,是 receive(completion c:)。它先调用我们指定的 completion 方法处理完成事件。然后重置 activeSubscription 的值。至此,这个可以在外部手工调整订阅需求的订阅者就完成了。

编写测试用例

接下来,我们编写一个测试用例,观察下 demand 对订阅事件的影响:

func testDemand() {
  var received = [Subscribers.Event<Int, Never>]()
  let subject = PassthroughSubject<Int, Never>()
  let sink = CustomDemandSink<Int, Never>(
    demand: 2,
    receiveValue: { received.append(.value($0)) },
    receiveCompletion: { received.append(.complete($0)) }
  )

  subject.subscribe(sink)

  subject.send(sequence: 1...3, completion: nil)
  sink.increaseDemand(2)
  subject.send(sequence: 4...6, completion: .finished)
  sink.increaseDemand(2)

  XCTAssertEqual(received, [1, 2, 4, 5].asEvents(completion: .finished))
}

首先,我们创建了一个可订阅 2 个事件的 CustomDemandSink 对象。当 subject 产生 1...3 事件的时候,显然 sink 只应该订阅到 1 和 2,3 会被 subject 丢弃掉。

之后,我们把 sinkdemand 又加了 2。于是,sink 可以订阅到接下来的 4 和 5,6 会被 subject 丢弃掉。因此,在最后的测试条件里,我们判断的事件,就是 [1, 2, 4, 5]

通过这个例子,我们至少已经呈现了上一节提出的一个现象:订阅了一个 subject,并不一定可以收到它接下来的所有事件。

What's next?

看过了这个例子,你可能会觉得,尽管 demand 是一种控制订阅事件数量的机制,但理解它的前前后后要付出的努力,可能真的要比它实际的作用大的多得多。特别是,当事件的发送,和订阅不在同一个线程的时候,这个功能简直就是个 Bug 一样的存在了。下一节,我们就来看下这种情况以及对应的应对方法。

所有订阅均支持 12 期免息分期

¥ 59

按月订阅

一个月,观看并下载所有视频内容。初来泊学,这可能是个最好的开始。

开始订阅

¥ 512

按年订阅

一年的时间,让我们一起疯狂地狩猎知识吧。比按月订阅优惠 28%

开始订阅

¥ 1280

泊学终身会员

永久观看和下载所有泊学网站视频,并赠送 100 元商店优惠券。

我要加入
如需帮助,欢迎通过以下方式联系我们