欢迎回来,基于上一节实现的 Behavior,我们完成 BufferSubject 剩余的部分,也就是 Subject 接口的实现。我们从 BufferSubject 的属性部分说起:

public class BufferSubject<Output, Failure: Error>: Subject {
  /// class Behavior ...
  typealias SubscriberRecords =
    Dictionary<CombineIdentifier, CustomSubscription<Behavior>>
  let subscribers = AtomicBox<SubscriberRecords>([:])
  let buffer: AtomicBox<Buffer<Output, Failure>>

  public init(limit: Int = 1,
    whenFull strategy: Publishers.BufferingStrategy<Failure> = .dropOldest) {
    precondition(limit >= 0)
    buffer = AtomicBox(Buffer(limit: limit, strategy: strategy))
  }
}

其中,subscribers 的定义,和 CustomSubject 是一样的。通过它的定义,我们可以进一步体会在 CustomSubscription 的实现里,把 Subscription 行为和线程安全性的代码隔离开的好处。另外一个属性 buffer 则是 BufferSubject 使用的缓冲区,对它的访问也需要是线程安全的。

接下来,是我们还不太清楚的 send(subscription:) 方法。老规矩,我们只是保持让 subscription 订阅事件:

public func send(subscription: Subscription) {
  subscription.request(.unlimited)
}

然后,是向订阅者发送普通事件的 send(_ value:) 方法。在它的实现里,我们做了两件事情。一个是把事件缓存,以便将来可以回放给后来的订阅者;另一个,是把事件发送给当前所有的订阅者:

public func send(_ value: Output) {
  buffer.mutate { b in
    b.push(value)
  }

  for (_, sub) in subscribers.value {
    _ = sub.receive(value)
  }
}

接下来,是向订阅者发送完成事件的 send(completion:) 方法。它的实现逻辑,和 send(_ value:) 几乎是一样的,只是发送完事件后,要清空 subscribers

public func send(completion: Subscribers.Completion<Failure>) {
  buffer.mutate { b in
    b.push(completion: completion)
  }

  for (_, sub) in subscribers.value {
    sub.receive(completion: completion)
  }

  subscribers.mutate { $0.removeAll() }
}

这样,BufferSubjectSubject 部分的接口就完成了。最后,我们来实现 Publisher 的部分:

public func receive<S>(subscriber: S)
  where S: Subscriber, Failure == S.Failure, Output == S.Input {
  let behavior = Behavior(
    subject: self,
    downstream: AnySubscriber(subscriber),
    buffer: buffer.value)

  let subscription = CustomSubscription(behavior: behavior)
  subscribers.mutate {
    $0[subscription.combineIdentifier] = subscription
  }

  subscription.receive(subscription: Subscriptions.empty)
}

通过它的实现,我们就能理解当订阅 BufferSubject 的时候,究竟发生了什么。首先,创建 Behavior 的时候,可以看到所有订阅者共享的都是同一个 buffer,这也就意味着,在缓冲区未满的情况下,先来的订阅者和后来的订阅者,应该可以收到同样的消息,这也就达到了之前我们期望的给新订阅者回放所有历史事件的目的。至于 receive<S>(subscriber: S) 后半部分的实现,则和 CustomSubject 是完全一样的,我们就不重复了。

至此,BufferSubject 也就全部完成了。

用单元测试验证结果

接下来,我们把测试 multicast 的测试用例改成下面这样,来测试下 BufferSubject

func testMulticastBuffer() {
  let subjectA = PassthroughSubject<Int, Never>()
  let multicastB = subjectA.scan(10, +)
    .multicast { BufferSubject(limit: Int.max) }
    .autoconnect()

  /// `sinkC`
  /// `sinkD`

  XCTAssertEqual(receivedC, [11, 13, 16, 20].asEvents(completion: .finished))
  XCTAssertEqual(receivedD, [11, 13, 16, 20].asEvents(completion: .finished))

  sinkC.cancel()
  sinkD.cancel()
}

我们只是把 multicast closure 中返回的 PassthroughSubject 替换成了 BufferSubject。订阅的过程以及事件发送的顺序,则和之前是完全一样的。现在,当 sinkD 订阅 multicastB 的时候,它就可以订阅到之前 sinkC 订阅到的所有事件了。从最后 XCTAssertEqual 判断的条件也可以看到,receivedCreceivedD 是相等的。

What's next?

说到这里,如果你回头看看之前所有使用了 multicast 方法的测试用例就不难发现,为了让 multicastB 生成事件,我们都使用了 autoconnect() 方法。如果换成手动连接,也可以写成这样:

func testMulticastLatest() {
  let subjectA = PassthroughSubject<Int, Never>()
  let multicastB = subjectA.scan(10, +)
    .multicast { CurrentValueSubject(0) }
  let cancelB = multicastB.connect()

  /// The same as before...

  cancelB.cancel()
}

当然,这里是否调用 cancel 并不影响我们的单元测试结果。但如果忽略 connect() 的返回值:

_ = multicastB.connect()

sinkCsinkD 就无法订阅到任何事件了。这也和我们预期中的行为有所差别。凭着直觉,connect 之后,multicastB 就应该开始产生了事件了。究竟是什么导致了这个现象呢?为了搞清楚这个问题,下一节,我们来聊聊和 Combine 中 Subject 生命周期相关的话题。

所有订阅均支持 12 期免息分期

¥ 59

按月订阅

一个月,观看并下载所有视频内容。初来泊学,这可能是个最好的开始。

开始订阅

¥ 512

按年订阅

一年的时间,让我们一起疯狂地狩猎知识吧。比按月订阅优惠 28%

开始订阅

¥ 1280

泊学终身会员

永久观看和下载所有泊学网站视频,并赠送 100 元商店优惠券。

我要加入
如需帮助,欢迎通过以下方式联系我们