



let cache = Cache()
let operation = OperationStatus()

func execute() async {
    if await operation.isExecuting else {
        await operation.waitUntilFinished()
    } else {
        await operation.set(isExecuting: true)

    if let data = await cache.data {
        return data

    let request = myRequest()
    let response = await myService.send(request)
    await cache.set(data: response)

    await operation.set(isExecuting: false)

actor Cache {
    var data: myResponse?

    func set(data: myResponse?) {
        self.data = data

actor OperationStatus {
    @Published var isExecuting = false
    private var cancellable = Set<AnyCancellable>()

    func set(isExecuting: Bool) {
        self.isExecuting = isExecuting

    func waitUntilFinished() async {
        guard isExecuting else { return }

        return await withCheckedContinuation { continuation in
                .first { !$0 } // Wait until execution toggled off
                .sink { _ in continuation.resume() }
                .store(in: &cancellable)

// Do something
DispatchQueue.concurrentPerform(iterations: 1_000_000) { _ in execute() }


final class OperationStatusTests: XCTestCase {
    private let iterations = 10_000 // 1_000_000
    private let outerIterations = 10

    actor Storage {
        var counter: Int = 0

        func increment() {
            counter += 1

    func testConcurrency() {
        // Given
        let storage = Storage()
        let operation = OperationStatus()
        let promise = expectation(description: "testConcurrency")
        promise.expectedFulfillmentCount = outerIterations * iterations

        @Sendable func execute() async {
            guard await !operation.isExecuting else {
                await operation.waitUntilFinished()

            await operation.set(isExecuting: true)
            try? await Task.sleep(seconds: 8)
            await storage.increment()
            await operation.set(isExecuting: false)

        waitForExpectations(timeout: 10)

        // When
        DispatchQueue.concurrentPerform(iterations: outerIterations) { _ in
            (0..<iterations).forEach { _ in
                Task { await execute() }

        // Then
        // XCTAssertEqual... how to test?


func entireProcess() async throws {
    let value = try await first()

    let value2 = try await subsequent(with: value)
    let value3 = try await subsequent(with: value2)
    let value4 = try await subsequent(with: value3)

    // do something with `value4`


func entireProcess() async throws {
    var value = try await first()

    for _ in 0 ..< 4 {
        value = try await subsequent(with: value)

    // do something with `value`

这是声明一系列的最简单的方法async函数,每个函数都将先前的结果作为下一次迭代的输入。因此,让我们扩展上述内容,包括 Instruments 的“兴趣点”工具的一些路标:

import os.log

private let log = OSLog(subsystem: "Test", category: .pointsOfInterest)

func entireProcess() async throws {
    let id = OSSignpostID(log: log)
    os_signpost(.begin, log: log, name: #function, signpostID: id, "start")

    var value = try await first()

    for _ in 0 ..< 4 {
        os_signpost(.event, log: log, name: #function, "Scheduling: %d with input of %d", i, value)
        value = try await subsequent(with: value)

    os_signpost(.end, log: log, name: #function, signpostID: id, "%d", value)

func first() async throws -> Int {
    let id = OSSignpostID(log: log)
    os_signpost(.begin, log: log, name: #function, signpostID: id, "start")

    try await Task.sleep(seconds: 1)

    let value = 42
    os_signpost(.end, log: log, name: #function, signpostID: id, "%d", value)

    return value

func subsequent(with value: Int) async throws -> Int {
    let id = OSSignpostID(log: log)
    os_signpost(.begin, log: log, name: #function, signpostID: id, "%d", value)

    try await Task.sleep(seconds: 1)

    let newValue = value + 1
    defer { os_signpost(.end, log: log, name: #function, signpostID: id, "%d", newValue) }

    return newValue

因此,您会看到一系列请求,它们将其结果传递给后续请求。所有的os_signpost路标的内容是这样我们可以直观地看到它们在 Instrument 的“兴趣点”工具中按顺序运行:





actor SerialTasks<Success> {
    private var previousTask: Task<Success, Error>?

    func add(block: @Sendable @escaping () async throws -> Success) {
        previousTask = Task { [previousTask] in
            let _ = await previousTask?.result
            return try await block()



  1. The add方法本身必须not是一个异步函数。我们需要避开演员可重入性 https://github.com/apple/swift-evolution/blob/main/proposals/0306-actors.md#actor-reentrancy。如果这是一个async函数(如您的示例中),我们将失去任务的顺序执行。

  2. The Task has a [previousTask]捕获列表捕获先前任务的副本。这样,每个任务都会await前一个,避免任何比赛。


我建议您使用 MCVE 发布一个单独的问题,并提供一个实际示例,准确说明您想要从一个异步函数传递到另一个异步函数的内容。例如,我已经完成了上述的排列,将整数从一个任务传递到另一个任务。但在实践中,这并没有多大用处,因为当您开始处理异构结果解析的现实时,它会变得更加复杂。在实践中,我开始这个问题的简单例子是最实用的模式。

关于处理/围绕演员重入的更广泛问题,我建议密切关注SE-0306 - 未来方向 https://github.com/apple/swift-evolution/blob/main/proposals/0306-actors.md#future-directions其中明确考虑了一些潜在的优雅的即将到来的替代方案。如果看到一些改进,无论是在语言本身还是在Swift 异步算法 https://github.com/apple/swift-async-algorithms图书馆。



  • 尝试使用OperationStatus强制顺序执行async通话无法进行,因为演员有可重入性 https://github.com/apple/swift-evolution/blob/main/proposals/0306-actors.md#actor-reentrancy。如果你有一个async功能,每次你点击一个await,这是一个暂停点,在此点允许对该异步函数的另一个调用继续进行。您的诚信度OperationStatus逻辑就会被违反。您不会遇到串行行为。

    如果您对悬挂点感兴趣,我建议您观看 WWDC 2021 视频Swift 并发:幕后花絮 https://developer.apple.com/videos/play/wwdc2021/10254/.

  • The testConcurrency正在打电话waitForExpectations在它实际开始任何能够满足任何期望的任务之前。那总是会超时。

  • The testConcurrency正在使用GCDconcurrentPerform,反过来,它只是调度一个异步任务并立即返回。这违背了整个目的concurrentPerform(这是一种运行一系列的节流机制同步并行任务,但不能超过 CPU 上的最大内核数)。此外,Swift 并发具有自己的模拟功能concurrentPerform,即受约束的“协作线程池”(也在该讨论中讨论过)video https://developer.apple.com/videos/play/wwdc2021/10254/,IIRC),渲染concurrentPerform在 Swift 并发领域已经过时了。

    最重要的是,包含在内没有意义concurrentPerform在 Swift 并发代码库中。使用它也没有意义concurrentPerform启动异步任务(无论是 Swift 并发还是 GCD)。它用于并行启动一系列同步任务。

  • In execute在您的测试中,您有两种执行路径,一种将等待某种状态更改并满足期望,而无需增加存储。这意味着您将失去一些增加该值的尝试。您的总计将与所需的结果值不匹配。现在,如果您的目的是在另一个请求待处理时放弃请求,那没问题。但我不认为那是你的意图。

  • 回答你关于最后如何测试成功的问题。你可能会这样做:

    actor Storage {
        private var counter: Int = 0
        func increment() {
            counter += 1
        var value: Int { counter }
    func testConcurrency() async {
        let storage = Storage()
        let operation = OperationStatus()
        let promise = expectation(description: "testConcurrency")
        let finalCount = outerIterations * iterations
        promise.expectedFulfillmentCount = finalCount
        @Sendable func execute() async {
            guard await !operation.isExecuting else {
                await operation.waitUntilFinished()
            await operation.set(isExecuting: true)
            try? await Task.sleep(seconds: 1)
            await storage.increment()
            await operation.set(isExecuting: false)
        // waitForExpectations(timeout: 10)                                      // this is not where you want to wait; moved below, after the tasks started
        // DispatchQueue.concurrentPerform(iterations: outerIterations) { _ in   // no point in this
        for _ in 0 ..< outerIterations {
            for _ in 0 ..< iterations {
                Task { await execute() }
        await waitForExpectations(timeout: 10)
        // test the success to see if the store value was correct
        let value = await storage.value                                          // to test that you got the right count, fetch the value; note `await`, thus we need to make this an `async` test
        // Then
        XCTAssertEqual(finalCount, value, "Count")


  • 如果在您经历此过程时,您想真正确认您的行为OperationStatus模式,我建议使用os_signpost间隔(或任务开始和结束的简单日志记录语句)。您将看到异步的单独调用execute方法不是按顺序运行的。


