相比同步函数,异步函数有相当大的概率是会发生错误的。如果一个 task group 里的某个任务发生错误,那其它任务该怎么办呢?我们等待的执行结果还能等到么?这两个问题的答案,都和取消任务有关。
为了演示这个过程,我们把切菜的代码改成这样:
struct Dinner {
enum Accident: Error {
case knifeError
}
func chopVegetable(name: String) async throws -> Preparation {
if name == "rock" {
throw Accident.knifeError
}
print("Chopping vegetables")
return .ingredient(.vegetable(name))
}
}
withThrowingTaskGroup
当切到石头的时候,我们就抛出一个 .knifeError
的错误。现在,由于 chopVegetable
有可能发生错误了,那么整个烹饪过程也就有可能发生错误了,为此,我们需要把上一节的 makeDinnerWithTaskGroup
改成这样:
func makeDinnerWithThrowingTaskGroup() async throws -> Meal {
var foods: [Food] = []
let oven = Oven()
try await withThrowingTaskGroup(of: Preparation.self) {
group in
let dinner = Dinner()
group.async {
try await dinner.chopVegetable(name: "rock")
}
group.async {
return await dinner.marinateMeat()
}
group.async {
await oven.preheatOven()
}
while let prep = try await group.next(),
case Preparation.ingredient(let food) = prep {
foods.append(food)
}
}
return oven.cook(foods, seconds: 300)
}
主要的改动有这么几点:
- 这次,我们使用了
withThrowingTaskGroup
来创建 task group,表示其中并行执行的任务是可能出错的。而用于创建任务的group
的类型,也因此变成了ThrowingTaskGroup
; - 在收集处理后的食材时,我们要使用
try await
等待每一个异步任务的结果;
当我们像这样调用 makeDinnerWithThrowingTaskGroup
时:
@main
struct MyApp {
static func main() async {
do {
_ = try await makeDinnerWithThrowingTaskGroup()
}
catch {
print(error.localizedDescription)
}
}
}
就可能会看到这样的结果:
Marinate meat
The operation couldn’t be completed. (SE_0304.Dinner.Accident error 0.)
希望你还记得,上一节我们说过,一个 task group 里的所有任务都不能脱离 task group 的作用域而存在。现在,这些任务中,有一个因为发生错误而无法正常执行了。那这个 task group 中的其它任务该怎么处理呢?
CancellationError
默认的情况是这样的:当 chopVegetable
发生错误时,由于它是一个异步函数,这个错误并不会立即被 withThrowingTaskGroup
感知到。因此,其它的任务,也就是 marinateMeat
和 preheatOven
会继续执行。
然后,在 task group 等待 group
的结果时,我们不妨假设先等来的是 chopVegetable
的错误,由于我们没有接管这个错误,task group 仍旧会等待 marinateMeat
继续执行完,并且直接忽略它返回的结果。
也就是说,即便 task group 中有任务发生错误,它还是会确保所有的任务结束之后,才结束自己。绝大多数情况下,这都是符合我们预期的行为。但有时,如果某些异步任务中会以同步的方式执行大量的计算代码,因为其它任务的错误导致这个结果被忽略就显得有点浪费了。对我们的例子来说,就是肉也白腌了,烤箱也白预热了。为了尽可能避免这些情况,我们可以在开始任务前,先判断下当前的 task group 是否已经进入了“取消阶段”。
例如,之前负责腌肉的任务可以改成这样:
group.async {
try Task.checkCancellation()
return try await dinner.marinateMeat()
}
当 task group 中有任务取消之后,checkCancellation
会抛出一个叫做 CancellationError
的错误,这会导致腌肉任务立即结束。这就避免了“把肉腌好又被扔掉”的情况了。
或者,我们也可以像下面这样通过 isCancelled
属性检测取消状态并抛出错误:
group.async {
guard !Task.isCancelled else {
throw CancellationError()
}
return try await dinner.marinateMeat()
}
相比直接用 checkCancellation()
方法,我们可以在 guard
语句里进行一些必要的清理工作,然后再结束任务。
主动取消任务
除了由于错误导致 task group 中的任务被取消之外,我们还可以主动取消任务。为了演示这个用法,我们给 Dinner
添加一个可以连续切菜的方法:
func chop(names: [String]) async throws -> [Food] {
var veggies: [Food] = []
try await withThrowingTaskGroup(of: Preparation.self) {
group in
for name in names {
group.async {
try await chopVegetable(name: name)
}
}
while let prep = try await group.next(),
case .ingredient(let veggie) = prep {
if veggies.count >= 3 {
group.cancelAll()
break
}
else {
veggies.append(veggie)
}
}
}
return veggies
}
它接受一个 [String]
参数,表示要切的所有蔬菜。在它的实现里,我们遍历 names
,对其中的每个元素,创建一个并行的切菜任务。
获取结果的部分,每读到一个结果,我们就把它保存到 veggies
里。当 task group 中没有任务时,它会返回 nil
。
并且,在处理任务结果时,我们还做了一个假设:当 vaggies
超过了三种蔬菜时,就使用 group.cancelAll()
取消掉 task group 中的所有任务。一旦 task group 被取消,无论是当前还没执行完的,还是调用 cancelAll()
之后又创建的新任务,它们的结果都会被 task group 忽略。这样,无论 names
中包含多少种蔬菜,veggies
中最多都只可能有三种。例如:
let veggies = try await Dinner().chop(
names: ["tomato", "cucumber", "celery", "cabbage"])
veggies
的值就是 [.vegetable("tomato"), .vegetable("cucumber"), .vegetable("celery")]
。即便我们把收集结果的代码改成下面这样:
while let veggie = try await group.next() {
if vaggies.count >= 3 {
group.cancelAll()
group.async {
return .vegetable("cabbage")
}
break
}
else {
vaggies.append(veggie)
}
}
结果仍旧是相同的,调用 cancelAll
方法后,新创建任务的返回值,也会被直接忽略。
What's next?
至此,关于结构化并发任务核心概念和基础用法,我们就讲的就差不多了。从下一节开始,我们聊聊有关非结构化任务的内容。