希望你还记得,之前我们通过测试用例研究 Combine 事件发布订阅模型的时候,创建过一个叫做 Publishers.Scan 的对象对上游事件序列中的事件值进行累计。这一节,我们就通过自定义一个类似的CustomScan,来了解这类类型的工作原理。

自定义 CustomScan

同样作为一个 Publisher,其实实现的过程和上一节的 CustomSubject 是一样的。首先,根据它的订阅者接收事件的方式定义一个实现了 SubscriptionBehavior 的类型;其次,根据这个类型,实现 Publisher 约束的 receive<S>(subscriber: S) 就好了。

为此,在项目中,我们创建了一个 CustomScan.swift,在其中添加 CustomScan 的实现。先来看 CustomScan 的定义:

public class CustomScan<Upstream: Publisher, Output>: Publisher {
  public typealias Failure = Upstream.Failure
  let reducer: (Output, Upstream.Output) -> Output
  let upstream: Upstream
  let initialState: Output

  public init(upstream: Upstream,
    initialResult: Output,
    nextPartialResult: @escaping (Output, Upstream.Output) -> Output) {
    self.initialState = initialResult
    self.reducer = nextPartialResult
    self.upstream = upstream
  }
}

其中,泛型参数 Upstream 表示上游事件发布者的类型,Output 表示 CustomScan 自身转换出来的新事件类型。由于 CustomScan 不转换错误事件的类型,所以我们一开始,让 Failure 就是上游事件序列中的错误类型。接下来:

  • reducer 是累计事件值的计算方法;
  • upstream 是上游事件序列的引用,由于 CustomScan 要依赖于上游事件进行计算,因此这个属性不是一个 optional;
  • initialState 是计算的初始值;

在 memberwise init 方法里,逐个对这些属性初始化就好了。

自定义 SubscriptionBehavior

接下来,和上一节创建 CustomSubject 一样,我们也要给 CustomScan 自定义一个 CustomBehavior 对象。因为我们要在接收到上游事件之后,进行累加计算。还是先来看它的定义:

public class CustomScan<Upstream: Publisher, Output>: Publisher {
  class Behavior: SubscriptionBehavior {
    typealias Input = Upstream.Output
    var upstream: Subscription? = nil
    let downstream: AnySubscriber<Output, Upstream.Failure>
    var demand: Subscribers.Demand = .none
    let reducer: (Output, Upstream.Output) -> Output
    var state: Output

    init(downstream: AnySubscriber<Output, Upstream.Failure>,
      reducer: @escaping (Output, Upstream.Output) -> Output,
      initialState: Output) {
      self.downstream = downstream
      self.reducer = reducer
      self.state = initialState
    }

    func receive(_ input: Upstream.Output) -> Subscribers.Demand {
      state = reducer(state, input)
      return downstream.receive(state)
    }
  }
}

可以看到,除了“沿袭”自 SubscriptionBehavior 的内容之外,Behavior 还持有了 CustomScan 的计算方法和初始状态,通过这个定义,我们可以进一步领会在 Publisher 背后,Subscription 对象的功能。在我们的实现里,上游事件值的累计,是在递交给订阅者的时候,计算出来的。

实现 Publisher 约束的方法

有了 Behavior 之后,就可以实现 CustomScan 的剩余方法了。也就是 Publisher 约束的 receive<S>(subscriber:)

public func receive<S>(subscriber: S)
  where S: Subscriber, Failure == S.Failure, Output == S.Input {
  let downstream = AnySubscriber(subscriber)
  let behavior = Behavior(
    downstream: downstream, reducer: reducer, initialState: initialState)
  let subscription = CustomSubscription(behavior: behavior)
  upstream.subscribe(subscription)
}

这里,和 CustomSubject 唯一不同的就是,我们把包含了 CustomScan 事件转换逻辑的 Subscription 对象作为一个订阅者,订阅的上游事件发布者。每一个中间环节都像这样订阅订阅上游发布者,整个事件链就通过 Subscription 连接起来了。

通过单元测试验证模型

说到这里,用于验证 Combine 事件发布订阅模型需要的核心组件,就都完成了。
为了验证这个模型,我们基于之前写过的 testDeferredSubjects,创建一个新的测试用例:

func testCustomSubjects() {
  var subjects = [CustomSubject<Int, Never>]()
  let deferred = Deferred { () -> CustomSubject<Int, Never> in
    let request = CustomSubject<Int, Never>()
    subjects.append(request)

    return request
  }

  let scanB = CustomScan(
    upstream: deferred,
    initialResult: 10,
    nextPartialResult: +)

  var receivedC = [Subscribers.Event<Int, Never>]()
  let sinkC = Subscribers.Sink<Int, Never>(
    receiveCompletion: {
      receivedC.append(.complete($0))
    },
    receiveValue: {
      receivedC.append(.value($0))
    })

  var receivedD = [Subscribers.Event<Int, Never>]()
  let sinkD = Subscribers.Sink<Int, Never>(
    receiveCompletion: {
      receivedD.append(.complete($0))
    },
    receiveValue: {
      receivedD.append(.value($0))
    })

  scanB.subscribe(sinkC)
  subjects[0].send(sequence: 1...2, completion: nil)

  scanB.subscribe(sinkD) /// `sinkD` does not receive events in `subjects[0]`
  subjects[0].send(sequence: 3...4, completion: .finished)
  subjects[1].send(sequence: 1...4, completion: .finished)

  XCTAssertEqual(receivedC, [11, 13, 16, 20].asEvents(completion: .finished))
  XCTAssertEqual(receivedD, [11, 13, 16, 20].asEvents(completion: .finished))
}

这次,我们让 deferred 包含了两个 CustomSubject 对象。其余的部分,和 testDeferredSubjects 是完全相同的,如果一切顺利,测试的结果也应该和使用 PassthroughSubject 是一样的。不过,在开始自行测试之前,先来想个问题:为什么这个测试用例可以验证我们对模型的推测呢?

首先,CustomSubject 作为一个 Publisher,我们定义了它的 receive(subscriber:) 方法。但 sinkCsinkD 订阅它的时候,subscribe 方法仍旧是 Combine 提供的,如果订阅之后 receive(subscriber:) 被调用了,就验证了这个方法被调用的时机;

其次,在 receive(subscriber:) 的实现里,receive(subscription: Subscriptions.empty) 应该会导致封装着原始订阅者的 Subscription 中的 request 方法被调用,也就是我们在 CustomSubscription 中定义的 request(_ demand:) 方法;

最后,CustomSubscription 是一个值类型,这也就意味着在 receive<S>(subscriber:) 方法里,每个订阅者获得的,都是一份属于它自己的 CustomSubscription 对象,因此,sinkCsinkD 订阅到的结果,也应该是独立的;

把这些问题想清楚之后,验证是非常简单的,执行一下这个测试用例就好了,如果一切顺利,它应该是可以 Pass 的:

What's next?

以上,就是对 Combine 基础工作原理方面的研究,现在,对于 Combine 中的事件是如何发送的,如何订阅的,以及 PublisherSubscriberSubscription 这三个角色在整个模型中的角色,我们应该了解的比较清楚了。

接下来,要探索的,是关于“共享”的话题。就像之前在测试用例分析中说到的,每个订阅者独占的 Subscription 对象会导致事件在传递到不同订阅者的时候发生重复计算。但有些时候,我们并不希望如此。如何避免这种问题呢?如何实现类似“一发多收”的效果呢?如何能缓存要下发的事件呢?在 RxSwift 里,我们可以用 ConnectableObservable<E>。那么,在 Combine 里,如何实现类似的“共享”功能呢?

所有订阅均支持 12 期免息分期

¥ 59

按月订阅

一个月,观看并下载所有视频内容。初来泊学,这可能是个最好的开始。

开始订阅

¥ 512

按年订阅

一年的时间,让我们一起疯狂地狩猎知识吧。比按月订阅优惠 28%

开始订阅

¥ 1280

泊学终身会员

永久观看和下载所有泊学网站视频,并赠送 100 元商店优惠券。

我要加入
如需帮助,欢迎通过以下方式联系我们