一个数据流作业内的并行管道

2023-12-23

我想在 GCP 上的一个数据流作业中运行两个并行管道。我已经创建了一个管道,它工作得很好,但我想要另一个管道而不创建另一份工作。

我已经搜索了很多答案,但找不到任何代码示例:(

如果我这样运行它就不起作用:

pipe1.run();
pipe2.run();

它给我“已经有一个活动的作业名称...如果您想提交第二个作业,请再次尝试使用设置不同的名称--jobName"


您可以将其他输入应用到管道,这将在一项作业中产生一个单独的管道。例如。:

public class ExamplePipeline {

public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    options.setRunner(DirectRunner.class);

    Pipeline pipeline = Pipeline.create(options);

    PCollection<String> linesForPipelineOne = pipeline.apply(Create.of("A1", "B1"));
    PCollection<String> linesToWriteFromPipelineOne = linesForPipelineOne.apply("Pipeline 1 transform",
            ParDo.of(new DoFn<String, String>() {

        @ProcessElement
        public void processElement(ProcessContext c) {
            System.out.println("Pipeline one:" + c.element());
            c.output(c.element() + " extra message.");
        }

    }));
    linesToWriteFromPipelineOne.apply((TextIO.write().to("file.txt")));

    PCollection<String> linesForPipelineTwo = pipeline.apply(Create.of("A2", "B2"));
    linesForPipelineTwo.apply("Pipeline 2 transoform",
            ParDo.of(new DoFn<String, String>() {

        @ProcessElement
        public void processElement(ProcessContext c) {
            System.out.println("Pipeline two:" + c.element());
        }

    }));

    pipeline.run();
}

正如您所看到的,您也可以将两个(或更多)单独的 PBegin 应用于具有多个 PDone/Sink 的管道。在这个例子中"pipeline 1"转储并将输出写入文件"pipeline 2"仅将其转储到屏幕上。

如果你运行这个DataflowRunner在 GCP 上,GUI 将向您显示 2 个未连接的“管道”。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

一个数据流作业内的并行管道 的相关文章

随机推荐

  • 位置无关可执行文件的正确 Xcode 设置是什么

    最近刚刚开始收到一封应用商店提交后的电子邮件 其中包含以下建议 请确保您的构建设置已配置为创建 PIE 可执行文件 然而 XCode 中的设置看起来是正确的 在链接部分我发现 不创建位置无关的可执行文件 设置为 否 双重否定YUK 您收到此
  • Android排序数组

    我如何按日期或名称对该数组进行排序 String datetable new String 21 2 datetable 0 0 2011 01 01 datetable 0 1 Name1 datetable 1 0 2011 01 03
  • 为什么宽度/高度不适用于非定位伪元素?

    我想设置一个width of before伪元素达到80 如果我使用定位 那么一切都会正常 但如果我不使用它 那么一切都会失败 你能解释一下为什么百分比宽度在没有定位的情况下不起作用吗 如果可以的话 请添加一些对规范的引用 position
  • jQuery 方法链接是流畅编程的一个例子吗?

    我对 JavaScript jQuery 有点陌生 但是当我看到方法链接的示例时 我立即感到熟悉 其他接口 如 LINQ 执行类似的操作 其中一组方法的返回类型与它们所操作的类型相同 TweetSharp 执行的操作非常类似 这是流畅编程的
  • 从 CSV 文件批量插入 - 跳过重复项

    更新 最终使用了 Johnny Bubriski 创建的这个方法 然后对其进行了一些修改以跳过重复项 效果就像一个魅力 而且速度显然相当快 关联 http johnnycode com 2013 08 19 using c sharp sq
  • 向 Django FlatPages 添加功能,无需更改原始 Django 应用程序

    我想向 Django FlatPage 数据库模型添加一个字段 但我真的不知道如何在不编辑原始应用程序的情况下扩展它 我想要做的是将以下字段添加到模型中 from django db import models from django co
  • 在 nginx 上找不到 Laravel 路由

    当我尝试访问我的测试应用程序时 只有索引路由有效 malte italoborg es http malte italoborg es 如果我尝试访问另一条路线 例如 malte italoborg es admin http malte
  • 我可以使用 jQuery 检查是否至少有一个复选框被选中吗?

    我有以下 HTML 表单 其中可以有许多复选框 单击提交按钮时 我希望用户收到一个 javascript 警报 以检查至少一个复选框 如果未选中 有没有一种简单的方法可以使用 jQuery 来做到这一点
  • Android:API 级别低于 19 的远程 Webview 调试?

    据我所知 远程调试通过chrome inspect已在 API 级别 19 中添加用于 Web 视图 不过 我正在开发一个支持 17 设备的应用程序 只是在 API 19 上 我得到了02 28 00 31 16 569 12332 123
  • 在 R 中将 LASSO 与分类变量结合使用

    我有一个包含 1000 个观察值和 76 个变量的数据集 其中大约 20 个是分类变量 我想对整个数据集使用 LASSO 我知道通过 lars 或 glmnet 在 LASSO 中使用因子变量并不能真正起作用 但是变量太多 并且它们可以采用
  • 半六角形,只有一个元件

    我试图复制以下形状但没有成功 我想我需要一些 before and after伪元素以及以下 css pentagon position relative width 78px height 50px background 3a93d0 使
  • 当我在 Haskell 中编写“show”和“read”时发生了什么?

    以下是 GHCi 的简短文字记录 Prelude gt t read read Read a gt String gt a Prelude gt t show show Show a gt a gt String Prelude gt t
  • 使用计时器显示文本 3 秒?

    是否可以使用计时器在标签中显示文本 3 秒左右 F E 当您保存某些内容并且成功时 您会收到一条短信 成功 3秒后返回原页面 有人知道如何使用标签或消息框来做到这一点吗 是的 有可能 您可以在将标签文本设置为 成功 的位置启动计时器 并将其
  • elasticsearch 5.5突出显示字段不起作用

    我测试了弹性搜索突出显示字段功能 它工作正常 我用了弹性搜索2 4 4 and spring data elasticsearch 2 0 0 RELEASE 示例代码在下面的帖子中 如何使用 Spring data elasticsear
  • 如果该集合不可在进程之间整除,则使用 MPI_Scatter

    我有一个使用 MPI Scatter 和 MPI Gather 的程序 该程序将整数 N 作为输入 并返回从 2 到 N 的质数 我创建一个包含从 2 到 N 的数字的数组 并使用 MPI Scatter 将数组拆分为 N procs 数量
  • GCC 转储预处理器定义

    gcc g 有没有办法从命令行转储其默认预处理器定义 我的意思是像 GNUC STDC 等等 是的 使用 E dM选项而不是 c 示例 将它们输出到标准输出 echo gcc dM E echo clang dM E For C echo
  • Microsoft 整数文字扩展 - 在哪里记录?

    我在 Windows 安装的标准 stdint h 头文件中遇到了一些整数文字 文字具有以下形式的后缀 i8 ui8 i16 ui16 i32 ui32 i64 ui64 我以前遇到过 i64 形式的后缀 但从未遇到过任何其他形式的后缀 我
  • 尝试改进 Encode::decode 警告消息:$SIG{__WARN__} 处理程序中的段错误

    我正在尝试改进发出的警告消息Encode decode https metacpan org pod Encode FB WARN 我希望它打印正在读取的文件的名称以及在该文件中找到格式错误的数据的行号 而不是打印模块的名称和模块中的行号
  • Oracle STANDARD_HASH 在 PLSQL 中不可用?

    我正在尝试在 PL SQL 中使用 STANDARD HASH Oracle 12c 函数 但似乎不可用 SQL gt exec dbms output put line STANDARD HASH test BEGIN dbms outp
  • 一个数据流作业内的并行管道

    我想在 GCP 上的一个数据流作业中运行两个并行管道 我已经创建了一个管道 它工作得很好 但我想要另一个管道而不创建另一份工作 我已经搜索了很多答案 但找不到任何代码示例 如果我这样运行它就不起作用 pipe1 run pipe2 run