理解了 Combine 的事件发布订阅模型之后,这一节我们对 CustomSubject 的实现做一些改进,让它支持在多线程环境中使用。

AtomicBox

为了给要访问的对象添加访问保护,在 Swift 中一个常见的做法就是创建一个“箱子”类型,并把要保护的内容作为箱子里的“内容”,这样做的一个好处就是保护逻辑和内容的访问是分开的。为此,在我们的项目中,创建一个 AtomicBox.swift,在其中添加一个 AtomicBox 类型:

public final class AtomicBox<T> {
  @usableFromInline var mutex = os_unfair_lock()
  @usableFromInline var unboxed: T

  public init(_ unboxed: T) {
    self.unboxed = unboxed
  }
}

其中:

  • mutextAtomicBox 内部使用的用于线程同步的 mutex 对象;
  • unboxedAtomicBox 中的“内容”,也就是我们要保护的对象;

这里,之所以要把它们用 @usableFromInline 修饰,是为了让它们成为这个模块二进制接口的一部分。为了性能方面的考虑,AtomicBox 的一部分公开接口会用内联的方式实现,为了在这些接口中使用这些属性,我们应该使用 @usableFromInline 修饰它们。

接下来,是 AtomicBox 中的读写接口。读取数据是通过 value 属性提供的。上锁成功后,返回 unboxed 就好了:

@inlinable public var value: T {
  get {
    os_unfair_lock_lock(&mutex)
    defer { os_unfair_lock_unlock(&mutex) }

    return unboxed
  }
}

修改 unboxed 的操作,是通过 mutate 方法提供的:

@discardableResult @inlinable
public func mutate<U>(_ fn: (inout T) throws -> U) rethrows -> U {
  os_unfair_lock_lock(&mutex)
  defer { os_unfair_lock_unlock(&mutex) }

  return try fn(&unboxed)
}

这里,我们实现的逻辑并不是直接对 unboxed 赋值。而是把修改 unboxed 的过程封装成了一个函数 fn。其中,修改 unboxed 的部分用 inout T 表示。这个方法还可以抛出错误,并且返回一个和 T 不同的类型。

当然,错误也好,不同的返回值类型也好,这些都是修改 unboxed 这个动作的某种副作用,并不是指一定会用到它们。因此,我们用 @discardableResult 修饰了 mutate 的返回值,这样,编译器就不会产生未使用返回值的警告了。

最后一个要实现的接口,是判断 AtomicBox 保护的内容是否正在被修改,我们用 os_unfair_lock_trylock 方法尝试上锁,如果锁上了,就表示 unboxed 没人在用,否则,就表示 unboxed 正在被修改:

/// A computed property that has conditional statement should not be
/// marked as `@inlinable`
public var isMutating: Bool {
  if os_unfair_lock_trylock(&mutex) {
    os_unfair_lock_unlock(&mutex)
    return false
  }

  return true
}

要注意的是,我们并没有使用 @inlinable修饰 isMutating,这是因为它的实现里包含了 if。通常,我们不对这种带有分之语句的方法使用内联。这样,一个简单的同步读写类就完成了。

CustomSuject 的线程安全性

接下来,我们用它保护 CustomSubject 中的资源,也就是它的 subscribers 属性。为了尽可能减少对这个属性访问的修改,我们可以先基于 AtomicBox 创建一个 property wrapper。

为此,继续在 AtomicBox.swift 里,添加下面的代码:

@propertyWrapper
public class Atomic<T> {
  var boxed: AtomicBox<T>
  init(content: T) { self.boxed = AtomicBox<T>(content) }

  public var wrappedValue: T {
    get {
      return boxed.value
    }
  }

  public var projectedValue: AtomicBox<T> {
    get {
      return boxed
    }
  }
}

其实,就是 AtomicBox 的一层包装,很简单,我们就不多说了。然后,我们就可以用它修饰 CustomSubjectsubscribers 的定义了:

@Atomic<SubscriberRecords>(content: [:]) var subscribers

这样,所有读取 subscribers 的代码都无需变动,只需要处理一下修改 subscribers 的代码就好了。例如,要把 cancel 改成这样:

func cancel() {
  subject.$subscribers.mutate {
    $0.removeValue(forKey: self.combineIdentifier)
  }
}

也就是说,所有之前直接修改 subscribers 的地方,现在要通过 $subscriber 先得到内部的 AtomicBox 对象,再把修改的逻辑放到 mutate 方法的 closure 里。受到影响的代码,还有 send(completion:)receive<S>(subscriber:)

public func send(completion: Subscribers.Completion<Failure>) {
  for (_, sub) in subscribers {
    _ = sub.receive(completion: completion)
  }

  $subscribers.mutate { $0.removeAll() }
}

// Publisher
public func receive<S>(subscriber: S)
  where S : Subscriber, Failure == S.Failure, Output == S.Input {
  let behavior = Behavior(
    subject: self, downstream: AnySubscriber(subscriber))
  let subscription = CustomSubscription(behavior: behavior)

  $subscribers.mutate {
    $0[subscription.combineIdentifier] = subscription
  }

  subscription.receive(subscription: Subscriptions.empty)
}

What's next?

现在,CustomSubject 就可以在多线程环境中安全的使用了。下一节,我们继续讨论之前留下的话题:如何在 Combine 中实现共享订阅的话题。

所有订阅均支持 12 期免息分期

¥ 59

按月订阅

一个月,观看并下载所有视频内容。初来泊学,这可能是个最好的开始。

开始订阅

¥ 512

按年订阅

一年的时间,让我们一起疯狂地狩猎知识吧。比按月订阅优惠 28%

开始订阅

¥ 1280

泊学终身会员

永久观看和下载所有泊学网站视频,并赠送 100 元商店优惠券。

我要加入
如需帮助,欢迎通过以下方式联系我们