在开始接下来的泊学 App 开发之前,我决定先插一段关于 Combine 的研究。对于我们绝大多数人而言,可能都是在遵循着 RxSwift 的经验在理解和使用 Combine。但通过修改泊学 App 的一个 bug,让我发现了当前版本的 Combine 和 RxSwift 一些明显的不同。由于接下来我们会频繁和 Combine 打交道,因此还是非常有必要深入了解下这个 Apple 新发布的响应式编程框架。

Combine 中的发布者和订阅者

而我们的探索过程,就从理解事件发布和订阅,这个最基本的模型开始。在 Combine 里,表达事件发布概念的是一个叫做 Publisher 的协议,经验上,这应该就是一个可以用不同方式不断产生值的对象。但事实上,并不如此。因为它并没有约束任何和“发送事件”这个概念有关的接口。它唯一约束的,就是一个接受“订阅者”的方法:

protocol Publisher {
  associatedtype Output
  associatedtype Failure : Error

  func receive<S>(subscriber: S) where
    S : Subscriber,
    Self.Failure == S.Failure, Self.Output == S.Input
}

这是 Combine 中第一个和我们响应式编程过往经验不同的地方,Publisher 仅仅是某种可以接受订阅者的对象。那么,在 Publisher 的定义里,那个表达订阅者的 Subscriber 又是什么呢?它约束了一组接收各种不同类型事件的接口:

protocol Subscriber : CustomCombineIdentifierConvertible {
  associatedtype Input
  associatedtype Failure : Error

  func receive(_ input: Self.Input) -> Subscribers.Demand
  func receive(completion: Subscribers.Completion<Self.Failure>)
  func receive(subscription: Subscription)
}

从功能上说,receive(_ input:) 负责从 Publisher 接收事件值。receive(completion:) 负责从 Publisher 接收完成事件。至于 receive(subscription:) 我们一会儿再说。

看到这,我们不难猜想,既然 Publisher 只有接受订阅者的接口,而 Subscriber 又约束了接收事件的接口,Combine 中的事件流最有可能的实现方式,就是发布者直接以“点对点”的形式把事件发送到订阅者。而这个过程,应该和最后的 receive(subscription:) 方法有着密切的联系。

观察 Combine 的事件订阅模型

了解了这两个类型之后,我们再从订阅 Publisher 的结果进一步观察下 Combine 中发布和订阅模型。

准备工作

首先,做一些准备工作。稍后,我们会自己实现一些概念版的 Combine 基础组件。为此,我们可以创建一个 Mac OS framework 项目,叫做 CustomCombine,并让它包含单元测试。在这个项目里,添加一个 Subject+Send.swift,在这里给 Subject 添加一个扩展,它可以把一个序列的元素,变成 Subject 发送的一系列事件。这个方法主要用于方便我们在单元测试中生成测试事件:

public extension Subject {
  func send<S: Sequence>(
    sequence: S,
    completion: Subscribers.Completion<Self.Failure>? = nil)
    where S.Element == Self.Output {
    for v in sequence {
      send(v)
    }

    if completion != nil {
      send(completion: completion!)
    }
  }
}

接下来,再创建一个 Subscribers+Event.swift。在这里,添加一个 enum Event,它是 Combine 中“事件”这个概念的封装,把订阅者可能接收的事件,统一成了一个类型。之所以要这样做,也是为了稍后在测试用例中,方便观察和比较订阅者接收到的事件。

public extension Subscribers {
  enum Event<Value, Failure: Error> {
    case value(Value)
    case complete(Subscribers.Completion<Failure>)
  }
}

extension Subscribers.Event: Equatable
  where Value: Equatable, Failure: Equatable {}

然后,在这个文件里,给 Sequence 添加一个扩展方法 asEvents,让它可以把 Sequence 中的元素类型,转换成 Event 数组,我们用这个方法,生成单元测试的期望结果:

public extension Sequence {
  func asEvents<Failure>(
    failure: Failure.Type,
    completion: Subscribers.Completion<Failure>? = nil)
    -> [Subscribers.Event<Element, Failure>] {
    let values = map(Subscribers.Event<Element, Failure>.value)
    guard let completion = completion else { return values }
    return values + [Subscribers.Event<Element, Failure>.complete(completion)]
  }
}

这里,一个很有意思的用法就是,我们可以把 enum 中带有关联值的 case 直接作为参数传递给 map,编译器会把这个 case 认为是 (Value) -> EnumType 这种类型的函数,在我们的例子中,也就是 (Value) -> Event<Value, Failure>。有了这个方法之后,再重载一个错误事件是 .Never 的版本。因为在单元测试中,我们主要观察的是整个事件流,而错误的类型通常并不重要。

func asEvents(completion: Subscribers.Completion<Never>? = nil)
  -> [Subscribers.Event<Element, Never>] {
  return asEvents(failure: Never.self, completion: completion)
}

有了这些方法之后,为这一节内容做的准备工作就完成了。

观察一个最基础的发布订阅

接下来,我们从观察一个最基本的发布订阅模型开始:

为了表达这个关系,在 CustomCombineTest.swift 里,创建一个测试用例:

func testSubjectSink() {
  let subject = PassthroughSubject<Int, Never>()
  var received = [Subscribers.Event<Int, Never>]()
  let sink = Subscribers.Sink<Int, Never>(
    receiveCompletion: {
      received.append(.complete($0))
    },
    receiveValue: {
      received.append(.value($0))
    })

  subject.subscribe(sink)
  subject.send(sequence: 1...3, completion: .finished)

  XCTAssertEqual(received, (1...3).asEvents(completion: .finished))
}

这里,事件发布者是 subject,订阅者是 sink。在 subject.subscribe(sink) 之后,我们通过 subject 发布了 1,2,3,.finished 事件(这里用的就是之前给 Sequence 添加的扩展方法)。显然,sink 也应该全数收到这些事件,而这,就是我们在 XCTAssertEqual 中判断的依据。

并且,即便我们像下面这样修改上游事件流,整个事件的发布订阅模型也不会受到影响:

这种关系,在 Combine 里可以用一个 transformer 表示:

func testScan() {
  let subjectA = PassthroughSubject<Int, Never>()
  let scanB = Publishers.Scan(
    upstream: subjectA,
    initialResult: 10, nextPartialResult: +)

  var received = [Subscribers.Event<Int, Never>]()
  let sink = Subscribers.Sink<Int, Never>(
    receiveCompletion: {
      received.append(.complete($0))
    },
    receiveValue: {
      received.append(.value($0))
    })

  scanB.subscribe(sink)
  subjectA.send(sequence: 1...3, completion: .finished)

  XCTAssertEqual(received, [11, 13, 16].asEvents(completion: .finished))
}

这里,我们用 Scan 加工了原始数据流,并订阅了加工后的结果。当 subjectA 生成了原始事件之后,我们还是可以全数接受到所有加工后的事件。

至此,“订阅了一个 Publisher 之后,就可以订阅到它的所有事件”这个结论,仍旧是成立的。

半路杀出来个订阅者又如何呢

接下来,我们把订阅模型变成下面这样,在 Pub1 的生命周期里,又多出来一个订阅者会如何呢?

对于这种情况,Sub2 可以订阅到的内容,响应式框架通常会给我们一些选择,例如:

  • 订阅到所有缓存的历史事件;
  • 订阅到最近的一次事件;
  • 只订阅下一次生成的事件;
  • 重新触发整个事件订阅流程;

在 Combine 里,默认使用的就是最后一种方式,看似也最不容易理解。我们用下面这个测试用例表达这个模型:

func testSequenceABCD() {
  let subjectA = PassthroughSubject<Int, Never>()
  let scanB = Publishers.Scan(
    upstream: subjectA, initialResult: 10, nextPartialResult: +)

  var receivedC = [Subscribers.Event<Int, Never>]()
  let sinkC = Subscribers.Sink<Int, Never>(
    receiveCompletion: {
      receivedC.append(.complete($0))
    },
    receiveValue: {
      receivedC.append(.value($0))
    })

  var receivedD = [Subscribers.Event<Int, Never>]()
  let sinkD = Subscribers.Sink<Int, Never>(
    receiveCompletion: {
      receivedD.append(.complete($0))
    },
    receiveValue: {
      receivedD.append(.value($0))
    })

  scanB.subscribe(sinkC)
  scanB.subscribe(sinkD)
  subjectA.send(sequence: 1...3, completion: .finished)

  XCTAssertEqual(receivedC, [11, 13, 16].asEvents(completion: .finished))
  XCTAssertEqual(receivedD, [11, 13, 16].asEvents(completion: .finished))
}

这次,我们给 scanB 添加了两个订阅者 sinkCsinkD。所谓“重新触发整个事件订阅流程”就是指,当 sinkD 订阅 scanB 时,会驱使 scanB 重新对 subjectA 中的所有元素进行一次计算,并把结果发送到 sinkD,而不是 sinkC 消费了 scanB 的所有事件之后,sinkD 就订阅不到事件了。

因此,“订阅了一个 Publisher 之后,就可以订阅到它的所有事件”这个结论,仍旧是成立的。

考虑上时间维度之后呢

当然你可能会觉得,上面的例子不太有说服力,毕竟,sinkCsinkD 是处在同一个时间点订阅的。因此,我们很难观察到所谓“半路杀出来的订阅者”真实的订阅行为。

emm...没错,我完全认同。而当我们把这两个订阅的时间错开,得到的结论,也让我们可以发现一些有意思的结论。为了演示这个过程,我们创建一个 Deferred 对象,并用 Scan 加工其中的事件:

func testDeferredSubjects() {
  var subjects = [PassthroughSubject<Int, Never>]()
  let deferred = Deferred { () -> PassthroughSubject<Int, Never> in
    let request = PassthroughSubject<Int, Never>()
    subjects.append(request)

    return request
  }

  let scanB = Publishers.Scan(
    upstream: deferred, initialResult: 10, nextPartialResult: +)
}

这里,我们用 subjects 保存了 Deferred 中生成的 PassthroughSubject 对象的引用。这是为了稍后方便在其中生成事件。接下来,还是创建两个订阅者 sinkCsinkD,这部分和上个测试是一样的:

func testDeferredSubjects() {
  /// ...

  var receivedC = [Subscribers.Event<Int, Never>]()
  let sinkC = Subscribers.Sink<Int, Never>(
    receiveCompletion: {
      receivedC.append(.complete($0))
    },
    receiveValue: {
      receivedC.append(.value($0))
    })

  var receivedD = [Subscribers.Event<Int, Never>]()
  let sinkD = Subscribers.Sink<Int, Never>(
    receiveCompletion: {
      receivedD.append(.complete($0))
    },
    receiveValue: {
      receivedD.append(.value($0))
    })
}

最后,我们先让 sinkC 订阅 scanB,并让 scanB 开始生成事件。然后,再让 sinkD 也加入订阅,并通过 subject[0]subject[1] 继续让 scanB 生成事件:

scanB.subscribe(sinkC)
subjects[0].send(sequence: 1...2, completion: nil)

scanB.subscribe(sinkD)
subjects[0].send(sequence: 3...4, completion: .finished)
subjects[1].send(sequence: 1...4, completion: .finished)

现在,sinkD 订阅了 scanB 之后,它会订阅到通过 subjects[0] 生成的后续事件么?说到这,事情就有点儿矛盾了。按照我们之前说过的“订阅了一个 Publisher 之后,就可以订阅到它的所有事件”这个结论,显然 sinkD 应该可以订阅到 subjects[0] 生成的后续 16,20,.finsihed 事件。但如果这样,它还会订阅到通过 subjects[1] 生成的 11,13,16,20,.finished 事件,一个订阅者可以订阅到两次 .finished 事件显然是不正确的。

而正确的结果是,sinkD 只能订阅到 subject[1] 通过 scanB 生成的事件:

XCTAssertEqual(receivedC, [11, 13, 16, 20].asEvents(completion: .finished))
XCTAssertEqual(receivedD, [11, 13, 16, 20].asEvents(completion: .finished))

What's next?

当然,我们的确是通过一些小伎俩刻意放大了我们想达到的效果,即:订阅同一个 Publisher 不一定可以订阅到它后续的所有事件。而我们之前用图片表达的发布者和订阅者的关系,也不一定真实的反应了事件的传递路径,在它们背后,还有着我们没有发觉的内容。而继续探寻的线索,就是 Subscriber 协议中的 func receive(subscription: Subscription) 方法。除了 PublisherSubscriber 之外,这个 Subscription 是做什么的呢?

© Boxue 2020. All rights reserved.
Designed with by 10 and 11. Proudly hosted with Aliyun
所有订阅均支持 12 期免息分期

¥ 59

按月订阅

一个月,观看并下载所有视频内容。初来泊学,这可能是个最好的开始。

开始订阅

¥ 512

按年订阅

一年的时间,让我们一起疯狂地狩猎知识吧。比按月订阅优惠 28%

开始订阅

¥ 1280

泊学终身会员

永久观看和下载所有泊学网站视频,并赠送 100 元商店优惠券。

我要加入
如需帮助,欢迎通过以下方式联系我们