接着上一节的话题,为了让事件的生成和订阅发生在不同的线程里,我们先在测试用例中使用 receive(on:)
,把订阅代码“调度”走:
func testReceiveOnImmediate() {
let e = expectation(description: "")
let subject = PassthroughSubject<Int, Never>()
var received = [Subscribers.Event<Int, Never>]()
let sink = subject.receive(on: ImmediateScheduler.shared)
.sink(receiveCompletion: {
received.append(.complete($0))
e.fulfill()
}, receiveValue: {
received.append(.value($0))
})
subject.send(1)
subject.send(completion: .finished)
wait(for: [e], timeout: 5.0)
XCTAssertEqual(received, [1].asEvents(completion: .finished))
sink.cancel()
}
现在,事件的发送在测试线程里(也就是主线程),事件的接收在 ImmediateScheduler
指定的线程里(在这个例子里,其实仍旧是主线程,我们只是从一个最简单的调度开始,稍后就会看到真正调度到其它线程的情况)。为了观察事件的订阅,我们使用了 expectation
,并在订阅到完成事件时,把它设置成 fulfill
状态。最后,使用 wait
等待 expectation
被满足。这样一来,received
中应该可以收集到来自主线程的事件 1 和 .finsihed
。
当然,这个测试用例更多地是在从想法上表达如何测试事件发送和订阅在不同线程中的情况。实际上,testReceiveOnImmediate
中没有任何异步执行的环节,整个过程的执行,仍旧是同步的。
一个会丢事件的订阅场景
接下来,我们把订阅代码真正调度到一个后台线程,来继续观察 received
的值:
func testReceiveOnFailure() {
let queue = DispatchQueue(label: "test")
let e = expectation(description: "")
let subject = PassthroughSubject<Int, Never>()
var received = [Subscribers.Event<Int, Never>]()
print(Thread.current)
let sink = subject.receive(on: queue)
.sink(receiveCompletion: {
print(Thread.current)
received.append(.complete($0))
e.fulfill()
}, receiveValue: {
received.append(.value($0))
})
subject.send(1)
subject.send(completion: .finished)
wait(for: [e], timeout: 5.0)
// The following assert will FAIL!
XCTAssertEqual(received, [1].asEvents(completion: .finished))
sink.cancel()
}
这里,唯一的变化,就是我们把创建订阅者的代码调度到了 queue
。但让人诧异的是,上面这个测试用例不仅无法通过测试,如果你尝试多执行几次,就连 received
中的值都可能发生变化。它有时可以收集到 .finished
事件,有时又是一个空数组。
如果我们把发送 .finished
事件的代码也调度到 queue
:
queue.async { subject.send(completion: .finished) }
received
中就可以稳定地收到 .finsihed
事件,但无论如何,它始终都无法接收到 subject
发送的事件 1。这就再一次印证了我们之前说过的一个现象:订阅了一个 Publisher
之后,不一定可以收到它的所有事件。事件有可能会被漏掉,在绝大多数情况下,这可绝对不是我们期望的结果。
为什么会如此呢?
一个便于观察 Publisher 的方法
根本的原因,就是当事件采用异步接收的模式时,demand
也会通过调度一个专门的 request(_ demand:)
方法来传递。而执行 subject.send(1)
的时候,传递 demand
的方法还没被调度执行,因此事件 1 就丢了。为了更好地观察这个现象,我们先创建一个辅助方法 debug
:
public extension Publisher {
func debug(
prefix: String = "",
function: String = #function,
line: Int = #line) -> Publishers.HandleEvents<Self> {
let pattern =
"\(prefix + (prefix.isEmpty ? "" : " "))\(function), line \(line)"
return handleEvents(receiveSubscription: {
Swift.print("\(pattern)subscription \($0)")
}, receiveOutput: {
Swift.print("\(pattern)output \($0)")
}, receiveCompletion: {
Swift.print("\(pattern)completion \($0)")
}, receiveCancel: {
Swift.print("\(pattern)cancelled")
}, receiveRequest: {
Swift.print("\(pattern)request \($0)")
})
}
}
它的作用,就是在 Publisher
发生各种事件的时候,在控制台打印一段文字。其中:
prefix
是打印信息的前缀,如果不为空,就在prefix
后面追加一个空格便于识别;function
和line
,它们使用了 Swift 内置的宏#function
和#line
,分别表示了调用debug
的函数名称和所在行数;
最后,debug
返回了一个 HandleEvents
对象,它允许我们为 Publisher
可能发生的各种事件,注册一个 closure。这个过程是通过 handleEvents
方法完成的,它的 5 个参数,分别表示 Publisher
收到来自上游的 Subscription
对象、发生普通事件、发生完成事件、取消订阅以及收到更新 demand
请求,这 5 种不同的事件。当这些事件发生的时候,我们在控制台打印了对应的消息。
重新观察之前的丢消息场景
有了 debug
之后,我们改造下之前的测试用例,给 subject
挂上 debug
方法,重新观察下丢消息的场景:
func testReceiveWithDebug() {
let subject = PassthroughSubject<Int, Never>()
var received = [Subscribers.Event<Int, Never>]()
print("Start...")
let cancellable = subject
.debug()
.receive(on: DispatchQueue.main)
.sink(receiveValue: { received.append(.value($0)) })
print("Phase 1...")
subject.send(1)
XCTAssertEqual(received, [].asEvents(completion: nil))
print("Phase 2...")
RunLoop.current.run(until: Date(timeIntervalSinceNow: 0.001))
XCTAssertEqual(received, [].asEvents(completion: nil))
print("Phase 3...")
subject.send(2)
XCTAssertEqual(received, [].asEvents(completion: nil))
print("Phase 4...")
RunLoop.current.run(until: Date(timeIntervalSinceNow: 0.001))
XCTAssertEqual(received, [2].asEvents(completion: nil))
cancellable.cancel()
}
这个观察的过程,分成 4 个阶段:
- 首先,
subject
生成事件 1 的时候,和之前一样,这个事件会丢掉,received
中仍旧是空的; - 其次,我们启动了主线程中的
RunLoop
,这时,被调度的订阅代码才会得以执行,就像之前说过的,一个专门的request(_ demand:)
会被执行,向subject
发送.unlimited
订阅请求。但此时,received
仍旧是空的; - 第三,
subject
继续生成了事件 2,此时,receiveValue
指定的 closure 会被放入RunLoop
等待调度; - 第四,只要重新让
RunLoop
执行一下,就可以订阅到第三步的事件了; - 最后,
cancel
方法会取消subject
,debug
应该也可以捕获到这个事件;
有了这番分析之后,我们只要执行这个测试用例,就会在控制台看到类似这样的结果:
Phase 1...
Phase 2...
testReceiveWithDebug(), line 502: request unlimited
Phase 3...
testReceiveWithDebug(), line 502: output 2
Phase 4...
testReceiveWithDebug(), line 502: cancelled
对着刚才的分析,就会发现,它们是完全一样的。说到这,也就彻底解释通了为什么在这种场景下会丢失事件的原因了。在之前泊学 App 的代码里,也有一段类似的实现:
HomeDataAPI(remoteUserSession: userSessionRepository.remoteUserSession())
.get()
.receive(on: RunLoop.main)
.sink(
receiveCompletion: {
/// Update UI...
},
receiveValue: {
/// Update UI...
})
.store(in: &disposables)
之前,由于获取到首页数据之后,要执行一些更新 UI 的工作,因此我把订阅的代码通过 receive(on:)
调度到了主线程,结果发现无论如何也订阅不到接收到的数据。调试这个问题花费了很多天的时间。现在,有了这些对于 Combine 订阅模型的研究,就不难理解究竟是为什么了。
妥善使用 receive(on:) 的方法
通过这些例子,我们不难相信,在当前这个版本的 Combine 里,receive(on:)
绝对是一个危险的东西。如果你想调度特定的代码执行,更好的方法应该是在订阅的部分里,明确调度你的代码,而不是依赖 receive(on:)
。如果你一定要如此,那么你几乎总是应该把它搭配上 buffer
操作符一起使用:
func testBufferedReceiveOn() {
let subject = PassthroughSubject<Int, Never>()
let e = expectation(description: "")
var received = [Subscribers.Event<Int, Never>]()
let cancellable = subject
.buffer(size: Int.max, prefetch: .byRequest, whenFull: .dropNewest)
.receive(on: DispatchQueue(label: "test"))
.sink(receiveCompletion: {
received.append(.complete($0))
e.fulfill()
}, receiveValue: {
received.append(.value($0))
})
subject.send(1)
subject.send(completion: .finished)
wait(for: [e], timeout: 5.0)
XCTAssertEqual(received, [1].asEvents(completion: .finished))
cancellable.cancel()
}
这样一来,即便订阅的代码会晚于事件生成的代码,因为有了缓存,我们还是可以订阅到所有的历史事件。但为了彻底理解这个过程,我们整整用了 11 小节的内容。估计你也和我一样会觉得从实用的角度来说不那么划算。总之,还是尽可能避免这种异步订阅的代码,自己明确去安排那些要在不同线程执行的代码更好。至少对不同的开发者来说,它们更好理解和维护。
What's next?
以上,就是由于泊学 App 开发过程中的一个 bug,而引发的我们对 Combine 原理研究的全部内容了。下一节开始,我们将回到泊学 App 的开发,开始实现内容浏览的首页。