Flink 可以将结果写入多个文件(如 Hadoop 的 MultipleOutputFormat)吗?

2023-12-03

我正在使用 Apache Flink 的 DataSet API。我想实现一项将多个结果写入不同文件的作业。

我怎样才能做到这一点?


您可以将任意数量的数据接收器添加到DataSet根据您的需要进行编程。

例如在这样的程序中:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple3<String, Long, Long>> data = env.readFromCsv(...);
// apply MapFunction and emit
data.map(new YourMapper()).writeToText("/foo/bar");
// apply FilterFunction and emit
data.filter(new YourFilter()).writeToCsv("/foo/bar2");

你读了一篇DataSet data来自 CSV 文件。这data给出两个后续变换:

  1. To a MapFunction其结果被写入文本文件。
  2. To a FilterFunction未过滤的元组将写入 CSV 文件。

您还可以拥有多个数据源以及分支和合并数据集(使用union, join, coGroup, cross或广播集)随您喜欢。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink 可以将结果写入多个文件(如 Hadoop 的 MultipleOutputFormat)吗? 的相关文章

随机推荐

  • 如何将静态部分添加到javapoet中的java类中

    有没有办法使用javapoet库将静态代码块添加到java类中 static whatever code is needed for initialization goes here Use TypeSpec Builder addStat
  • 为什么没有安装tensorflow?

    我无法安装张量流 显示此错误 错误 找不到满足张量流要求的版本 来自版本 无 错误 找不到张量流的匹配分布 我安装了Python 3 11 但再次收到相同的错误消息 我用谷歌搜索了这个错误 并尝试了一些向其他人建议的方法 但没有任何效果 包
  • 使工作簿保存中特定工作表的字段成为必填字段

    我在 Excel 中使用宏来使 Excel 工作簿中的字段成为必填字段 但是 问题是工作簿包含多个工作表 并且宏适用于所有工作表 有没有办法定位工作簿中的特定工作表 下面是我正在使用的代码 Private Sub Workbook Befo
  • SwiftUI 如何设置下划线与文本之间的间距?

    设置下划线的代码 我想让文字和下划线之间的间距变大 Text underline text underline 下划线是一种字体功能 您只需在需要的地方画线即可进行自定义 var body some View HStack Text Bef
  • 为什么不能用 C 语言编写 scanf("%.2lf", &a) ?

    我的朋友刚刚开始学习编程 他向我展示了他的代码并询问为什么它返回一些奇怪的数字 我看了一下 发现他用的是scanf 2lf a 接受输入 并按照习惯 我尝试将其更改为正常 然后他问我为什么它有一些奇怪的输出 谷歌搜索后我仍然没有找到答案 谁
  • EmailProperty 与 StringProperty 有何不同?

    如何EmailProperty与 不同StringProperty 考虑这两个例子 example 1 store an e mail address in an EmailProperty class MyModel db Model e
  • 如何在android中的一个布局上显示一半按钮,在另一个布局上显示一半按钮?

    我想设计一个如下图所示的布局 我尝试使用相对布局来做到这一点 但我没有想出解决方案 对于所有设备屏幕 它应该位于相同的位置 我怎样才能实现它 我尝试了这段代码
  • 如何在画布上绘制位图,同时尊重位图的 alpha 值?

    背景 我有一个主位图 我需要在其上绘制其他位图 主位图有一些半透明像素 具有 Alpha 通道变量值的像素 因此在其上绘制的其他位图应与其合并 而不是完全覆盖颜色 问题 我如何设置画布以在主位图上绘制相对于半透明像素的位图 注意 alpha
  • 调用表值函数时添加查询提示

    我正在从实体框架调用表值函数 并且需要能够添加option recompile 因为它选择的执行计划不是最佳的 在 SQL Server Management Studio 中运行查询 它看起来像这样 select from dbo fDE
  • Python、__init__ 和自我困惑

    好吧 当我发现这个时 我正在查看一些来源 gt gt gt def parse self filename parse ID3v1 0 tags from MP3 file self clear try fsock open filenam
  • 使用 GAS 将 google 电子表格转换为 XLSX 或 ODS

    我想将一些谷歌电子表格转换为Excel 首选xlsx 我已经阅读了几个有关如何实现此目标的线程 但我无法运行它 我读过的主题包括Google Apps 脚本 将电子表格保存为 ODS 以进行本地备份 and Google Apps 脚本通过
  • 使用 Jenkins 部署到 VPN

    我的总体目标是自动部署到 VPN 中的服务器 目前的 VPN 是 Cisco AnyConnect 和 Barracuda 但如果有更通用的解决方案就更好了 我考虑过使用 Jenkins 但我发现的唯一相关资源是这个插件https wiki
  • Stroustrup:对于 C++,如何安装 FLTK 库?

    问 有人可以指导我如何安装适用于 Microsoft Visual Studio 2015 的 FLTK 以便我可以将 FLTK 用于 C 吗 额外的信息 Bjarne Stroustrup 的 编程 使用 C 的原理与实践 中的第 12
  • 将运行时参数传递给 odeint 积分器

    我想使用 odeint boost 积分器来查看克尔时空中的测地线 这需要为各种参数值运行积分器 我有初始条件和初始动量向量 因此系统的角动量将根据我想要如何启动它而变化 我一直在关注这里列出的优秀示例 http headmyshoulde
  • 在 Maven 中,模块是否受存储库的 updatePolicy 影响?

    这就像我问的另一个问题一样 但不是依赖关系 而是关于模块 让我举一个场景 你有一个多module项目和部署该项目的持续集成服务器 这将部署到您本地的存储库settings xml has an updatePolicy of always
  • 如何更改函数内数据框列表中的列名称?

    我知道 如何更改数据帧列表中的名称 的答案已被多次回答 但是 我一直试图生成一个函数 该函数可以将任何列表作为参数并更改列表中所有数据帧的所有列名称 我正在处理大量 csv 文件 所有这些文件都具有相同的 3 列名称 我按如下方式分组导入文
  • Android 上的 Firebase 未调用 setValue onComplete

    我使用安全规则来确定在哪些条件下可以将值写入数据库 目前我已经设置了安全规则 这些规则工作正常 我用模拟器检查过 所以到目前为止没有问题 如果写入权限被拒绝 不满足安全规则 我想采取一些措施 为了做到这一点 我计划在 setValue 上使
  • 使用 Angular 2 Rxjs 计算每秒按键次数

    Created by darius on 02 04 16 import Component from angular2 core import Observable from rxjs Rx Component styles requir
  • 添加自定义标记到地图 - Android

    我目前有一个使用 MapView 向用户显示谷歌地图的应用程序 我一直在尝试使用此代码在地图上放置标记 public boolean onTouchEvent MotionEvent event MapView mapView if eve
  • Flink 可以将结果写入多个文件(如 Hadoop 的 MultipleOutputFormat)吗?

    我正在使用 Apache Flink 的 DataSet API 我想实现一项将多个结果写入不同文件的作业 我怎样才能做到这一点 您可以将任意数量的数据接收器添加到DataSet根据您的需要进行编程 例如在这样的程序中 ExecutionE