在 Apache Beam 中监视与文件模式匹配的新文件

2024-04-21

我在 GCS 或其他受支持的文件系统上有一个目录,外部进程正在向该目录写入新文件。

我想编写一个 Apache Beam 流式传输管道,它可以连续监视此目录中的新文件,并在每个新文件到达时读取和处理它。这可能吗?


从 Apache Beam 2.2.0 开始,这是可能的。有几个 API 支持此用例:

如果您正在使用TextIO or AvroIO,他们通过以下方式明确支持这一点TextIO.read().watchForNewFiles()和同样的readAll(), 例如:

PCollection<String> lines = p.apply(TextIO.read()
    .from("gs://path/to/files/*")
    .watchForNewFiles(
        // Check for new files every 30 seconds
        Duration.standardSeconds(30),
        // Never stop checking for new files
        Watch.Growth.<String>never()));

如果您使用不同的文件格式,您可以使用FileIO.match().continuously() and FileIO.matchAll().continuously()它支持相同的API,结合FileIO.readMatches().

API 支持指定检查新文件的频率以及何时停止检查(支持的条件例如“如果在给定时间内没有新输出出现”、“观察 N 个输出后”、“自开始检查以来的给定时间后” ”及其组合)。

请注意,目前此功能仅适用于 Direct 运行器和 Dataflow 运行器,并且仅适用于 Java SDK。一般来说,它可以在任何支持的运行器中工作可分割自由度 (see 能力矩阵 https://beam.apache.org/documentation/runners/capability-matrix/).

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Apache Beam 中监视与文件模式匹配的新文件 的相关文章

随机推荐

  • 安卓。谷歌 API 翻译

    我在集成 Google API Translate 时遇到一些问题 添加到 gradle 配置此依赖项 compile com google apis google api services translate v2 rev41 1 20
  • 使用 Google Oauth2 客户端访问 API 时 Rails 3.2.3 中出现 SSL 错误

    我对 OAuth2 相当陌生 我正在尝试使用 Omniauth 和 Google API 客户端通过 Google API 访问用户的 Blogger 帐户 我正在使用以下内容 轨道3 2 3 红宝石 1 9 3 oauth2 0 8 0
  • 更有效的循环方式?

    我有来自一个更大脚本的一小段代码 我发现当函数t area被调用时 它负责大部分运行时间 我自己测试了这个函数 它并不慢 我相信它需要运行很多次 所以需要花费很多时间 这是调用该函数的代码 tri area np zeros numx nu
  • 当存在变量空间分隔列时,在 python (numpy) 中加载数据集

    我有一个包含数字数据的大数据集 并且在其某些行中存在分隔列的可变空间 例如 4 5 6 7 8 9 2 3 4 当我使用这条线时 dataset numpy loadtxt dataset txt delimiter 我收到此错误 Valu
  • Foreach - 并行对象

    最近我们开始编写需要很长时间才能完成的脚本 因此我们深入研究了 PowerShell 工作流程 阅读一些文档后 我了解了基础知识 但是 我似乎找不到一种方法来创建 PSCustomObject 对于一个内的每个单独的项目foreach pa
  • 如何在 R 中迭代生成组合? [复制]

    这个问题在这里已经有答案了 所以我目前正在使用以下代码来生成我的组合 组合 x y 但问题是函数存储了所有可能的组合 我不想存储它们 我只想通过循环或其他方式生成它们 这对我的程序来说会更有效率 有没有办法通过 for 循环生成组合而不是全
  • 如何从另一个目录运行 Maven(无需 cd 到项目目录)?

    假设我的maven项目位于 some location project我当前的位置是 another location 如何在不更改项目位置的情况下运行 Maven 构建cd some location project 您可以使用参数 f
  • 模拟迭代行为

    我有一个具有迭代行为的界面 但在 Rhinomocks 中模拟它时遇到了麻烦 示例接口和类是我的问题的一个非常简单的版本 每次调用 LineReader Read 时 LineReader CurrentLine 都应返回不同的值 下一行
  • 如何使用 API 路由在 Next.js 上下载文件

    我正在使用 next js 我有一个第三方服务 我需要从中检索 PDF 文件 该服务需要一个 API 密钥 我不想在客户端公开该密钥 这是我的文件 api getPDFFile js const options method GET enc
  • 使用 sqlite json_each 过滤 json 数组中的多个项目

    我有一个包含以下架构和数据的 sqlite 表 CREATE TABLE Feeds id INTEGER PRIMARY KEY AUTOINCREMENT groups JSON NOT NULL DEFAULT INSERT INTO
  • 哪种快速方法可以并行化 2D NumPy 数组的元素乘法?

    我使用 NumPy 函数einsum https docs scipy org doc numpy reference generated numpy einsum html执行两个 2D NumPy 数组的逐元素乘法和求和 np eins
  • 自动 PayPal 付款

    我正在寻找一种方法来自动将钱从我的 PayPal 帐户发送到其他 PayPal 帐户 通过 PHP 这可能吗 就像是 recievers array email protected cdn cgi l email protection em
  • 使用 bootstrap-modal 作为 Backbone.js 视图

    我正在尝试创建一个基于 Twitter 引导模式的 Backbone js 视图 该视图通过以下方式使用 Backbone 的自动事件委托 events视图的属性 不幸的是 bootstrap modal 似乎破坏了 Backbone 的事
  • 帮助查找内存泄漏(一般提示)

    这是在 iOS 上 设备是 iPad 发生的情况如下 我在设备上运行应用程序或使用 Xcode 进行调试和运行 5 分钟后 我收到 1 级内存警告 一分钟后 我收到 2 级内存警告 又过了一分钟 Program received signa
  • Redux-saga takeLatest 有条件地

    我有一个与 redux saga 相关的问题 有没有办法实现 takeLatest 但有条件 例如 我有一个歌曲类型的动态列表 Rap Pop Hip hop 我想按歌曲类型获取歌曲 我定义了一个类型为 FETCH SONGS BY TYP
  • Spring RestTemplate 中不存在必需的字符串参数

    我在使用 RestTemplate 发布 2 个参数时遇到问题 a String 多部分文件 我不认为我的控制器有问题 因为它非常基本 看来控制器没有收到名称参数 你能告诉我我的代码有什么问题吗 控制器 收件人 RequestMapping
  • Object.getPrototypeOf() 混淆

    Object getPrototypeOf obj 如何工作 根据定义 Object getPrototypeOf obj 应该返回对象的原型属性 或者以其他方式与 obj constructor prototype 相同 用 new 创建
  • Java:在循环中实例化变量:好还是坏风格?

    我有一个简单的问题 通常我会写这样的代码 String myString hello for int i 0 i lt 10 i myString hello again 因为我认为以下不会是好的样式 因为它会创建太多不必要的对象 for
  • 查找嵌套字典和列表中某个键的所有出现位置

    我有一本这样的字典 id abcde key1 blah key2 blah blah nestedlist id qwerty nestednestedlist id xyz keyA blah blah blah id fghi k
  • 在 Apache Beam 中监视与文件模式匹配的新文件

    我在 GCS 或其他受支持的文件系统上有一个目录 外部进程正在向该目录写入新文件 我想编写一个 Apache Beam 流式传输管道 它可以连续监视此目录中的新文件 并在每个新文件到达时读取和处理它 这可能吗 从 Apache Beam 2