如何使用 Beam 读取大型 CSV?

2024-02-20

我正在尝试弄清楚如何使用 Apache Beam 读取大型 CSV 文件。我所说的“大”是指几 GB(因此将整个 CSV 一次性读入内存是不切实际的)。

到目前为止,我已经尝试了以下选项:

  • 使用 TextIO.read():这不好,因为带引号的 CSV 字段可能包含换行符。此外,它尝试一次将整个文件读入内存。
  • 编写一个 DoFn,将文件作为流读取并发出记录(例如使用 commons-csv)。但是,这仍然会一次读取整个文件。
  • 尝试可分割的 DoFn如此处所述 https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html。我的目标是让它逐渐将记录作为无界 PCollection 发出 - 基本上,将我的文件转换为记录流。然而,(1) 很难正确计数 (2) 由于 ParDo 创建多个线程,因此需要一些黑客同步,以及 (3) 我生成的 PCollection 仍然不是无限的。
  • 尝试创建我自己的 UnboundedSource。这似乎非常复杂并且记录很少(除非我遗漏了一些东西?)。

Beam 是否提供了任何简单的东西来允许我按照我想要的方式解析文件,而不必在继续下一个转换之前将整个文件读入内存?


从 Beam 的角度来看,TextIO 应该做正确的事情,即尽快读取文本文件并将事件发送到下一阶段。

我猜您正在为此使用 DirectRunner,这就是您看到大量内存占用的原因。希望这不是太多的解释:DirectRunner 是小型作业的测试运行器,因此它将中间步骤缓冲在内存中而不是缓冲到磁盘中。如果您仍在测试管道,则应该使用一小部分数据样本,直到您认为它有效为止。然后,您可以使用 Apache Flink 运行程序或 Google Cloud Dataflow 运行程序,它们都会在需要时将中间阶段写入磁盘。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用 Beam 读取大型 CSV? 的相关文章

随机推荐