并行读取和顺序写入?

2023-12-21

我有以下代码,可以读取和写入每个id依次。

async def main():
    while id < 1000:
       data = await read_async(id) 
       await data.write_async(f'{id}.csv')
       id += 1
       

read_async()需要几分钟并且write_async()运行时间不到一分钟。现在我想要

  1. Run read_async(id)在平行下。但是,由于内存限制,最多可以并行运行 3 个调用。
  2. write_async必须按顺序运行,即write_async(n+1)之前不能运行write_async(n).

您可以使用队列和固定数量的任务来从主任务中读取和写入。主要任务可以使用事件来查找读者是否可以使用新数据,并使用共享字典从读者那里获取新数据。例如(未经测试):

async def reader(q, id_to_data, data_ready):
    while True:
        id = await q.get()
        data = await read_async(id) 
        id_to_data[id] = data
        data_ready.set()

async def main():
    q = asyncio.Queue()
    for id in range(1000):
        await q.put(id)

    id_to_data = {}
    data_ready = asyncio.Event()
    readers = [asyncio.create_task(reader(q, id_to_data, data_ready))
               for _ in 3]

    for id in range(1000):
       while True:
           # wait for the current ID to appear before writing
           if id in id_to_data:
               data = id_to_data.pop(id)
               await data.write_async(f'{id}.csv')
               break
               # move on to the next ID
           else:
               # wait for new data and try again
               await data_ready.wait()
               data_ready.clear()

    for r in readers:
        r.cancel()

使用单独的结果队列而不是事件队列是行不通的,因为队列是无序的。优先级队列可以解决这个问题,但它仍然会立即返回当前可用的最低 id,而编写者需要nextid 以便按顺序处理所有 id。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

并行读取和顺序写入? 的相关文章

随机推荐

  • Flutter 上的简单可扩展列也不会具有“标题”>“可扩展”

    我正在尝试制作一个可扩展的列 也就是说 它有 2 个子项 子项和展开小部件时出现的子项 在下图中 您可以看到子项 蓝色 和展开的子项 红色 它们仅应在Expand gt 单击按钮 一切正常 但无论我做什么 我都无法得到Expand gt 按
  • Oracle 如果行不存在则插入

    insert ignore into table1 select value1 value2 from table2 where table2 type ok 当我运行这个时 我收到错误 缺少 INTO 关键字 当我运行这个时 我收到错误
  • HBase 表上的 SparkSQL

    任何人都直接在 HBase 表上使用 SparkSQL 就像在 Hive 表上使用 SparkSQL 一样 我是spark新手 请指导我如何连接hbase和spark 如何查询hbase表 AFAIK 有 2 种方法连接到 hbase 表
  • iOS - 使用 AVPlayer 检测 URL 流是否正常工作

    这就是我的代码中从 url 播放的样子 private func play let streamUrl let playerItem AVPlayerItem url streamURL radioPlayer AVPlayer playe
  • ls | 的输出厕所-l

    通常输出为wc l命令给出文件中的行数 但是 当我们通过管道输出ls命令它 它似乎正确显示当前工作目录中的文件和目录以及链接的数量 我的问题是输出ls命令在同一行中显示某些文件和目录的名称 那么 为什么在这种情况下使用ls wc l与相比
  • 如何使用 Powershell 更改文件的属性?

    我有一个 Powershell 脚本 可以将文件从一个位置复制到另一个位置 复制完成后 我想清除源位置中已复制的文件的存档属性 如何使用 Powershell 清除文件的 Archive 属性 您可以使用旧的 dos attrib 命令 如
  • 以编程方式更改 ABAddressBook、ABPersonCopyArrayOfAllLinkedPeople 中的链接联系人数组

    是否可以通过编程方式添加 删除 CFArray 返回的联系人ABPersonCopyArrayOfAllLinkedPeople 因此 有效地链接和取消链接来自不同来源的不同联系人记录 以在 iOs 电话簿中显示为 统一 据我所知 至少在
  • 如何在java中打开第2层原始套接字?

    如何在java中打开第2层原始套接字 在 C 中 我们通常使用 AF PACKET 级别来打开具有 sockaddr ll 结构的第 2 层原始套接字 二层编程对应的socket包是什么 使用普通的 Java 方法这是不可能的 因为 Jav
  • 使用 nuxtJS + Vue2-Editor 时如何解决文档未定义错误?

    我正在尝试使用 vue2 editor 设置 nuxtjs 应用程序 如果我尝试通过客户端导航导航到编辑器页面 则其加载但如果我直接访问或刷新 例如 com editor 页面 我收到文档未定义错误 我已经识别出它是因为 vue2 编辑器不
  • 如何交换 observableArray 中的两个项目?

    我有一个按钮 可以将 observableArray 中的项目向左移动一个位置 我正在按照以下方式进行操作 然而 缺点是categories index 被从数组中删除 从而丢弃了该节点上的任何DOM操作 在我的例子中是通过jQuery验证
  • NUnit 异步测试导致 AppDomainUnloadedException

    我有一个带有异步操作的 NET 4 5 WCF 服务 我进行了集成测试 它使用 NetNamedPipeBinding 构建服务主机并通过客户端进行操作 然而 每次这样的测试总是会导致 NUnit 报告以下内容 System AppDoma
  • 在 DBAccess 中关联两个对象

    我正在为我的 iOS 项目使用 dbaccess 如何将数组传递给 dbaccess 对象 例如 我有像这样的 dbobject interface Member DBObject property strong NSString firs
  • 如何发送带有参数的 getForObject 请求 Spring MVC

    我在服务器端有一个方法 它为我提供有关在我的数据库中注册的特定名称的信息 我正在从我的 Android 应用程序访问它 向服务器的请求正常完成 我想做的是根据我想要获取的名称将参数传递给服务器 这是我的服务器端方法 RequestMappi
  • 我应该严格避免在 Android 上使用枚举吗?

    我曾经定义一组相关的常量 例如Bundle在如下界面中组合键 public interface From String LOGIN SCREEN LoginSCreen String NOTIFICATION Notification St
  • C++ 枚举从 0 开始吗​​?

    如果我有一个enum不给枚举分配数字 它的序数值会是 0 吗 例如 enum enumeration ZERO ONE TWO THREE FOUR FIVE SIX SEVEN EIGHT NINE 我已经找到了帖子引用 C99 标准需要
  • 如何在 Angular 6 中创建级联下拉列表(国家和州列表)

    如何在 Angular 6 中创建级联下拉列表 国家 地区和州列表 我想要在 Angular 6 中创建一个完整的国家 地区和州列表 任何知道这一点的人请分享你的想法 演示 gt 级联下拉菜单 国家和州列表 https stackblitz
  • 使用 NUnit 测试 Windows 8 应用商店应用程序

    我目前正在为一门课程开发 Windows 应用商店应用程序 Windows 8 但在运行 NUnit 测试时遇到问题 我的解决方案 项目设置如下所示 TheMetroApp sln SQLite net csproj 类库 Windows
  • 如何在 C# 中按类型查找 .cs 文件的路径

    如何按类型查找 cs 文件的路径 函数原型 string FindPath Type 返回类似 C Projects MyClass cs 的内容 在 Net 4 5中你可以使用CallerFilePath反射属性 来自MSDN using
  • 需要帮助创建 YouTube 视频列表

    我想访问 Youtube 视频并在 ListView 中显示视频的缩略图及其标题 单击缩略图后 应播放视频 任何人都可以建议一些与我的需求或链接相关的示例程序 这是我使用 URL 播放 youtube 视频的代码 这可能无法满足您的完整要求
  • 并行读取和顺序写入?

    我有以下代码 可以读取和写入每个id依次 async def main while id lt 1000 data await read async id await data write async f id csv id 1 read