使 Swift 并发中的任务串行运行

2024-03-21

我有一个基于文档的应用程序,它使用结构作为其主要数据/模型。由于模型是(的子类)的属性NSDocument需要从主线程访问它。到目前为止一切都很好。

但对数据的某些操作可能需要相当长的时间,我想为用户提供一个进度条。这就是问题开始的地方。特别是当用户从 GUI 快速连续启动两个操作时。

如果我在模型上同步运行操作(或以“正常”方式)Task {})我得到了正确的串行行为,但主线程被阻止,因此我无法显示进度条。 (选项A)

如果我在模型上运行操作Task.detached {}我可以更新进度条,但根据模型上操作的运行时间,用户的第二个操作可能会在第一个操作之前完成,从而导致模型的无效/意外状态。这是由于await分离任务中需要的语句(我认为)。 (选项B)。

所以我想要 a) 释放主线程来更新 GUI,b) 确保每个任务在另一个(排队的)任务开始之前运行完全完成。使用后台串行调度队列很有可能实现这一点,但我正在尝试切换到新的 Swift 并发系统,该系统也用于在访问模型之前执行任何准备工作。

我尝试使用全局演员,因为这似乎是某种串行后台队列,但它也需要await声明。尽管模型中出现意外状态的可能性降低了,但这仍然是可能的。

我写了一些小代码来演示这个问题:

该模型:

struct Model {
    var doneA = false
    var doneB = false

    mutating func updateA() {
        Thread.sleep(forTimeInterval: 5)
        doneA = true
    }

    mutating func updateB() {
        Thread.sleep(forTimeInterval: 1)
        doneB = true
    }
}

和文件(省略标准NSDocument覆盖):

@globalActor
struct ModelActor {
    actor ActorType { }

    static let shared: ActorType = ActorType()
}

class Document: NSDocument {
    var model = Model() {
        didSet {
            Swift.print(model)
        }
    }

    func update(model: Model) {
        self.model = model
    }

    @ModelActor
    func updateModel(with operation: (Model) -> Model) async {
        var model = await self.model
        model = operation(model)
        await update(model: model)
    }

    @IBAction func operationA(_ sender: Any?) {
        //Option A
//        Task {
//            Swift.print("Performing some A work...")
//            self.model.updateA()
//        }

        //Option B
//        Task.detached {
//            Swift.print("Performing some A work...")
//            var model = await self.model
//            model.updateA()
//            await self.update(model: model)
//        }

        //Option C
        Task.detached {
            Swift.print("Performing some A work...")
            await self.updateModel { model in
                var model = model
                model.updateA()
                return model
            }
        }
    }

    @IBAction func operationB(_ sender: Any?) {
        //Option A
//        Task {
//            Swift.print("Performing some B work...")
//            self.model.updateB()
//        }

        //Option B
//        Task.detached {
//            Swift.print("Performing some B work...")
//            var model = await self.model
//            model.updateB()
//            await self.update(model: model)
//        }

        //Option C
        Task.detached {
            Swift.print("Performing some B work...")
            await self.updateModel { model in
                var model = model
                model.updateB()
                return model
            }
        }
    }
}

单击“操作 A”,然后单击“操作 B”应该会生成一个包含两个的模型true的。但情况并非总是如此。

有没有办法确保操作 A 在我进行操作 B 之前完成并让主线程可用于 GUI 更新?

EDIT根据罗布的回答,我得出以下结论。我这样修改它是因为我可以等待创建的操作并向原始调用者报告任何错误。我认为通过将所有代码包含在一个单一的代码中更容易理解正在发生的事情update函数,所以我选择执行独立任务而不是actor。我还从任务中返回中间模型,否则可能会使用旧模型。

class Document {
    func updateModel(operation: @escaping (Model) throws -> Model) async throws {
        //Update the model in the background
        let modelTask = Task.detached { [previousTask, model] () throws -> Model in
            var model = model

            //Check whether we're cancelled
            try Task.checkCancellation()

            //Check whether we need to wait on earlier task(s)
            if let previousTask = previousTask {
                //If the preceding task succeeds we use its model
                do {
                    model = try await previousTask.value
                } catch {
                    throw CancellationError()
                }
            }

            return try operation(model)
        }


        previousTask = modelTask
        defer { previousTask = nil } //Make sure a later task can always start if we throw

        //Wait for the operation to finish and store the model
        do {
            self.model = try await modelTask.value
        } catch {
            if error is CancellationError { return }
            else { throw error }
        }
    }
}

呼叫方:

@IBAction func operationA(_ sender: Any?) {
    //Option D
    Task {
        do {
            try await updateModel { model in
                var model = model
                model.updateA()
                return model
            }
        } catch {
            presentError(error)
        }
    }
}

它似乎做了我需要的任何事情,即对文档上的属性进行排队更新,可以等待并返回错误,就像所有事情都发生在主线程上一样。 唯一的缺点似乎是在调用方,由于需要使模型成为一个闭包,所以闭包非常冗长。var并明确返回它。


显然,如果您的任务没有任何await或其他暂停点,您只需使用演员,而不是制作该方法async,它会自动按顺序执行它们。

但是,在处理异步 Actor 方法时,必须认识到 Actor 是可重入的(请参阅SE-0306:演员 - 演员重入 https://github.com/apple/swift-evolution/blob/main/proposals/0306-actors.md#actor-reentrancy)。如果您确实尝试串行运行一系列异步任务,您将需要手动让每个后续任务等待前一个任务。例如。,

actor Foo {
    private var previousTask: Task<Void, Error>?

    func add(block: @Sendable @escaping () async throws -> Void) {
        previousTask = Task { [previousTask] in
            let _ = await previousTask?.result

            return try await block()
        }
    }
}

上述内容有两个微妙的方面:

  1. 我使用的捕获列表[previousTask]确保获得先前任务的副本。

  2. 我表演await previousTask?.value inside新任务,而不是之前的任务。

    如果您在创建新任务之前等待,则会发生竞赛,如果您启动三个任务,第二个和第三个任务都将等待first任务,即第三个任务不等待第二个任务。

而且,也许不用说,因为这是在一个 actor 内,所以它避免了对分离任务的需要,同时保持主线程空闲。


注意,使用时非结构化并发 https://docs.swift.org/swift-book/documentation/the-swift-programming-language/concurrency#Unstructured-Concurrency (i.e., Task {…} or Task.detached {…}),您自行承担处理取消的责任,例如使用withTaskCancellationHandler https://developer.apple.com/documentation/swift/withtaskcancellationhandler(operation:oncancel:)/:

actor Foo<Value: Sendable> {
    private var previousTask: Task<Value, Error>?

    func add(block: @Sendable @escaping () async throws -> Value) async throws -> Value {
        let task = Task { [previousTask] in
            try await withTaskCancellationHandler {
                let _ = try await previousTask?.value
            } onCancel: {
                previousTask?.cancel()
            }

            return try await block()
        }

        previousTask = task

        return try await withTaskCancellationHandler {
            try await task.value
        } onCancel: {
            task.cancel()
        }
    }
}

我还将其扩展为可能返回值的块。

因此,例如,我在这里添加了四个任务(即Task.sleep两秒然后返回一个随机值):

或者,如果您在第三个任务中途取消第四个任务:

(不用说,这假设你添加的任务支持取消,抛出CancellationError https://developer.apple.com/documentation/swift/cancellationerror如果取消等。标准Apple API,例如URLSession,执行所有这些操作,但正如您所看到的,如果引入非结构化并发,则需要小心。)


上面的内容有点脆弱,所以我可能建议异步序列(例如,任何符合AsyncSequence https://developer.apple.com/documentation/swift/asyncsequence/协议,例如AsyncStream https://developer.apple.com/documentation/swift/asyncstream或您自己的自定义异步序列),这也可以为您提供串行行为。

Or, AsyncChannel https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncAlgorithms.docc/Guides/Channel.md from Swift 异步算法 https://github.com/apple/swift-async-algorithms是处理触发某些代码块的串行执行的请求管道的另一种好方法。

例如,这是一个使用的串行下载管理器AsyncChannel和一个简单的for-await-in循环实现串行行为:

actor SerialDownloadManager {
    static let shared = SerialDownloadManager()

    private let session: URLSession = …
    private let urls = AsyncChannel<URL>()

    private init() {
        Task { try await startDownloader() }
    }

    // this sends URLs on the channel

    func append(_ url: URL) async {
        await urls.send(url)
    }
}

private extension SerialDownloadManager {
    func startDownloader() async throws {
        let folder = try FileManager.default
            .url(for: .applicationSupportDirectory, in: .userDomainMask, appropriateFor: nil, create: true)
            .appending(component: "downloads")

        try? FileManager.default.createDirectory(at: folder, withIntermediateDirectories: true)

        // this consumes the URLs on the channel

        for await url in urls {
            // if you want to observe in "points of interest"
            //
            // let id = OSSignpostID(log: poi)
            // os_signpost(.begin, log: poi, name: "Download", signpostID: id, "%{public}@", url.lastPathComponent)
            // defer { os_signpost(.end, log: poi, name: "Download", signpostID: id) }

            // download

            let (location, response) = try await self.session.download(from: url, delegate: nil)

            if let response = response as? HTTPURLResponse, 200 ..< 300 ~= response.statusCode {
                let destination = folder.appending(component: url.lastPathComponent)
                try? FileManager.default.removeItem(at: destination)
                try FileManager.default.moveItem(at: location, to: destination)
            }
        }
    }
}

然后你可以做这样的事情:

func appendUrls() async {
    for i in 0 ..< 10 {
        await SerialDownloadManager.shared.append(baseUrl.appending(component: "\(i).jpg"))
    }
}

产量:

或者,如果您愿意,您可以允许任务组进行受限并发,例如,一次执行 4 个任务:

actor DownloadManager {
    static let shared = DownloadManager()

    private let session: URLSession = …
    private let urls = AsyncChannel<URL>()
    private var count = 0
    private let maxConcurrency = 4       // change to 1 for serial downloads, but 4-6 is a good balance between benefits of concurrency, but not overtaxing server

    private init() {
        Task {
            do {
                try await startDownloader()
            } catch {
                logger.error("\(error, privacy: .public)")
            }
        }
    }

    func append(_ url: URL) async {
        await urls.send(url)
    }
}

private extension DownloadManager {
    func startDownloader() async throws {
        let folder = try FileManager.default
            .url(for: .applicationSupportDirectory, in: .userDomainMask, appropriateFor: nil, create: true)
            .appending(component: "downloads")

        try? FileManager.default.createDirectory(at: folder, withIntermediateDirectories: true)

        try await withThrowingTaskGroup(of: Void.self) { group in
            for await url in urls {
                count += 1
                if count > maxConcurrency { try await group.next() }

                group.addTask {
                    // if you want to observe in "points of interest"
                    //
                    // let id = OSSignpostID(log: poi)
                    // os_signpost(.begin, log: poi, name: "Download", signpostID: id, "%{public}@", url.lastPathComponent)
                    // defer { os_signpost(.end, log: poi, name: "Download", signpostID: id) }

                    // download

                    let (location, response) = try await self.session.download(from: url, delegate: nil)

                    if let response = response as? HTTPURLResponse, 200 ..< 300 ~= response.statusCode {
                        let destination = folder.appending(component: url.lastPathComponent)
                        try? FileManager.default.removeItem(at: destination)
                        try FileManager.default.moveItem(at: location, to: destination)
                    }
                }
            }

            try await group.waitForAll()
        }
    }
}

产量:

有关异步序列的更多信息,一般而言,请参阅 WWDC 2021 视频认识异步序列 https://developer.apple.com/videos/play/wwdc2021/10058/.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使 Swift 并发中的任务串行运行 的相关文章

  • ExecutorService 应该是静态的和全局的

    我想在我的应用程序中使用相同的线程池 为此 我可以使ExecutorService静态和全局 以便我可以调用ThreadUtil executorService to get ExecutorService当我需要它的时候 public c
  • 如何向 PyQt5 GUI 添加线程?

    所以我使用 QT Designer 创建了一个 GUI 它工作得很好 但在更复杂的调用中 它不会更新主窗口并锁定 我想运行我的CustomComplexFunction 在根据不断变化的后端信息更新主窗口中的文本编辑时 我希望它每 2 秒运
  • 从 swift 数组创建张量

    这工作正常 import TensorFlow var t Tensor
  • 如何在 RxSwift 中延迟地从 Collection 中一件一件地发出项目

    我想从anyCollection 创建一个Observable 它会在延迟后一一发出每个元素 另外 在下一步我想对项目 模型 执行一些更新 例如 喂所有狗 一只一只 间隔 5 秒 class Dog var name String var
  • 如何在 Swift Playground 中使用 Carthage 导入的框架

    我有一个快速项目 其中通过迦太基添加了一些框架 是否可以在项目内部的游乐场中使用这些框架以及如何使用它 因为 import Argo 不起作用 这在某个时候停止工作了 叹 我现在做的是 创建 macOS gt 命令行工具 创建一个购物车文件
  • 如何在 RxSwift 中取消订阅 Observable?

    我想在 RxSwift 中取消订阅 Observable 为了做到这一点 我曾经将 Disposable 设置为 nil 但在我看来 更新到 RxSwift 3 0 0 beta 2 后 这个技巧不起作用 我无法取消订阅 Observabl
  • 自定义相机视图 Swift iOS 8 iPhone Xcode 6.1

    我想在 iPhone 的 View 中使用相机 我不想使用典型的全屏相机视图 而是我自己的 例如 我想在屏幕中间有一个 200x200 的正方形 并且有一个相机预览 在这个方块下面我想要一个拍照按钮 怎么做 我是新手 速度很快 你会想要使用
  • 使用 swift,是否可以访问辅助功能中的反转颜色功能?

    苹果已经在手机中添加了一般 gt 辅助功能 gt 反转颜色的功能 我可以以某种方式在我的程序中使用它 例如当用户触摸屏幕时颜色反转吗 我不知道有什么方法可以自动执行此操作 但您可以使用 UIColor 上的扩展并访问子视图自行反转颜色 ex
  • 从子线程绘制到窗口

    我的应用程序从工作线程绘制图形已有 10 多年了 而且我从未遇到过任何问题 工作线程吸引到我的HWND 由主线程创建 如下所示 hdc GetDC hwnd SetDIBitsToDevice or StretchDIBits Releas
  • Xcode - 调试视图层次结构

    我正在尝试调试应用程序的视图层次结构 Xcode 窗口的左侧窗格中出现了一个紫色的小方块 请参见屏幕截图 知道这个问题可能是什么吗 好吧 我找到了这个问题的根源 这是一个约束问题
  • ASP.NET MVC 多线程

    我想在我的 asp net mvc 应用程序中实现这样的逻辑 user clicks a button gt server executes some time consuming logic in 15 threads i get dat
  • 使用ftplib进行多线程上传

    我正在尝试进行多线程上传 但出现错误 我猜想也许不可能在 ftplib 中使用多线程 这是我的代码 class myThread threading Thread def init self threadID src counter ima
  • 如何在 Swift 泛型中说“同一类”

    如果 Swift 泛型类型约束是协议名称 我可以要求受该协议约束的两种类型为同一类型 例如 protocol Flier struct Bird Flier struct Insect Flier func flockTwoTogether
  • 在 std::thread 创建的线程中调用 pthread_sigmask 是一个好习惯吗?

    1 我是 std thread 的新手 我想知道调用是否是一个好的做法pthread sigmask 阻止某些信号特别的线程创建者std thread 我不希望新线程接收SIGTERM SIGHUP等信号 因为主进程已经安装了这些信号的处理
  • JavaFX Platform.runLater 的使用以及从不同线程访问 UI

    我有几个问题Platform runLater 我有一个 JavaFX 应用程序类 在这个类中 我运行一个线程 该线程从网络套接字读取数据 现在当我创建一个新的Stage在线程内部 系统抛出异常 JavaFX 事件调度程序线程和我的网络读取
  • 使用 UItableviewCell 实现 Google 地图

    我正在尝试在 UItableviewCell 组件内实现谷歌地图 我这样做的方法是在原型单元中定义 GMSMapView 然后使用 dequeueReusableCell 方法配置地图单元 但是 我尝试应用的任何更改都会失败 例如添加标记
  • Swift 相当于 Objective-C FourCharCode 单引号文字(例如 'TEXT')

    我正在尝试在 Swift 中复制一些 Objective C cocoa 一切都很好 直到我遇到以下情况 Set a new type and creator unsigned long type TEXT unsigned long cr
  • 如何在 Swift 中使用indexesOfObjectsPassingTest:

    IndexOfObjectsPassingTest 的声明在 Swift 中看起来像这样 func indexesOfObjectsPassingTest predicate AnyObject Int CMutablePointer
  • UIStackView分布均匀填充

    所以 我有一个UIStackView其中包含四 4 UIViews 如果我删除其中一 1 个UIViews 其他三 3 个将填满UIStackView 我的问题 如何添加最大高度UIView这样它就不会填满整个空间UIStackView即使
  • 核心数据:重命名属性,而不会导致用户及其当前数据出现问题

    我只想为我的应用程序的新版本重命名并在表上添加属性 并且如果应用程序已安装 我想保留数据 首先我只是设置选项 let options NSMigratePersistentStoresAutomaticallyOption true NSI

随机推荐

  • 如何使用 argparse 将列表作为命令行参数传递?

    我正在尝试将列表作为参数传递给命令行程序 有没有一个argparse https docs python org 3 library argparse html将列表作为选项传递的选项 parser add argument l list
  • 为什么 Pyglet 不能正确绘制多边形?

    我随机创建点用于使用 Pyglet 绘制多边形 但 Pyglet 大多数时候都不能正确完成工作 好吧 我尝试用另一个图形模块绘制多边形 实际上它有效 但如果 Pyglet 工作正常 它会让我的工作变得更容易 我用它来绘制多边形和点 以方便您
  • 无需插件的 jQuery 链式动画

    在使用 jQuery 之前 我可以做一个带有延迟的链式动画 如下所示 element delay 45 animate 45 delay 45 animate 45 delay 45 animate 45 现在 自从更新到 v1 6 1 以
  • 子查询是邪恶的吗?

    这个问题是在一位朋友的评论之后提出的 他说 当一个查询有很多子查询时 就表明数据库存在设计缺陷 必须避免它们 他还表示 许多书籍都提出了同样的建议 我部分同意 但我认为这些查询具有复杂的逻辑 需要大量子查询 或者为了避免子查询 查询的物化视
  • 带 read 和 IFS 的 Bash while 循环

    我必须解析以下格式的文件 line1 param1 line1 param2 line1 param3 line1 param2 line2 param2 line2 param3 line1 param3 line3 param2 lin
  • 我可以在 python 中“伪造”一个包(或至少一个模块)以用于测试目的吗?

    我想用 python 伪造一个包 我想定义一些东西以便代码可以做 from somefakepackage morefakestuff import somethingfake somefakepackage 是在代码中定义的 其下面的所有
  • 如何处理drive api的最大导出限制大小文件

    我正在尝试下载一些 google doc 文件 但之后我需要使用导出方法转换为 microsoft word mimetype 它工作正常 直到找到一个大小超过 10 mb 的文件 api 文档说这是限制导出文档的大小 但我确实需要下载这些
  • 在 ElasticSearch 中获取 SearchResponse 的结果

    我正在尝试使用 ES 作为我的 MongoDB 的索引 我已经成功地集成了它们 但我发现搜索 API 相当复杂且令人困惑 Java API 也没有太大帮助 我能够找到完全匹配的内容 但是如何才能得到这个结果呢 这是我的代码 Node nod
  • JAX-WS Web 服务线程安全和性能问题

    我从其他一些帖子以及我对 JAX WS Web 服务的理解中了解到它们不是线程安全的 我的 Web 服务将被 100 个客户端调用 我们需要能够每秒处理大约 200 个事务 我的网络服务将与数据库交互以执行其工作 如果我在访问数据库的代码周
  • WebBrowser 控制会话中下载文件

    我在用着WebBrowser control浏览登录页面并下载文件 由于我找不到使用我正在使用的控件自动管理下载的方法WebClient类来尝试实现这一目标 问题是自从WebClient与浏览器不在同一上下文 会话中 我下载的只是安全错误屏
  • 如何使用树表显示 Oracle SQL 表中的所有行?

    我有这张表 为每个区域创建表 id area INT 主键 名称 VARCHAR2 200 id areapadre INT 引用 perarea id area 添加以下内容是为了访问数据 我的目的是创建一个层次结构 在树视图中显示区域及
  • 从 python 中的文本文件中读取特定列

    我有一个文本文件 其中包含一个由数字组成的表格 例如 5 10 6 6 20 1 7 30 4 8 40 3 9 23 1 4 13 6 例如 如果我想要仅包含在第二列中的数字 我如何将该列提取到列表中 f open file r line
  • MPI 中的相同发送和接收缓冲区

    在我的代码中 每个进程都作用于数组的特定部分 我希望每个进程将其处理的部分发送到其他进程 并从其他进程接收其他部分 为此我使用了MPI Allgatherv但我保持发送和接收缓冲区相同 MPI Allgatherv vel 0 localS
  • 如何将自定义的 Firefox 配置文件与 PHPUnit 的 Webdriver 框架结合使用?

    我知道使用 selenium RC 我曾经传递一个命令行操作符 firefoxProfileTemplate 这样就可以了 现在使用 Selenium2 Webdriver 这似乎不再起作用了 由于我正在使用 PHPUnit 进行其他测试
  • AWS Step Functions 是否登录 CloudWatch

    我想知道 AWS 步骤函数执行的输出是否记录在 CloudWatch 日志组中 我是not讨论由 step 函数调用的 lambda 函数的输出 我对状态机本身的输出感兴趣 我问这个问题是因为我们通常将所有日志集中在 loggly 中 以便
  • 是否可以将代码隐藏分成多个部分文件?

    我有一个带有 aspx cs 代码隐藏的 aspx Web 表单 隐藏的代码有近 2000 行长 而且已经到了轻松导航的唯一方法就是在各部分之间放置大量空格 缩小以便我可以看到代码的物理外观 并且然后放大我要编辑的地方 换句话说 这是一个很
  • 使用 Qt Designer 调整 Qt 拆分器布局大小行为

    我在 Qt 中通过拖放制作的视图中存在尺寸问题 让我从一张图片开始来帮助我解释 这是我的表单的主窗口 发生的情况是 我们有 4 个选项卡小部件 左侧选项卡小部件有一个到 2 个中间小部件的水平分割器 2 个中间小部件有一个垂直分离器 左侧和
  • 如何检测请求是否被中止?

    我正在提出请求 然后立即中止 var x get url function d e xhr alert d x abort 问题是它执行success函数并返回空数据 这里的例子 http jsfiddle net e5NBT 有没有 jQ
  • 将 ASM 转换为 C(不是逆向工程)

    我用谷歌搜索 发现数量惊人的轻率回复 基本上都是在嘲笑提出这样问题的提问者 Microchip 免费提供一些源代码 我不想将其发布在这里 以防万一 基本上 谷歌 AN937 单击第一个链接 其中有一个 源代码 链接及其压缩文件 它在 ASM
  • 使 Swift 并发中的任务串行运行

    我有一个基于文档的应用程序 它使用结构作为其主要数据 模型 由于模型是 的子类 的属性NSDocument需要从主线程访问它 到目前为止一切都很好 但对数据的某些操作可能需要相当长的时间 我想为用户提供一个进度条 这就是问题开始的地方 特别