接着上一节的话题,为了让事件的生成和订阅发生在不同的线程里,我们先在测试用例中使用 receive(on:),把订阅代码“调度”走:

func testReceiveOnImmediate() {
  let e = expectation(description: "")
  let subject = PassthroughSubject<Int, Never>()
  var received = [Subscribers.Event<Int, Never>]()

  let sink = subject.receive(on: ImmediateScheduler.shared)
    .sink(receiveCompletion: {
      received.append(.complete($0))
      e.fulfill()
    }, receiveValue: {
      received.append(.value($0))
    })

  subject.send(1)
  subject.send(completion: .finished)

  wait(for: [e], timeout: 5.0)

  XCTAssertEqual(received, [1].asEvents(completion: .finished))

  sink.cancel()
}

现在,事件的发送在测试线程里(也就是主线程),事件的接收在 ImmediateScheduler 指定的线程里(在这个例子里,其实仍旧是主线程,我们只是从一个最简单的调度开始,稍后就会看到真正调度到其它线程的情况)。为了观察事件的订阅,我们使用了 expectation,并在订阅到完成事件时,把它设置成 fulfill 状态。最后,使用 wait 等待 expectation 被满足。这样一来,received 中应该可以收集到来自主线程的事件 1 和 .finsihed

当然,这个测试用例更多地是在从想法上表达如何测试事件发送和订阅在不同线程中的情况。实际上,testReceiveOnImmediate 中没有任何异步执行的环节,整个过程的执行,仍旧是同步的。

一个会丢事件的订阅场景

接下来,我们把订阅代码真正调度到一个后台线程,来继续观察 received 的值:

func testReceiveOnFailure() {
  let queue = DispatchQueue(label: "test")
  let e = expectation(description: "")
  let subject = PassthroughSubject<Int, Never>()
  var received = [Subscribers.Event<Int, Never>]()
  print(Thread.current)
  let sink = subject.receive(on: queue)
    .sink(receiveCompletion: {
      print(Thread.current)
      received.append(.complete($0))
      e.fulfill()
    }, receiveValue: {
      received.append(.value($0))
    })

  subject.send(1)
  subject.send(completion: .finished)

  wait(for: [e], timeout: 5.0)
  // The following assert will FAIL!
  XCTAssertEqual(received, [1].asEvents(completion: .finished))

  sink.cancel()
}

这里,唯一的变化,就是我们把创建订阅者的代码调度到了 queue。但让人诧异的是,上面这个测试用例不仅无法通过测试,如果你尝试多执行几次,就连 received 中的值都可能发生变化。它有时可以收集到 .finished 事件,有时又是一个空数组。

如果我们把发送 .finished 事件的代码也调度到 queue

queue.async { subject.send(completion: .finished) }

received 中就可以稳定地收到 .finsihed 事件,但无论如何,它始终都无法接收到 subject 发送的事件 1。这就再一次印证了我们之前说过的一个现象:订阅了一个 Publisher 之后,不一定可以收到它的所有事件。事件有可能会被漏掉,在绝大多数情况下,这可绝对不是我们期望的结果。

为什么会如此呢?

一个便于观察 Publisher 的方法

根本的原因,就是当事件采用异步接收的模式时,demand 也会通过调度一个专门的 request(_ demand:) 方法来传递。而执行 subject.send(1) 的时候,传递 demand 的方法还没被调度执行,因此事件 1 就丢了。为了更好地观察这个现象,我们先创建一个辅助方法 debug

public extension Publisher {
  func debug(
    prefix: String = "",
    function: String = #function,
    line: Int = #line) -> Publishers.HandleEvents<Self> {
    let pattern =
      "\(prefix + (prefix.isEmpty ? "" : " "))\(function), line \(line)"

    return handleEvents(receiveSubscription: {
      Swift.print("\(pattern)subscription \($0)")
    }, receiveOutput: {
      Swift.print("\(pattern)output \($0)")
    }, receiveCompletion: {
      Swift.print("\(pattern)completion \($0)")
    }, receiveCancel: {
      Swift.print("\(pattern)cancelled")
    }, receiveRequest: {
      Swift.print("\(pattern)request \($0)")
    })
  }
}

它的作用,就是在 Publisher 发生各种事件的时候,在控制台打印一段文字。其中:

  • prefix 是打印信息的前缀,如果不为空,就在 prefix 后面追加一个空格便于识别;
  • functionline,它们使用了 Swift 内置的宏 #function#line,分别表示了调用 debug 的函数名称和所在行数;

最后,debug 返回了一个 HandleEvents 对象,它允许我们为 Publisher 可能发生的各种事件,注册一个 closure。这个过程是通过 handleEvents 方法完成的,它的 5 个参数,分别表示 Publisher 收到来自上游的 Subscription 对象、发生普通事件、发生完成事件、取消订阅以及收到更新 demand 请求,这 5 种不同的事件。当这些事件发生的时候,我们在控制台打印了对应的消息。

重新观察之前的丢消息场景

有了 debug 之后,我们改造下之前的测试用例,给 subject 挂上 debug 方法,重新观察下丢消息的场景:

func testReceiveWithDebug() {
  let subject = PassthroughSubject<Int, Never>()
  var received = [Subscribers.Event<Int, Never>]()

  print("Start...")
  let cancellable = subject
    .debug()
    .receive(on: DispatchQueue.main)
    .sink(receiveValue: { received.append(.value($0)) })

  print("Phase 1...")
  subject.send(1)
  XCTAssertEqual(received, [].asEvents(completion: nil))

  print("Phase 2...")
  RunLoop.current.run(until: Date(timeIntervalSinceNow: 0.001))
  XCTAssertEqual(received, [].asEvents(completion: nil))

  print("Phase 3...")
  subject.send(2)
  XCTAssertEqual(received, [].asEvents(completion: nil))

  print("Phase 4...")
  RunLoop.current.run(until: Date(timeIntervalSinceNow: 0.001))
  XCTAssertEqual(received, [2].asEvents(completion: nil))

  cancellable.cancel()
}

这个观察的过程,分成 4 个阶段:

  • 首先,subject 生成事件 1 的时候,和之前一样,这个事件会丢掉,received 中仍旧是空的;
  • 其次,我们启动了主线程中的 RunLoop,这时,被调度的订阅代码才会得以执行,就像之前说过的,一个专门的 request(_ demand:) 会被执行,向 subject 发送 .unlimited 订阅请求。但此时,received 仍旧是空的;
  • 第三,subject 继续生成了事件 2,此时,receiveValue 指定的 closure 会被放入 RunLoop 等待调度;
  • 第四,只要重新让 RunLoop 执行一下,就可以订阅到第三步的事件了;
  • 最后,cancel 方法会取消 subjectdebug 应该也可以捕获到这个事件;

有了这番分析之后,我们只要执行这个测试用例,就会在控制台看到类似这样的结果:

Phase 1...
Phase 2...
testReceiveWithDebug(), line 502: request unlimited
Phase 3...
testReceiveWithDebug(), line 502: output 2
Phase 4...
testReceiveWithDebug(), line 502: cancelled

对着刚才的分析,就会发现,它们是完全一样的。说到这,也就彻底解释通了为什么在这种场景下会丢失事件的原因了。在之前泊学 App 的代码里,也有一段类似的实现:

HomeDataAPI(remoteUserSession: userSessionRepository.remoteUserSession())
  .get()
  .receive(on: RunLoop.main)
  .sink(
    receiveCompletion: {
      /// Update UI...
    },
    receiveValue: {
      /// Update UI...
    })
  .store(in: &disposables)

之前,由于获取到首页数据之后,要执行一些更新 UI 的工作,因此我把订阅的代码通过 receive(on:) 调度到了主线程,结果发现无论如何也订阅不到接收到的数据。调试这个问题花费了很多天的时间。现在,有了这些对于 Combine 订阅模型的研究,就不难理解究竟是为什么了。

妥善使用 receive(on:) 的方法

通过这些例子,我们不难相信,在当前这个版本的 Combine 里,receive(on:) 绝对是一个危险的东西。如果你想调度特定的代码执行,更好的方法应该是在订阅的部分里,明确调度你的代码,而不是依赖 receive(on:)。如果你一定要如此,那么你几乎总是应该把它搭配上 buffer 操作符一起使用:

func testBufferedReceiveOn() {
  let subject = PassthroughSubject<Int, Never>()
  let e = expectation(description: "")
  var received = [Subscribers.Event<Int, Never>]()

  let cancellable = subject
    .buffer(size: Int.max, prefetch: .byRequest, whenFull: .dropNewest)
    .receive(on: DispatchQueue(label: "test"))
    .sink(receiveCompletion: {
      received.append(.complete($0))
      e.fulfill()
    }, receiveValue: {
      received.append(.value($0))
    })

  subject.send(1)
  subject.send(completion: .finished)

  wait(for: [e], timeout: 5.0)
  XCTAssertEqual(received, [1].asEvents(completion: .finished))

  cancellable.cancel()
}

这样一来,即便订阅的代码会晚于事件生成的代码,因为有了缓存,我们还是可以订阅到所有的历史事件。但为了彻底理解这个过程,我们整整用了 11 小节的内容。估计你也和我一样会觉得从实用的角度来说不那么划算。总之,还是尽可能避免这种异步订阅的代码,自己明确去安排那些要在不同线程执行的代码更好。至少对不同的开发者来说,它们更好理解和维护。

What's next?

以上,就是由于泊学 App 开发过程中的一个 bug,而引发的我们对 Combine 原理研究的全部内容了。下一节开始,我们将回到泊学 App 的开发,开始实现内容浏览的首页。

所有订阅均支持 12 期免息分期

¥ 59

按月订阅

一个月,观看并下载所有视频内容。初来泊学,这可能是个最好的开始。

开始订阅

¥ 512

按年订阅

一年的时间,让我们一起疯狂地狩猎知识吧。比按月订阅优惠 28%

开始订阅

¥ 1280

泊学终身会员

永久观看和下载所有泊学网站视频,并赠送 100 元商店优惠券。

我要加入
如需帮助,欢迎通过以下方式联系我们