在 DirectPipelineRunner 上使用自定义 DataFlow 无界源

2024-02-17

我正在编写一个从 Kafka 0.8 读取的自定义 DataFlow 无界数据源。我想使用 DirectPipelineRunner 在本地运行它。但是,我得到以下堆栈跟踪:

Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(KafkaDataflowSource)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:700)
        at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
        at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
        at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
        at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:252)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:662)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:374)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:87)
        at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:174)

这是有道理的,因为我任何时候都没有为我的自定义源注册评估程序。

Reading https://github.com/GoogleCloudPlatform/DataflowJavaSDK https://github.com/GoogleCloudPlatform/DataflowJavaSDK,似乎只有评估者bounded来源已注册。为自定义无限源定义和注册评估器的推荐方法是什么?


DirectPipelineRunner目前仅在有界输入上运行。我们正在积极努力消除此限制,并预计很快就会发布。

同时,您可以轻松地转动任何UnboundedSource into a BoundedSource,出于测试目的,通过使用withMaxNumRecords,如下例所示:

UnboundedSource<String> unboundedSource  = ...; // make a Kafka source
PCollection<String> boundedKafkaCollection =
    p.apply(Read.from(unboundedSource).withMaxNumRecords(10));

See 这个问题在 GitHub 上 https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/93更多细节。


另外,还有一些致力于贡献 Kafka 连接器的努力。您可能想通过以下方式与我们和其他贡献者互动我们的 GitHub 存储库 https://github.com/GoogleCloudPlatform/DataflowJavaSDK/pull/96.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 DirectPipelineRunner 上使用自定义 DataFlow 无界源 的相关文章

  • java.lang.NoClassDefFoundError:迁移到数据流 2.x 后的 org/apache/beam/sdk/runners/PipelineRunner

    获取运行时错误 java lang NoClassDefFoundError org apache beam sdk runners PipelineRunner 即使我的 pom xml 中有以下内容
  • 在 Google Dataflow 中使用 FireStore

    我想在带有 python 的数据流模板中使用 FireStore 我做过这样的事情 with beam Pipeline options options as p p Read from PubSub gt gt beam io ReadF
  • 数据流进入 Beam Pipeline 时的附加参数

    我正在研究 Dataflow 我已经通过 Python SDK 构建了自定义管道 我想将数据流 UI 上的参数添加到我的自定义管道中 使用附加参数 参考者https cloud google com dataflow docs guides
  • 使用 Python 的 Dataflow/Beam 示例

    我正在尝试获取以下项目的样本PCollection在 Dataflow Beam 上使用 Python SDK 虽然没有记录 Sample FixedSizeGlobally n 存在 测试时 它seems返回一个PCollection包含
  • 在 DirectPipelineRunner 上使用自定义 DataFlow 无界源

    我正在编写一个从 Kafka 0 8 读取的自定义 DataFlow 无界数据源 我想使用 DirectPipelineRunner 在本地运行它 但是 我得到以下堆栈跟踪 Exception in thread main java lan
  • NameError:名称“pvalue”未定义

    在此处的文档中 https beam apache org documentation programming guide additional outputs https beam apache org documentation pro
  • google cloud dataflow (apache beam)可以使用ffmpeg来处理视频或图像数据吗

    数据流进程可以使用 ffmpeg 处理视频或图像吗 如果可以 示例工作流程会是什么样子 是的 您可以使用以下命令确保工作人员可以使用必要的二进制文件 及其依赖项 filesToStage 管道选项 https cloud google co
  • 数据流/apache beam 窗口中字节数的触发窗口

    我有一个简单的工作 将数据从 pub sub 移动到 gcs pub sub 主题是一个共享主题 具有许多不同大小的不同消息类型 我希望结果在 GCS 中相应地垂直分区 架构 版本 年 月 日 该父键下应该是当天的一组文件 并且文件的大小应
  • 数据流中的值错误:GCS 位置无效:无

    我正在尝试从 GCS 存储桶加载数据并将内容发布到 pubsub 和 bigquery 这些是我的管道选项 options PipelineOptions project project temp location gs dataflow
  • Dataflow 作业完成时通知 Google PubSub

    有没有办法在 Google Dataflow 作业完成后将消息发布到 Google Pubsub 上 我们需要通知依赖系统传入数据的处理已完成 将数据写入到接收器后 Dataflow 如何发布 EDIT 我们希望在管道完成写入 GCS 后发
  • 使用 Google Cloud DataFlow python sdk 读取一组 xml 文件

    我正在尝试从 GCS 存储桶读取 XML 文件的集合并处理它们 其中集合中的每个元素都是代表整个文件的字符串 但我找不到关于如何完成此操作的合适示例 我也无法理解它来自 Apache Beam 文档 主要是关于 Java 版本的 我当前的管
  • 可以使用数据流将 pubsub 消息重复数据删除回 pubsub 吗?

    我有一个将数据写入 Google Cloud pubsub 的应用程序 根据 pubsub 的文档 由于重试机制而导致的重复偶尔可能会发生 还有消息乱序的问题 这在 pubsub 中也得不到保证 另外 根据文档 可以使用 Google Cl
  • Cloud SQL 增量到 BigQuery

    我需要针对我正在研究的用例之一提供一些建议 使用案例 我们在 Cloud SQL 中拥有大约 5 10 个表的数据 其中一些被视为查找表 另一些则被视为事务性表 我们需要将其发送到 BigQuery 以生成 3 4 个表 扁平化 嵌套或非规
  • 旁加载静态数据

    在 ParDo 中处理数据时 我需要使用存储在 Google Cloud Storage 上的 JSON 架构 我想这可能是侧面加载 我读了他们称之为文档的页面 https beam apache org releases pydoc 2
  • 写入 BigQuery 时处理卡住

    我正在使用云数据流将数据从 Pub Sub 消息导入到 BigQuery 表 我正在使用 DynamicDestinations 因为这些消息可以放入不同的表中 我最近注意到该进程开始消耗所有资源 并且消息表明该进程被卡住开始显示 Proc
  • Apache Beam:跳过已构建的管道中的步骤

    有没有办法有条件地跳过已构建的管道中的步骤 或者管道构建是否被设计为控制运行哪些步骤的唯一方法 通常 管道构造控制将执行管道中的哪些转换 但是 您可以想象一个输入 多个输出ParDo复用输入PCollection到输出之一PCollecti
  • Apache Beam:具有无限源的批处理管道

    我目前正在使用 Apache Beam 和 Google Dataflow 来处理实时数据 数据来自Google PubSub 它是无限制的 所以目前我正在使用流媒体管道 然而 事实证明 拥有一个 24 7 运行的流管道是相当昂贵的 为了降
  • Apache Beam Pipeline 写表后查询表

    我有一个 Apache Beam Dataflow 管道 它将结果写入 BigQuery 表 然后我想查询该表以获取管道的单独部分 但是 我似乎无法弄清楚如何正确设置此管道依赖性 我编写的新表 然后想要查询 与一个单独的表连接以进行某些过滤
  • Dataflow sideInput 可以通过读取 gcs 存储桶来更新每个窗口吗?

    我目前正在创建一个 PCollectionView 方法是从 gcs 存储桶中读取过滤信息 并将其作为侧面输入传递到管道的不同阶段 以过滤输出 如果 gcs 存储桶中的文件发生更改 我希望当前正在运行的管道使用这个新的过滤器信息 如果我的过
  • 如何删除 gcloud Dataflow 作业?

    数据流作业在我的仪表板上杂乱无章 我想从我的项目中删除失败的作业 但在仪表板中 我没有看到任何删除数据流作业的选项 我正在寻找至少像下面这样的东西 gcloud beta dataflow jobs delete JOB ID 要删除所有作

随机推荐

  • scenebulider 无法打开 fxml 文件

    我正在 JavaFX 中创建一个应用程序 我在打开场景构建器时收到此错误 无法打开 Loggin fxml 打开操作失败 请确保所选文件是有效的 fxml 文档 单击 显示详细信息 它向我显示了此错误 java io IOException
  • 如何将二进制值字符串转换回 char

    Example 注意 我只关心字母 所以位集 000001 是a or A 我有一个string named s与价值 abc 我把每一个char of the string并将其转换为二进制值通过 指某东西的用途bitset e g bi
  • (自定义)RestAuthenticationProcessingFilter 排序的异常

    我尝试将令牌的 Rest 身份验证添加到我的应用程序中 我创建了一个简单的过滤器 不执行任何其他操作来打印消息 public class RestAuthenticationProcessingFilter extends GenericF
  • Justadistraction:标记化没有空格的英语。村上羊人

    我想知道如何you如果删除空格 会用英语 或其他西方语言 对字符串进行标记吗 这个问题的灵感来源于村上小说 羊人 中的角色舞蹈 舞蹈 舞蹈 http en wikipedia org wiki Dance Dance Dance 在小说中
  • 如何将本地数据库复制到heroku?

    我正在开发一个简单的 Rails 基于 activeRecord 应用程序 并且正在本地进行测试 现在是时候转移到网上了 但是 我需要在应用程序的数据库中再次插入所有记录吗 我希望不是 你知道是否可以复制我的整个本地数据库并将其导入到her
  • 透明 ViewController 可以看到下面的父级吗?

    我想以模态方式添加一个具有透明背景的视图控制器 以便可以看到下面的父视图控制器 这是适用于 iPhone 的应用程序 不适用于 iPad 我已经尝试过这个 TextFieldViewController vc self storyboard
  • 计算布尔数组中真(或假)元素的数量?

    假设我有一个充满布尔值的数组 我想知道有多少元素为 true private bool testArray new bool 10 true false true true false true true true false false
  • Firebase .get() 与 .once() - 有什么区别?

    文档here https firebase google com docs database web read and write read data once with get但我不明白 get 和 once 有什么区别 我的理解是 ge
  • iPhone - 将选定的单元格移动到 uitableview 的顶部

    我寻找这个问题 但我不相信我能找到答案 我有一个自定义单元格的表格视图 单击该单元格时 所选单元格会推送包含信息的新单元格 我想知道是否有人知道如何将选定的单元格推到 uitableview 的顶部 或者让它填满整个表格视图 我的 uita
  • 将嵌入的视频资源作为流播放

    EDIT 我改变了我的问题以更好地澄清问题 如何使用字节数组 取自嵌入式资源 播放视频DirectShow Net http directshownet sourceforge net about html图书馆 由于我要阻止用户访问视频文
  • CSS Transform Math - 计算倾斜引起的div高度

    我很难弄清楚如何计算因倾斜而导致的 div 容器的额外高度 我正在屏蔽容器内的图像并使用plugin http christianvarga com 2011 05 jquery resize image to parent contain
  • 使用标识符列表格式化 CREATE TABLE 查询

    我想用 Python 编写一个脚本 从 CSV 创建 PostgreSQL 表 而不是使用psycopg2 copy from我想要更个性化 更灵活的东西 显然 我将读取 CSV 文件的第一行并从中获取列名称列表 然后我想将此列表转换为您在
  • apache tomcat 9.x 无法与 eclipse 和 Java 10.0.1 一起使用

    我已经安装了 apache tomcat 9 0 7在我的 Windows 机器上并具有以下环境配置 回显 JAVA HOME C 程序文件 Java jdk 10 0 1 回显 JRE HOME C 程序文件 Java jre 10 0
  • 我可以让节点在数字字符串中输出逗号而不引入 i18n 吗?

    并不是说添加一个需求有什么大不了的 而是节点文档建议您不需要它 http nodemanual org latest js doc Number html Number toLocaleString from the docs var nu
  • Docker 在 Windows 上添加网络驱动器作为卷

    我正在尝试将网络驱动器安装为卷 这是我正在尝试的命令 docker run v NetworkDirectory Folder data alpine ls data 我在 Windows 上运行此命令 数据目录为空 如何将此网络目录作为卷
  • 关于 MVVM 模式和 GUI 中 XAML 动态加载的混淆

    嗯 这个问题与 MVVM 模式相关 我可以在这个论坛上快速得到答案 所以我想询问并消除我对该模式的困惑 我对 MVVM 方法还很陌生 我欣赏这种模式并了解其背后的原理 也许我没有对这个模式进行太多的研究 这就是为什么会有一些困惑 如果有一种
  • 检测java中的回车键

    我尝试使用扫描仪获取用户输入 如果用户按 Enter 键 则继续执行下一个输入语句 但它一次打印全部 public class MainRDS public static void main String args Scanner in n
  • Spring Boot oauth2:如何设置授权请求中的资源参数以使adfs满意?

    我正在尝试设置一个 Spring Boot 应用程序 该应用程序使用 oauth2 和 Active Directory 联合身份验证服务作为身份验证提供程序 我从这里的教程开始 https spring io guides tutoria
  • 以强类型方式获取属性的 [DisplayName] 属性

    再会 我有这样的方法来获得 DisplayName 属性的属性值 直接附加或使用 MetadataType 属性 我在极少数情况下需要使用它 DisplayName 在控制器代码中 public static class MetaDataH
  • 在 DirectPipelineRunner 上使用自定义 DataFlow 无界源

    我正在编写一个从 Kafka 0 8 读取的自定义 DataFlow 无界数据源 我想使用 DirectPipelineRunner 在本地运行它 但是 我得到以下堆栈跟踪 Exception in thread main java lan