TL;DR: asyncio
vs multi-processing
vs threading
vs. some other solution
并行化 for 循环,从 GCS 读取文件,然后将这些数据一起附加到 pandas 数据帧中,然后写入 BigQuery...
我想并行一个Python函数来读取数十万个小数据.json来自 GCS 目录的文件,然后转换这些文件.jsons到 pandas 数据帧中,然后将 pandas 数据帧写入 BigQuery 表。
这是该函数的非并行版本:
import gcsfs
import pandas as pd
from my.helpers import get_gcs_file_list
def load_gcs_to_bq(gcs_directory, bq_table):
# my own function to get list of filenames from GCS directory
files = get_gcs_file_list(directory=gcs_directory) #
# Create new table
output_df = pd.DataFrame()
fs = gcsfs.GCSFileSystem() # Google Cloud Storage (GCS) File System (FS)
counter = 0
for file in files:
# read files from GCS
with fs.open(file, 'r') as f:
gcs_data = json.loads(f.read())
data = [gcs_data] if isinstance(gcs_data, dict) else gcs_data
this_df = pd.DataFrame(data)
output_df = output_df.append(this_df)
# Write to BigQuery for every 5K rows of data
counter += 1
if (counter % 5000 == 0):
pd.DataFrame.to_gbq(output_df, bq_table, project_id=my_id, if_exists='append')
output_df = pd.DataFrame() # and reset the dataframe
# Write remaining rows to BigQuery
pd.DataFrame.to_gbq(output_df, bq_table, project_id=my_id, if_exists='append')
这个函数很简单:
- grab
['gcs_dir/file1.json', 'gcs_dir/file2.json', ...]
, GCS 中的文件名列表
- loop over each file name, and:
- 从 GCS 读取文件
- 将数据转换为 pandas DF
- 附加到主 pandas DF
- 每 5K 循环写入 BigQuery(因为随着 DF 变大,追加速度会变慢)
我必须在几个 GCS 目录上运行这个函数,每个目录都有大约 500K 个文件。由于读/写这么多小文件的瓶颈,这个过程对于一个目录来说大约需要 24 小时...如果我能让这个更加并行以加快速度,那就太好了,因为这似乎是一个任务适合并行化。
Edit:下面的解决方案很有帮助,但我对从 python 脚本中并行运行特别感兴趣。 Pandas 正在处理一些数据清理,并使用bq load
会抛出错误。有asyncio https://docs.python.org/3/library/asyncio.html和这个gcloud-aio-存储 https://pypi.org/project/gcloud-aio-storage/这两者似乎都对这项任务有用,也许是比线程或多处理更好的选择......