接着上一节提出的问题,我们来实现一个叫做 BufferSubject 的类型,一方面,它发送事件的行为类似 PassthroughSubject,另一方面,它还支持在多个订阅者之间缓存事件。整体的实现套路,和之前实现 CustomSubject 基本上是一样的,分成四个部分:

  • 首先,实现表达缓存行为的类 Buffer
  • 其次,为 BufferSubject 定义它自己的 SubscriptionBehavior
  • 第三,为 BufferSubject 实现 Subject 约束的方法;
  • 第四,用单元测试验证预期的结果;

这一节,我们先来完成前两部分。

定义 Buffer

在项目目录中,新建一个 BufferSubject.swift。在这里,创建一个表达缓存行为的类型:

public struct Buffer<Output, Failure: Error> {
  var values: [Output] = []
  var completion: Subscribers.Completion<Failure>? = nil
  let limit: Int
  let strategy: Publishers.BufferingStrategy<Failure>

  public var isEmpty: Bool {
    return values.isEmpty && completion == nil
  }
}

其中:

  • values 表示缓存的普通事件数组;
  • completion 表示缓存的完成事件。由于 Buffer 只应该缓存一个完成事件,因此,我们把 valuescompletion 分开定义,而没有统一定义成一个 Event<Output, Failure> 数组;
  • limit 是缓存的上限,它应该是一个大于等于 0 的整数;
  • strategy 是 Combine 提供的缓存策略,我们一会再说;
  • isEmpty 用于判断缓存是否为空,它的依据除了 value 为空之外,还要判断 completion 是否为 nil

接下来,就是它的支持的接口了。我们先来看向缓存中添加事件的 push

public mutating func push(_ value: Output) {
  guard completion == nil else { return }
  guard values.count < limit else {
    switch strategy {
    case .dropNewest:
      values.removeLast()
      values.append(value)
    case .dropOldest:
      values.removeFirst()
      values.append(value)
    case .customError(let errFn):
      completion = .failure(errFn())
    @unknown default:
      fatalError()
    }

    return
  }

  values.append(value)
}

如果 completion 不为 nil 说明事件序列已经结束了,这时就不应该再缓存任何内容,直接返回。否则,判断下当前是否还可以缓存事件,如果可以,就直接把事件加入 values 末尾。否则,就说明缓存已经满了,这时,根据 strategy 支持的三种策略,我们要分别定义对应的行为:

  • .dropNewest 表示放弃掉队列中最新的,因此,我们就用 removeLast 去掉 values 中最后一个,再把新事件放到数组末尾;
  • .dropOldest 表示放弃掉队列中最旧的,因此,我们就用 removeFirst 去掉 values 中第一个,再把新事件放到数组末尾;
  • .customError 表示用户提供了一个当发生这种情况时生成错误的方法,我们就用这个方法封装一个 Completion<Failure> 事件,并把它作为缓冲区的完成事件;

在 Swift 5 之后,由于 BufferingStrategy<Failure> 是一个 non-frozen enum,为了更好的处理未来新增的缓存策略,我们使用了 @unknown default 处理未知的情况。它和 default 唯一的区别就是,@unknown default 在我们没有使用 case 过完所有已知的情况之前,会显示一个警告信息:

但如果只用 default 处理默认情况的话,就没有这个“福利”了。完成后,我们来看向 Buffer 中添加完成事件的方法:

public mutating func push(
  completion: Subscribers.Completion<Failure>) {
  guard self.completion == nil else { return }
  self.completion = completion
}

它的实现就简单多了,只要 Buffer 中还没有完成事件,就设置 self.completion。最后,是从 Buffer 中取出事件的方法 fetch

public mutating func fetch()
  -> Subscribers.Event<Output, Failure>? {
  if values.count > 0 {
    return .value(values.removeFirst())
  }
  else if let completion = self.completion {
    values = []
    self.completion = nil
    return .complete(completion)
  }

  return nil
}

如果缓冲区中还有事件,我们就把它包装在 Event.value 里返回。这里要注意的是,取出事件的顺序是从 values 的头元素开始的,因为向新的订阅者“回放”历史事件也要按照它们加入 Buffer 的顺序执行。

如果 values 空了,就检查 completion,如果不为 nil,就把 values 清空,completion 设置成 nil,并把之前缓存的完成事件封装成 Event.completion 返回。

最后,如果不是上述两种情况,Buffer 就没有任何内容可提供,我们就直接返回 nil。至此,这个提供事件缓存功能的 Buffer 就定义好了。

自定义 SubscriptionBehavior

接下来,我们基于 Buffer 实现 SubscriptionBehavior。继续在 BufferSubject.swift 里添加下面的代码:

public class BufferSubject<Output, Failure: Error>: Subject {
  class Behavior: SubscriptionBehavior {
    typealias Input = Output

    var upstream: Subscription? = nil
    let downstream: AnySubscriber<Input, Failure>
    let subject: BufferSubject<Output, Failure>
    var demand = Subscribers.Demand.none
    var buffer: Buffer<Output, Failure>

    init(subject: BufferSubject<Output, Failure>,
         downstream: AnySubscriber<Input, Failure>,
         buffer: Buffer<Output, Failure>){
      self.subject = subject
      self.downstream = downstream
      self.buffer = buffer
    }
  }

  /// ...
}

这里,BufferSubject 是下一节我们最终要实现的具备缓存功能的 Publisher。在 Behavior 的定义里:

  • 首先,缓存功能并不对事件值进行类型转换,因此,InputOuput 的类型仍旧是相同的;
  • 其次,在这个例子中,我们用不到 upstream,直接把它设置成 nil 就好;
  • 第三,downstream 是下游订阅者的引用;
  • 第四,subject 是这个 Behavior 服务的 Subject 对象,也就是我们将要定义的 BufferSubject
  • 第五,是可订阅的事件数量;
  • 最后,buffer 是要执行的缓存策略;

了解了它的属性之后,我们先来看 Behavior 实现的接口。首先,是 request(_ d: Subscribers.Demand)

func request(_ d: Subscribers.Demand) {
  demand += d

  while demand > 0, let next = buffer.fetch() {
    demand -= 1

    switch next {
    case .value(let value):
      let newDemand = downstream.receive(value)
      demand += newDemand
    case .complete(let completion):
      downstream.receive(completion: completion)
    }
  }
}

它的逻辑,就是只要缓冲区中还有事件,并且数量没超过订阅者发起的订阅请求数量,就不断向下游的订阅者发送缓存的事件。如果普通事件发送完了,就发送完成事件。通过它的实现,我们可以进一步理解 demand 的计算方法,它会根据每一次订阅者返回的 receive 方法的返回值进行调整。

其次,是 Behavior 自身作为订阅者,接收到普通事件的 receive 方法:

func receive(_ input: Output) -> Subscribers.Demand {
  if demand > 0 && buffer.isEmpty {
    let newDemand = downstream.receive(input)
    demand = newDemand + demand - 1
  }
  else {
    buffer.push(input)
  }

  return .unlimited
}

在它的实现里,如果,demand 大于 0,并且当前没有缓存任何事件。那我们也无需缓存收到的 input 事件,直接把它发送给下游订阅者,并更新 demand 就好了。否则,就表示我们还有历史事件没有发送完,这时,就通过 push 把事件保存进 buffer。另外,从它返回的 .unlimited 我们就知道,Behavior 会一直向上游 Publisher 订阅事件。

第三,是 Behavior 自身作为订阅者,接收到完成事件的 receive 方法。它的实现则简单一些,如果缓存为空就直接向下游订阅者发送完成事件,否则就让这个事件进缓存:

func receive(completion: Subscribers.Completion<Failure>) {
  if buffer.isEmpty {
    downstream.receive(completion: completion)
  }
  else {
    buffer.push(completion: completion)
  }
}

最后,是执行取消订阅的 cancel 方法:

func cancel() {
  subject.subscribers.mutate {
    $0.removeValue(forKey: self.combineIdentifier)
  }
}

这和之前我们实现 CustomSubject 时实现的 cancel 是一样的,通过 self.combineIdentifier,从 subscribers 中删掉申请发起 cancel 请求的订阅者就好了。

What's next?

至此,这个为 BufferSubject 服务的 Behavior 就完成了。现在,稍微休息一会儿,下一节,基于这个实现,我们完成 BufferSubject 的定义,并编写单元测试来验证它的功能。

所有订阅均支持 12 期免息分期

¥ 59

按月订阅

一个月,观看并下载所有视频内容。初来泊学,这可能是个最好的开始。

开始订阅

¥ 512

按年订阅

一年的时间,让我们一起疯狂地狩猎知识吧。比按月订阅优惠 28%

开始订阅

¥ 1280

泊学终身会员

永久观看和下载所有泊学网站视频,并赠送 100 元商店优惠券。

我要加入
如需帮助,欢迎通过以下方式联系我们