我正在编写一个从 Kafka 0.8 读取的自定义 DataFlow 无界数据源。我想使用 DirectPipelineRunner 在本地运行它。但是,我得到以下堆栈跟踪:
Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(KafkaDataflowSource)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:700)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:252)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:662)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:374)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:87)
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:174)
这是有道理的,因为我任何时候都没有为我的自定义源注册评估程序。
Reading https://github.com/GoogleCloudPlatform/DataflowJavaSDK https://github.com/GoogleCloudPlatform/DataflowJavaSDK,似乎只有评估者bounded来源已注册。为自定义无限源定义和注册评估器的推荐方法是什么?
DirectPipelineRunner
目前仅在有界输入上运行。我们正在积极努力消除此限制,并预计很快就会发布。
同时,您可以轻松地转动任何UnboundedSource
into a BoundedSource
,出于测试目的,通过使用withMaxNumRecords
,如下例所示:
UnboundedSource<String> unboundedSource = ...; // make a Kafka source
PCollection<String> boundedKafkaCollection =
p.apply(Read.from(unboundedSource).withMaxNumRecords(10));
See 这个问题在 GitHub 上 https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/93更多细节。
另外,还有一些致力于贡献 Kafka 连接器的努力。您可能想通过以下方式与我们和其他贡献者互动我们的 GitHub 存储库 https://github.com/GoogleCloudPlatform/DataflowJavaSDK/pull/96.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)