Python并行将GCS中的.json文件读取到pandas DF中

2024-01-20

TL;DR: asyncio vs multi-processing vs threading vs. some other solution并行化 for 循环,从 GCS 读取文件,然后将这些数据一起附加到 pandas 数据帧中,然后写入 BigQuery...

我想并行一个Python函数来读取数十万个小数据.json来自 GCS 目录的文件,然后转换这些文件.jsons到 pandas 数据帧中,然后将 pandas 数据帧写入 BigQuery 表。

这是该函数的非并行版本:

import gcsfs
import pandas as pd
from my.helpers import get_gcs_file_list
def load_gcs_to_bq(gcs_directory, bq_table):

    # my own function to get list of filenames from GCS directory
    files = get_gcs_file_list(directory=gcs_directory) # 

    # Create new table
    output_df = pd.DataFrame()
    fs = gcsfs.GCSFileSystem() # Google Cloud Storage (GCS) File System (FS)
    counter = 0
    for file in files:

        # read files from GCS
        with fs.open(file, 'r') as f:
            gcs_data = json.loads(f.read())
            data = [gcs_data] if isinstance(gcs_data, dict) else gcs_data
            this_df = pd.DataFrame(data)
            output_df = output_df.append(this_df)

        # Write to BigQuery for every 5K rows of data
        counter += 1
        if (counter % 5000 == 0):
            pd.DataFrame.to_gbq(output_df, bq_table, project_id=my_id, if_exists='append')
            output_df = pd.DataFrame() # and reset the dataframe


    # Write remaining rows to BigQuery
    pd.DataFrame.to_gbq(output_df, bq_table, project_id=my_id, if_exists='append')

这个函数很简单:

  • grab ['gcs_dir/file1.json', 'gcs_dir/file2.json', ...], GCS 中的文件名列表
  • loop over each file name, and:
    • 从 GCS 读取文件
    • 将数据转换为 pandas DF
    • 附加到主 pandas DF
    • 每 5K 循环写入 BigQuery(因为随着 DF 变大,追加速度会变慢)

我必须在几个 GCS 目录上运行这个函数,每个目录都有大约 500K 个文件。由于读/写这么多小文件的瓶颈,这个过程对于一个目录来说大约需要 24 小时...如果我能让这个更加并行以加快速度,那就太好了,因为这似乎是一个任务适合并行化。

Edit:下面的解决方案很有帮助,但我对从 python 脚本中并行运行特别感兴趣。 Pandas 正在处理一些数据清理,并使用bq load会抛出错误。有asyncio https://docs.python.org/3/library/asyncio.html和这个gcloud-aio-存储 https://pypi.org/project/gcloud-aio-storage/这两者似乎都对这项任务有用,也许是比线程或多处理更好的选择......


不要向 Python 代码中添加并行处理,而是考虑多次并行调用 Python 程序。这个技巧更适合在命令行上获取文件列表的程序。因此,为了这篇文章,让我们考虑更改程序中的一行:

您的线路:

# my own function to get list of filenames from GCS directory
files = get_gcs_file_list(directory=gcs_directory) # 

新队:

files = sys.argv[1:]  # ok, import sys, too

现在,您可以通过以下方式调用您的程序:

PROCESSES=100
get_gcs_file_list.py | xargs -P $PROCESSES your_program

xargs现在将采用以下方式输出的文件名get_gcs_file_list.py并调用your_program最多并行 100 次,每行容纳尽可能多的文件名。我相信文件名的数量仅限于 shell 允许的最大命令大小。如果 100 个进程不足以处理所有文件,xargs 将调用your_program一次又一次,直到处理从 stdin 读取的所有文件名。xargs确保调用次数不超过 100 次your_program同时运行。您可以根据主机可用的资源来改变进程数。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python并行将GCS中的.json文件读取到pandas DF中 的相关文章

随机推荐

  • 打开 Chrome 或 Firefox,无需选项卡和其他菜单项,并设置屏幕位置和大小

    我需要在设定的位置启动 Chrome 或 Firefox 具有设定的大小 并且没有选项卡和其他菜单项 最终目标是一个桌面图标 我可以单击桌面图标以在桌面上的 x y 处获得一个 X 宽 Y 高的窗口 显示特定页面 而无需将这些设置永久保留在
  • vc++ 应用程序中的看门狗

    我写了一个简单的vc 后台应用程序 我正在尝试的就像一个看门狗服务 可以监视应用程序是否正在运行 如果应用程序崩溃 那么服务应该启动应用程序 为了通过 Windows 安装程序创建安装程序 我仅使用 app exe 和 app dll 是否
  • WPF - 绑定到菜单图标

    我有一个包含菜单的用户控件 我需要将 Menu Icon 绑定到 UserControl 的属性 但它不起作用 代码是这样开始的
  • 从可执行文件创建 Windows 服务

    是否有任何快速方法可以根据给定的可执行文件创建 Windows 服务 并在启动时启动它 要从可执行文件创建 Windows 服务 您可以使用sc exe sc exe create
  • Code OSS 和 Visual Studio Code 之间的差异

    正如开发商所说here https github com Microsoft vscode issues 60 issuecomment 161792005 所有这一切的最酷之处在于 您可以选择在我们的许可下使用 Visual Studio
  • 如何实现跨应用导航并导航回Fiori launchpad主页?

    在我们公司 我们建立了 Fiori 启动板 在其中 我们配置了一个链接到自定义开发的 SAPUI5 应用程序的磁贴 该应用程序作为 BSP 部署在服务器上 我们花了很长时间 但使用该应用程序中的路由器模式 我们成功地实现了从启动板磁贴到自定
  • 有没有脚本 SVG 编辑器?

    我想使用某种脚本语言 最好是 Python 编辑 SVG 文件 特别是 我想合并两个 SVG 文件 添加一些注释 并将它们排列在更大的图像中 有没有可用于此类目的的软件 Thanks Bartosz UPDATE 我最终决定使用nosklo
  • 网页字体大小中的px到底代表什么

    我正在做一些图形 在Javascript中 如果相关的话 我知道你可以使用pt px 等查找什么时px这意味着 每个网站似乎都有相同的模糊答案 字体大小 以像素为单位 我想知道什么exactly px代表 例如 如果我有20px 20 像素
  • 尝试与 Web 服务通信时遇到“没有到主机的路由”错误

    我正在尝试与我的笔记本电脑上的网络服务进行通信 并使用 Android 手机作为客户端 我试图做一个简单的登录功能 但是 每当客户端尝试与 Web 服务通信时 Eclipse 上的 DDMS 中就会弹出此错误 我可以通过家庭网络访问网络服务
  • Android 离线语音识别只显示一个结果?

    我已经设置了语音识别服务 如本文所示Android 语音识别作为 Android 4 1 和 4 2 上的服务 https stackoverflow com questions 14940657 android speech recogn
  • 如何使 Jinja2 中的 tojson() 过滤器输出 Unicode 而不是转义序列?

    我的模板是用于JS的 let SETTINGS settings tojson 4 我的设置是一个字典 name Russian name id 12345 如果我渲染它 我会得到 let SETTINGS name Russian nam
  • 使用用户变量访问结构成员

    假设我有一个结构如下 struct person int age char name 24 person 用户给出程序应该读取哪个结构成员的参数 program age int main int argc char argv int i i
  • 使用四边形的重心坐标

    你们中的一些人知道如何使用重心填充二维四边形 坐标 目前 我将四边形分成2个三角形 但这种方式效率低下 因为我必须迭代第二个 边界框重复先前填充的像素 通过 例如 为了填充第二个三角形 我遍历了第一个三角形 属于由第二个三角形形成的边界框
  • Lua 中的 OOP 和事件监听器 (Corona SDK)

    我在 Corona SDK 中的第一步和第一次遇到的麻烦 尝试制作两个盒子 我可以通过以下方式移动它们this http www ludicroussoftware com blog 2011 07 06 simple oop with i
  • 如何将图像列添加到 wicket 框架中的表中?

    我想在 wicket 框架中的表格的每个单元格中添加包含图像的列 我在 java 类中创建表格 并有一个 createColumns 方法 如下所示 private List
  • 当“if else”/“instance of”不可避免时,除了使用访问者模式之外,我们如何改进设计?

    当我们有一个纯粹是语义继承而不是行为继承的对象层次结构时 我们不可避免地需要到处编写 instanceof 或 if else 来进行运行时类型检查 E g 如果我有一个对象层次结构 Class Function Class Average
  • 绘图加载时禁用闪亮按钮

    加载绘图 反应元素时是否可以禁用闪亮的按钮 我知道shinyjs可以禁用和启用输入元素 但我不知道如何设置与加载图 反应元素的连接 该示例基于单文件闪亮应用程序页面 https shiny rstudio com articles sing
  • 为未遇到的输入创建神经网络

    我正在使用创建一个简单的多层前馈神经网络锻造网 http www aforgenet com framework NN 库 我的神经网络是一个 3 层激活网络 使用反向传播学习算法通过监督学习方法进行训练 以下是我的初始设置 learnin
  • 如何在 Spring Data JPA 中使用带有分页的投影接口?

    我正在尝试使用 Spring Data 的新功能获取部分实体 NetworkSimple 的页面 预测 http docs spring io spring data rest docs current reference html pro
  • Python并行将GCS中的.json文件读取到pandas DF中

    TL DR asyncio vs multi processing vs threading vs some other solution并行化 for 循环 从 GCS 读取文件 然后将这些数据一起附加到 pandas 数据帧中 然后写入