理解了 Combine 的事件发布订阅模型之后,这一节我们对 CustomSubject
的实现做一些改进,让它支持在多线程环境中使用。
AtomicBox
为了给要访问的对象添加访问保护,在 Swift 中一个常见的做法就是创建一个“箱子”类型,并把要保护的内容作为箱子里的“内容”,这样做的一个好处就是保护逻辑和内容的访问是分开的。为此,在我们的项目中,创建一个 AtomicBox.swift,在其中添加一个 AtomicBox
类型:
public final class AtomicBox<T> {
@usableFromInline var mutex = os_unfair_lock()
@usableFromInline var unboxed: T
public init(_ unboxed: T) {
self.unboxed = unboxed
}
}
其中:
mutext
是AtomicBox
内部使用的用于线程同步的 mutex 对象;unboxed
是AtomicBox
中的“内容”,也就是我们要保护的对象;
这里,之所以要把它们用 @usableFromInline
修饰,是为了让它们成为这个模块二进制接口的一部分。为了性能方面的考虑,AtomicBox
的一部分公开接口会用内联的方式实现,为了在这些接口中使用这些属性,我们应该使用 @usableFromInline
修饰它们。
接下来,是 AtomicBox
中的读写接口。读取数据是通过 value
属性提供的。上锁成功后,返回 unboxed
就好了:
@inlinable public var value: T {
get {
os_unfair_lock_lock(&mutex)
defer { os_unfair_lock_unlock(&mutex) }
return unboxed
}
}
修改 unboxed
的操作,是通过 mutate
方法提供的:
@discardableResult @inlinable
public func mutate<U>(_ fn: (inout T) throws -> U) rethrows -> U {
os_unfair_lock_lock(&mutex)
defer { os_unfair_lock_unlock(&mutex) }
return try fn(&unboxed)
}
这里,我们实现的逻辑并不是直接对 unboxed
赋值。而是把修改 unboxed
的过程封装成了一个函数 fn
。其中,修改 unboxed
的部分用 inout T
表示。这个方法还可以抛出错误,并且返回一个和 T
不同的类型。
当然,错误也好,不同的返回值类型也好,这些都是修改 unboxed
这个动作的某种副作用,并不是指一定会用到它们。因此,我们用 @discardableResult
修饰了 mutate
的返回值,这样,编译器就不会产生未使用返回值的警告了。
最后一个要实现的接口,是判断 AtomicBox
保护的内容是否正在被修改,我们用 os_unfair_lock_trylock
方法尝试上锁,如果锁上了,就表示 unboxed
没人在用,否则,就表示 unboxed
正在被修改:
/// A computed property that has conditional statement should not be
/// marked as `@inlinable`
public var isMutating: Bool {
if os_unfair_lock_trylock(&mutex) {
os_unfair_lock_unlock(&mutex)
return false
}
return true
}
要注意的是,我们并没有使用 @inlinable
修饰 isMutating
,这是因为它的实现里包含了 if
。通常,我们不对这种带有分之语句的方法使用内联。这样,一个简单的同步读写类就完成了。
CustomSuject 的线程安全性
接下来,我们用它保护 CustomSubject
中的资源,也就是它的 subscribers
属性。为了尽可能减少对这个属性访问的修改,我们可以先基于 AtomicBox
创建一个 property wrapper。
为此,继续在 AtomicBox.swift 里,添加下面的代码:
@propertyWrapper
public class Atomic<T> {
var boxed: AtomicBox<T>
init(content: T) { self.boxed = AtomicBox<T>(content) }
public var wrappedValue: T {
get {
return boxed.value
}
}
public var projectedValue: AtomicBox<T> {
get {
return boxed
}
}
}
其实,就是 AtomicBox
的一层包装,很简单,我们就不多说了。然后,我们就可以用它修饰 CustomSubject
中 subscribers
的定义了:
@Atomic<SubscriberRecords>(content: [:]) var subscribers
这样,所有读取 subscribers
的代码都无需变动,只需要处理一下修改 subscribers
的代码就好了。例如,要把 cancel
改成这样:
func cancel() {
subject.$subscribers.mutate {
$0.removeValue(forKey: self.combineIdentifier)
}
}
也就是说,所有之前直接修改 subscribers
的地方,现在要通过 $subscriber
先得到内部的 AtomicBox
对象,再把修改的逻辑放到 mutate
方法的 closure 里。受到影响的代码,还有 send(completion:)
和 receive<S>(subscriber:)
:
public func send(completion: Subscribers.Completion<Failure>) {
for (_, sub) in subscribers {
_ = sub.receive(completion: completion)
}
$subscribers.mutate { $0.removeAll() }
}
// Publisher
public func receive<S>(subscriber: S)
where S : Subscriber, Failure == S.Failure, Output == S.Input {
let behavior = Behavior(
subject: self, downstream: AnySubscriber(subscriber))
let subscription = CustomSubscription(behavior: behavior)
$subscribers.mutate {
$0[subscription.combineIdentifier] = subscription
}
subscription.receive(subscription: Subscriptions.empty)
}
What's next?
现在,CustomSubject
就可以在多线程环境中安全的使用了。下一节,我们继续讨论之前留下的话题:如何在 Combine 中实现共享订阅的话题。