这一节,我们继续之前的话题。Subscription 究竟是什么呢?它又是如何影响 Combine 的事件发布订阅模型的呢?为了搞清楚这个问题,我们可以从两个方向入手。

  • 首先,通过文档和已经公开的代码,整理和补充 Subscription 在事件发布订阅模型中承担的功能,当然这里也需要有一些脑补的环节;
  • 其次,用代码自定义一个 Subscription 对象并把它用于 Combine 的单元测试,验证我们的理解;

一个完整的订阅发布模型

如果直接去翻翻 Subscription 的代码,就会发现,它只约束了一个 request 方法:

public protocol Subscription
  : Cancellable, CustomCombineIdentifierConvertible {
    /// Tells a publisher that it may send more values to the subscriber.
    func request(_ demand: Subscribers.Demand)
}

再想想我们上一节整理出来的内容,Subscriber 约束了一个接受 Subscription 参数的 receive 方法,而 Subscription 只约束了一个接受事件订阅数量的 request 方法。它们到底在事件发布订阅模型中,承担了什么角色呢?

为了搞清楚这个问题,在不考虑额外调度的前提下,我们先把一个完整的事件发布订阅模型陈列出来:

  • 首先,订阅者调用发布者的 subscribe 方法,把自己作为参数传递给发布者表示要订阅事件;
  • 其次,在 subscribe 方法的实现里,会调用 Publisher 约束的 receive<S>(subscriber: S) 方法处理传入的订阅者;
  • 第三,在 receive 的实现里,会把传入的订阅者封装成一个 Subscription 对象,这个对象会持有原始订阅者的引用;
  • 第四,如果被订阅的发布者存在上游发布者,它就会把封装的 Subscription 作为订阅者,继续调用上游发布者的 subscribe 方法。为了让这一步成立,Subscription 也应该是一个实现了 Subscriber 的类型;
  • 第五,上游发布者会把下游传入的 Subscription 对象进一步封装成表示订阅其自身的 Subscription 对象,并让它持有指向传入的 Subscription 对象的引用;
  • 第六,事件流源头的发布者,会调用下级订阅者的 receive<S>(subscription:) 方法,传入它封装好的 Subscription 对象;

至此,整个发布订阅模型的前半部分就说完了。在整个事件流发布和订阅的链条里,只有最末端的订阅者,是通过 Subscriber 表示的,而其余的环节里,维持链条完整的“中间订阅者”都是通过 Subscription 表示的。尽管从公开的代码和文档中没有并没有说明 Subscription 是一个 Subscriber,仅仅说了 Subscription is a protocol representing the connection of a subscriber to a publisher。但从要实现的结果,以及 PublisherSubscriber 各自的接口声明来看,要想维系这种关系,Subscription 的确需要有 Subscriber 的行为特征才行。

为了进一步巩固这个想法,我们继续来看发布订阅模型的后半部分,把这部分用一张图表示,是这样的:

这次,我们仅仅列出了原始的订阅者,直接订阅的 Publisher 封装的 Subscription,以及上游 Publisher 封装的 Subscription。这个过程紧接者刚才说过的“第六”。

  • 第七,上游 Publisher 封装了自己的 Subscription 对象之后,把下游 Publisher 封装的 Subscription 对象当成了一个订阅者,调用了它的 receive<S>(subscription:) 方法;
  • 第八,下游 Publisher 再调用原始订阅者的 receive<S>(subscription:),传递它自己的 Subscription 对象;
  • 第九,在原始订阅者的 receive<S>(subscription:) 方法里,调用传入的 Subscription 对象的 request(_ demand:) 方法,表明自己期望订阅到的事件数量;
  • 最后,Publisher 继续调用自己上游 Publisher 传入的 Subscriptionrequest(_ demand:) 方法,这样在整条事件传递链条上,最终到订阅者的,就只有特定数量的事件了;

以上,就是一个完整的 Combine 事件发送订阅模型。通过第二个图片可以看出,在 Combine 里,通过代码呈现的发布者和订阅者关系的背后,真正发挥作用的,其实是一个 Subscription 链条。说到这,也就不难理解为什么上一节最后一个测试用例中,sinkCsinkD 不能共享 scanB 中的事件了,因为对于这两个订阅者来说,scanB 的上游事件并不相同,把它们的订阅关系转换成 Subscription 对象的关系是这样的:

自定义 Subscription 实现

看到这,你可能会想,虽然从道理上整个逻辑能说得通,但 Combine 中真就如此么?毕竟这里有我们根据文档和方法签名推测出来的东西。而验证这个推测最好的方式,就是自定义一个 PublisherSubscription 对象,并把它们用在上一节的测试用例中,如果可以通过测试,关于整个模型的推测就得到印证了。

定义 Subscription “应有”的行为

首先,我们要自定义一个协议,让它拥有之前我们推测的各种行为。为此,在上一节创建的 framework 项目里,新建个 CustomSubscription.swift。在这里,按照我们对 Subscription “需求”定义一个协议:

public protocol SubscriptionBehavior
  : class, Cancellable, CustomCombineIdentifierConvertible {
  associatedtype Input
  associatedtype Failure: Error
  associatedtype Output
  associatedtype OutputFailure: Error

  var demand: Subscribers.Demand { get set }
  var upstream: Subscription? { get set }
  var downstream: AnySubscriber<Output, OutputFailure> { get }

  func request(_ d: Subscribers.Demand)
  func receive(_ input: Input) -> Subscribers.Demand
  func receive(completion: Subscribers.Completion<Failure>)
}

这里,之所以让 SubscriptionBehavior 是一个 class 才能遵守的协议,是因为 Swift 会自动为类生成 CustomCombineIdentifierConvertible 要求的 combineIdentifier 属性,省得我们自己去实现它了,用起来方便一些。

其次,来看关联类型和属性的部分:

  • InputFailure 是上游 Publisher 中的普通和错误事件类型;
  • OutputOutputFailure 是下游 Subscriber 的普通和错误事件类型;
  • demand 是下游 Subscriber 向上传递的要订阅的事件数量;
  • upstream 是上游 Publisher 下发的 Subscription 对象,由于不一定存在上游 Publisher,因此这是一个 optional;
  • downstream 是下游订阅者的引用;

最后,是接口的部分:

  • request 就是 Combine 中 Subscription 约束的 request 接口,它收集订阅者期望订阅的事件数量;
  • 剩下的两个 receive 方法,让 SubscriptionBehavior 具有 Subscriber 的行为特征,既可以让它接收到 Publisher 的普通和完成事件;

以上,就是我们推测的 Subscription 应该具备的全部行为特征了。然后,根据之前整理的事件发布订阅模型中 Subscription 的功能,我们来给这些接口添加默认实现。

首先是 request,它累计下游订阅者的订阅需求,并把这个需求传到上游发布者:

public extension SubscriptionBehavior {
  func request(_ d: Subscribers.Demand) {
    demand += d
    upstream?.request(demand)
  }
}

其次,是 Cancellable 约束的 cancel 方法,我们这个用于演示的 SubscriptionBehavior 无需做额外的操作,因此直接通知上游发布者就好了:

public extension SubscriptionBehavior {
  func cancel() {
    upstream?.cancel()
  }
}

第三,是把正常事件转发到订阅者的 receive 方法。它执行的逻辑就是 demand 有配额就调用订阅者的 receive 方法,并更新 demand,这里要特别注意 demand 的计算方法。它是和下游订阅者的 receive 返回值累计的。否则,就返回 Demand.none

public extension SubscriptionBehavior
  where Input == Output, Failure == OutputFailure {
  func receive(_ input: Input) -> Subscribers.Demand {
    if demand > 0 {
      let newDemand = downstream.receive(input)
      demand = newDemand + (demand - 1)

      return demand
    }

    return Subscribers.Demand.none
  }
}

最后,是接受完成事件的 receive 方法。由于我们不需要做额外的处理,这次,就直接把事件转发给下游订阅者就好了:

public extension SubscriptionBehavior
  where Failure == OutputFailure {
  func receive(completion: Subscribers.Completion<Failure>) {
    downstream.receive(completion: completion)
  }
}

至此,按照我们的推断,一个 Subscription 对象应有的行为,就都实现完了。这里,我们只是让 SubscriptionBehavior 看上去像一个 Subscription 而已,并没有给它一个正式的“名分”,至于为什么这样做,接着往下看就好了。

自定义 Subscription

接下来,利用 SubscriptionBehavior,我们自定义一个实现了 Subscription 的类型。为了线程安全,我们把 SubscriptionBehavior 的所有操作,都用一个互斥锁保护起来。还是在 CustomSubscription.swift 里,先来看这个类型的定义:

public struct CustomSubscription<Content: SubscriptionBehavior>
  : Subscriber, Subscription {
  public typealias Input = Content.Input
  public typealias Failure = Content.Failure

  public var combineIdentifier: CombineIdentifier {
    return content.combineIdentifier
  }

  let recursiveMutext = NSRecursiveLock()
  let content: Content

  init(behavior: Content) {
    self.content = behavior
  }
}

按照之前的推测,一个实现了 Subscription 的类型也应该是一个实现了 Subscriber 的类型。因此,CustomSubscription 是在我们的框架中,有正式名分的 Subscription 对象。而之所以要把 SubscriptionBehavior 放在泛型参数里,是为了让它专注于处理事件发布和订阅的具体逻辑。而 CustomSubscription 则专注于让这些方法保持线程安全性。如果你还有点迷糊,没关系,继续往下看就好了。

首先,我们来实现 CustomSubscriptionSubscription 的部分:

public func request(_ demand: Subscribers.Demand) {
  recursiveMutex.lock()
  defer { recursiveMutex.unlock() }
  content.request(demand)
}

public func cancel() {
  recursiveMutex.lock()
  content.cancel()
  recursiveMutex.unlock()
}

看到了吧,这就是我们说过的区分开执行逻辑和线程安全性的方式。接下来,我们再来实现 Subscriber 的部分。

public func receive(subscription upstream: Subscription) {
  recursiveMutext.lock()
  defer { recursiveMutext.unlock() }

  content.upstream = upstream
  content.downstream.receive(subscription: self)
}

通过这个 receive(subscription:) 的实现,我们就知道 Subscription 是如何持有上级 Subscription 引用,以及如何把自身对象传到下级订阅者的了。

最后,是接受普通和完成事件的 receive 方法。它们的实现都很简单,就是在 recursiveMutex 的保护下接收事件而已:

public func receive(_ input: Input) -> Subscribers.Demand {
  recursiveMutex.lock()
  defer { recursiveMutex.unlock() }
  return content.receive(input)
}

public func receive(completion: Subscribers.Completion<Failure>) {
  recursiveMutex.lock()
  defer { recursiveMutex.unlock() }
  content.receive(completion: completion)
}

What's next?

至此,按照我们的推测,一个实现了 Subscription 的类型该有的样子,就完成了。在继续之前,回顾下我们对 Combine 事件发布订阅模型的描述,以及 CustomSubscription 是如何与这个模型中对应的环节关联起来的。如果你确定已经没问题了,就可以进入下一节的内容了。

所有订阅均支持 12 期免息分期

¥ 59

按月订阅

一个月,观看并下载所有视频内容。初来泊学,这可能是个最好的开始。

开始订阅

¥ 512

按年订阅

一年的时间,让我们一起疯狂地狩猎知识吧。比按月订阅优惠 28%

开始订阅

¥ 1280

泊学终身会员

永久观看和下载所有泊学网站视频,并赠送 100 元商店优惠券。

我要加入
如需帮助,欢迎通过以下方式联系我们