如何暂停后续任务直到第一次完成然后与等待的任务共享其响应?

2023-12-27

我有一个actor它以第一个请求暂停后续请求直到完成的方式限制请求,然后与它们共享其响应,这样它们就不必发出相同的请求。

这就是我想做的:

let cache = Cache()
let operation = OperationStatus()

func execute() async {
    if await operation.isExecuting else {
        await operation.waitUntilFinished()
    } else {
        await operation.set(isExecuting: true)
    }

    if let data = await cache.data {
        return data
    }

    let request = myRequest()
    let response = await myService.send(request)
    await cache.set(data: response)

    await operation.set(isExecuting: false)
}

actor Cache {
    var data: myResponse?

    func set(data: myResponse?) {
        self.data = data
    }
}

actor OperationStatus {
    @Published var isExecuting = false
    private var cancellable = Set<AnyCancellable>()

    func set(isExecuting: Bool) {
        self.isExecuting = isExecuting
    }

    func waitUntilFinished() async {
        guard isExecuting else { return }

        return await withCheckedContinuation { continuation in
            $isExecuting
                .first { !$0 } // Wait until execution toggled off
                .sink { _ in continuation.resume() }
                .store(in: &cancellable)
        }
    }
}

// Do something
DispatchQueue.concurrentPerform(iterations: 1_000_000) { _ in execute() }

这确保一次一个请求,后续调用将等待直到完成。看起来这可行,但想知道是否有一种纯粹的并发方式而不是混合Combine,我该如何测试这个?这是我开始的测试,但我很困惑如何测试它:

final class OperationStatusTests: XCTestCase {
    private let iterations = 10_000 // 1_000_000
    private let outerIterations = 10

    actor Storage {
        var counter: Int = 0

        func increment() {
            counter += 1
        }
    }

    func testConcurrency() {
        // Given
        let storage = Storage()
        let operation = OperationStatus()
        let promise = expectation(description: "testConcurrency")
        promise.expectedFulfillmentCount = outerIterations * iterations

        @Sendable func execute() async {
            guard await !operation.isExecuting else {
                await operation.waitUntilFinished()
                promise.fulfill()
                return
            }

            await operation.set(isExecuting: true)
            try? await Task.sleep(seconds: 8)
            await storage.increment()
            await operation.set(isExecuting: false)
            promise.fulfill()
        }

        waitForExpectations(timeout: 10)

        // When
        DispatchQueue.concurrentPerform(iterations: outerIterations) { _ in
            (0..<iterations).forEach { _ in
                Task { await execute() }
            }
        }

        // Then
        // XCTAssertEqual... how to test?
    }
}

在处理更一般的示例之前,让我们首先放弃一些异步任务顺序执行的自然示例,将一个任务的结果作为下一个任务的参数传递。考虑:

func entireProcess() async throws {
    let value = try await first()

    let value2 = try await subsequent(with: value)
    let value3 = try await subsequent(with: value2)
    let value4 = try await subsequent(with: value3)

    // do something with `value4`
}

Or

func entireProcess() async throws {
    var value = try await first()

    for _ in 0 ..< 4 {
        value = try await subsequent(with: value)
    }

    // do something with `value`
}

这是声明一系列的最简单的方法async函数,每个函数都将先前的结果作为下一次迭代的输入。因此,让我们扩展上述内容,包括 Instruments 的“兴趣点”工具的一些路标:

import os.log

private let log = OSLog(subsystem: "Test", category: .pointsOfInterest)

func entireProcess() async throws {
    let id = OSSignpostID(log: log)
    os_signpost(.begin, log: log, name: #function, signpostID: id, "start")

    var value = try await first()

    for _ in 0 ..< 4 {
        os_signpost(.event, log: log, name: #function, "Scheduling: %d with input of %d", i, value)
        value = try await subsequent(with: value)
    }

    os_signpost(.end, log: log, name: #function, signpostID: id, "%d", value)
}

func first() async throws -> Int {
    let id = OSSignpostID(log: log)
    os_signpost(.begin, log: log, name: #function, signpostID: id, "start")

    try await Task.sleep(seconds: 1)

    let value = 42
    os_signpost(.end, log: log, name: #function, signpostID: id, "%d", value)

    return value
}

func subsequent(with value: Int) async throws -> Int {
    let id = OSSignpostID(log: log)
    os_signpost(.begin, log: log, name: #function, signpostID: id, "%d", value)

    try await Task.sleep(seconds: 1)

    let newValue = value + 1
    defer { os_signpost(.end, log: log, name: #function, signpostID: id, "%d", newValue) }

    return newValue
}

因此,您会看到一系列请求,它们将其结果传递给后续请求。所有的os_signpost路标的内容是这样我们可以直观地看到它们在 Instrument 的“兴趣点”工具中按顺序运行:

你可以看到计划每个任务时的事件路标,并且间隔说明了这些异步任务的顺序执行。

这是在任务之间建立依赖关系的最简单方法,将值从一个任务传递到另一个任务。


现在,问题是如何概括上述内容,即我们在开始下一个任务之前等待上一个任务。

一种模式是编写一个等待前一个演员结果的演员。考虑:

actor SerialTasks<Success> {
    private var previousTask: Task<Success, Error>?

    func add(block: @Sendable @escaping () async throws -> Success) {
        previousTask = Task { [previousTask] in
            let _ = await previousTask?.result
            return try await block()
        }
    }
}

与前面的示例不同,这不需要您有一个函数来启动后续任务。例如,当某些单独的用户交互要求我将新任务添加到先前提交的任务列表的末尾时,我使用了上述方法。

上述演员有两个微妙但关键的方面:

  1. The add方法本身必须not是一个异步函数。我们需要避开演员可重入性 https://github.com/apple/swift-evolution/blob/main/proposals/0306-actors.md#actor-reentrancy。如果这是一个async函数(如您的示例中),我们将失去任务的顺序执行。

  2. The Task has a [previousTask]捕获列表捕获先前任务的副本。这样,每个任务都会await前一个,避免任何比赛。

上述可用于使一系列任务按顺序运行。但它本身并不是在任务之间传递值。我承认我已经使用了这种模式,我只需要顺序执行很大程度上独立的任务(例如,发送单独的命令发送给某些Process)。但它可能适合您的场景,在该场景中您希望“与[后续请求]共享其响应”。

我建议您使用 MCVE 发布一个单独的问题,并提供一个实际示例,准确说明您想要从一个异步函数传递到另一个异步函数的内容。例如,我已经完成了上述的排列,将整数从一个任务传递到另一个任务。但在实践中,这并没有多大用处,因为当您开始处理异构结果解析的现实时,它会变得更加复杂。在实践中,我开始这个问题的简单例子是最实用的模式。

关于处理/围绕演员重入的更广泛问题,我建议密切关注SE-0306 - 未来方向 https://github.com/apple/swift-evolution/blob/main/proposals/0306-actors.md#future-directions其中明确考虑了一些潜在的优雅的即将到来的替代方案。如果看到一些改进,无论是在语言本身还是在Swift 异步算法 https://github.com/apple/swift-async-algorithms图书馆。


tl;dr

我不想在上面讨论有关您的代码片段的内容,但存在很多问题。所以,如果你原谅我,这里有一些观察结果:

  • 尝试使用OperationStatus强制顺序执行async通话无法进行,因为演员有可重入性 https://github.com/apple/swift-evolution/blob/main/proposals/0306-actors.md#actor-reentrancy。如果你有一个async功能,每次你点击一个await,这是一个暂停点,在此点允许对该异步函数的另一个调用继续进行。您的诚信度OperationStatus逻辑就会被违反。您不会遇到串行行为。

    如果您对悬挂点感兴趣,我建议您观看 WWDC 2021 视频Swift 并发:幕后花絮 https://developer.apple.com/videos/play/wwdc2021/10254/.

  • The testConcurrency正在打电话waitForExpectations在它实际开始任何能够满足任何期望的任务之前。那总是会超时。

  • The testConcurrency正在使用GCDconcurrentPerform,反过来,它只是调度一个异步任务并立即返回。这违背了整个目的concurrentPerform(这是一种运行一系列的节流机制同步并行任务,但不能超过 CPU 上的最大内核数)。此外,Swift 并发具有自己的模拟功能concurrentPerform,即受约束的“协作线程池”(也在该讨论中讨论过)video https://developer.apple.com/videos/play/wwdc2021/10254/,IIRC),渲染concurrentPerform在 Swift 并发领域已经过时了。

    最重要的是,包含在内没有意义concurrentPerform在 Swift 并发代码库中。使用它也没有意义concurrentPerform启动异步任务(无论是 Swift 并发还是 GCD)。它用于并行启动一系列同步任务。

  • In execute在您的测试中,您有两种执行路径,一种将等待某种状态更改并满足期望,而无需增加存储。这意味着您将失去一些增加该值的尝试。您的总计将与所需的结果值不匹配。现在,如果您的目的是在另一个请求待处理时放弃请求,那没问题。但我不认为那是你的意图。

  • 回答你关于最后如何测试成功的问题。你可能会这样做:

    actor Storage {
        private var counter: Int = 0
    
        func increment() {
            counter += 1
        }
    
        var value: Int { counter }
    }
    
    func testConcurrency() async {
        let storage = Storage()
        let operation = OperationStatus()
        let promise = expectation(description: "testConcurrency")
        let finalCount = outerIterations * iterations
        promise.expectedFulfillmentCount = finalCount
    
        @Sendable func execute() async {
            guard await !operation.isExecuting else {
                await operation.waitUntilFinished()
                promise.fulfill()
                return
            }
    
            await operation.set(isExecuting: true)
            try? await Task.sleep(seconds: 1)
            await storage.increment()
            await operation.set(isExecuting: false)
            promise.fulfill()
        }
    
        // waitForExpectations(timeout: 10)                                      // this is not where you want to wait; moved below, after the tasks started
    
        // DispatchQueue.concurrentPerform(iterations: outerIterations) { _ in   // no point in this
    
        for _ in 0 ..< outerIterations {
            for _ in 0 ..< iterations {
                Task { await execute() }
            }
        }
    
        await waitForExpectations(timeout: 10)
    
        // test the success to see if the store value was correct
    
        let value = await storage.value                                          // to test that you got the right count, fetch the value; note `await`, thus we need to make this an `async` test
    
        // Then
        XCTAssertEqual(finalCount, value, "Count")
    }
    

    现在,此测试将因多种原因而失败,但希望这说明了您将如何验证测试的成功或失败。但是,请注意,这只会测试最终结果是否正确,而不是测试它们是否按顺序执行。事实是Storage是一个演员将隐藏他们没有真正按顺序调用的事实。也就是说,如果您确实需要一个请求的结果来准备下一个请求,则此处不会进行测试。

  • 如果在您经历此过程时,您想真正确认您的行为OperationStatus模式,我建议使用os_signpost间隔(或任务开始和结束的简单日志记录语句)。您将看到异步的单独调用execute方法不是按顺序运行的。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何暂停后续任务直到第一次完成然后与等待的任务共享其响应? 的相关文章

随机推荐