我在 GCS 或其他受支持的文件系统上有一个目录,外部进程正在向该目录写入新文件。
我想编写一个 Apache Beam 流式传输管道,它可以连续监视此目录中的新文件,并在每个新文件到达时读取和处理它。这可能吗?
从 Apache Beam 2.2.0 开始,这是可能的。有几个 API 支持此用例:
如果您正在使用TextIO
or AvroIO
,他们通过以下方式明确支持这一点TextIO.read().watchForNewFiles()
和同样的readAll()
, 例如:
PCollection<String> lines = p.apply(TextIO.read()
.from("gs://path/to/files/*")
.watchForNewFiles(
// Check for new files every 30 seconds
Duration.standardSeconds(30),
// Never stop checking for new files
Watch.Growth.<String>never()));
如果您使用不同的文件格式,您可以使用FileIO.match().continuously()
and FileIO.matchAll().continuously()
它支持相同的API,结合FileIO.readMatches()
.
API 支持指定检查新文件的频率以及何时停止检查(支持的条件例如“如果在给定时间内没有新输出出现”、“观察 N 个输出后”、“自开始检查以来的给定时间后” ”及其组合)。
请注意,目前此功能仅适用于 Direct 运行器和 Dataflow 运行器,并且仅适用于 Java SDK。一般来说,它可以在任何支持的运行器中工作可分割自由度 (see 能力矩阵 https://beam.apache.org/documentation/runners/capability-matrix/).
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)