我可以将自定义分区器与 group by 一起使用吗?

2024-01-07

假设我知道我的数据集不平衡并且我知道键的分布。我想利用它来编写一个自定义分区器,以充分利用运算符实例。

我知道关于数据流#partitionCustom https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#partitionCustom-org.apache.flink.api.common.functions.Partitioner-org.apache.flink.api.java.functions.KeySelector-。但是,如果我的流被锁定,它仍然可以正常工作吗?我的工作看起来像这样:

KeyedDataStream afterCustomPartition = keyedStream.partitionCustom(new MyPartitioner(), MyPartitionKeySelector())

DataStreamUtils.reinterpretAsKeyedStream(afterCustomPartition, new MyGroupByKeySelector<>()).sum()

我想要实现的是:

  • 根据某个键拥有一个流 keyBy ,以便仅使用该键中的元素调用reduce函数。
  • 该组根据一些自定义分区将工作拆分到节点之间。
  • 自定义分区根据并行运算符实例的数量返回一个数字(该数字将被修复并且不会重新缩放)。
  • 自定义分区从 keyBy 返回不同的值。然而,keyBy(x) = keyBy(y) => partition(x) = partition(y).
  • Having 预聚合 https://stackoverflow.com/questions/51634189/does-flink-support-map-side-aggregations-streaming在分区之前最大限度地减少网络流量。

用例示例:

  • 数据集:[(0, A), (0, B), (0, C), (1, D), (2, E)]
  • 并行算子实例数量:2
  • 按函数分组:返回该对的第一个元素
  • 分区函数:对于键 0 返回 0,对于键 1 和 2 返回 1。优点:处理可能将键 0 和 1 发送到同一运算符实例的数据倾斜,这意味着一个运算符实例将接收 80% 的数据集。

不幸的是这是不可能的。DataStreamUtils.reinterpretAsKeyedStream()要求数据进行相同的分区,就像您调用keyBy().

造成此限制的原因是密钥组以及密钥如何映射到密钥组。密钥组是 Flink 分配密钥状态的单位。键组的数量决定了算子的最大并行度,配置为setMaxParallelism()。密钥通过内部哈希函数分配给密钥组。通过更改密钥的分区,同一密钥组的密钥将分布在多台机器上,这是行不通的。

为了调整机器的密钥分配,您需要更改密钥组的密钥分配。但是,没有公共或可访问的接口来执行此操作。因此,Flink 1.6 中不支持自定义密钥分配。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

我可以将自定义分区器与 group by 一起使用吗? 的相关文章

  • 根据 Flink 的模式使用 GCS 文件

    由于 Flink 支持 Hadoop 文件系统抽象 并且有一个GCS连接器 https github com GoogleCloudPlatform bigdata interop 在 Google Cloud Storage 之上实现它的
  • 如何判断 Apache Flink 运行在哪个端口?

    我安装了 apache flink 转到flink 1 14 3文件夹并运行 bin start cluster sh 它似乎已成功启动集群 因为它输出了以下内容 Starting cluster Starting standalonese
  • Apache Flink:如何从 Cassandra 读取数据流/数据集?

    我尝试将 Cassandra 视为 Flink 中的数据源 并使用以下链接中提供的信息 从 Cassandra 读取数据以在 Flink 中进行处理 https stackoverflow com questions 43067681 re
  • Flink时间特性和AutoWatermarkInterval

    在 Apache Flink 中 setAutoWatermarkInterval interval 向下游操作员生成水印 以便他们提前事件时间 如果水印在指定的时间间隔内没有更改 没有事件到达 运行时将不会发出任何水印 另一方面 如果在下
  • Flink 中的水印和触发器有什么区别?

    我读到 排序运算符必须缓冲它接收到的所有元素 然后 当它接收到水印时 它可以对时间戳低于水印的所有元素进行排序 并按排序顺序发出它们 这是正确 因为水印表明不能有更多元素到达并与已排序元素混合 https cwiki apache org
  • Flink:处理数据早于应用程序水印的键控流

    我正在使用带有运动源和事件时间键控窗口的 F link 该应用程序将监听实时数据流 窗口 事件时间窗口 并处理每个键控流 我有另一个用例 我还需要能够支持某些关键流的旧数据的回填 这些将是事件时间 鉴于我正在使用水印 这会成为一个问题 因为
  • flink kafka生产者在检查点恢复时以一次模式发送重复消息

    我正在写一个案例来测试 flink 两步提交 下面是概述 sink kafka曾经是kafka生产者 sink stepmysql接收器是否扩展two step commit sink comparemysql接收器是否扩展two step
  • Python + Beam + Flink

    我一直在尝试让 Apache Beam 可移植性框架与 Python 和 Apache Flink 一起使用 但我似乎找不到一套完整的指令来让环境正常工作 是否有任何参考资料包含使简单的 python 管道正常工作的先决条件和步骤的完整列表
  • Flink TaskManager 超时?

    我正在运行 Flink 应用程序 通过 Yarn 似乎有时任务管理器会随机超时 这是错误 java util concurrent TimeoutException Heartbeat of TaskManager with id some
  • Apache Flink、JDBC 和 fat jar 是否存在类加载问题?

    使用 Apache Flink 1 8 并尝试运行RichAsyncFunction 我得到No Suitable Driver Found初始化 Hikari 池时出错RichAsyncFunction open 在 IDE 中它运行得很
  • Flink 的简单 hello world 示例

    我正在寻找 Apache flink 的 hello world 体验的最简单的示例 假设我刚刚在一个干净的盒子上安装了 flink 那么为了 让它做某事 我需要做的最低限度是什么 我意识到这很模糊 这里有一些例子 来自终端的三个 pyth
  • 基于流的应用程序中的受控/手动错误/恢复处理

    我正在开发一个基于的应用程序Apache Flink 它利用Apache Kafka用于输入和输出 该应用程序可能会被移植到Apache Spark 所以我也将其添加为标签 问题仍然相同 我要求通过 kafka 接收的所有传入消息必须按顺序
  • Apache Flink - 如何使用 AWS Kinesis 发送和使用 POJO

    我想使用 Flink 来使用来自 Kinesis 的 POJO 是否有关于如何正确发送和反序列化消息的标准 Thanks 我用以下方法解决了它 DataStream
  • 尝试升级到 flink 1.3.1 时出现异常

    我尝试将集群中的 flink 版本升级到 1 3 1 以及 1 3 2 但我的任务管理器中出现以下异常 2018 02 28 12 57 27 120 ERROR org apache flink streaming runtime tas
  • Apache Flink - “keyBy”中的异常处理

    由于代码错误或缺乏验证 进入 Flink 作业的数据可能会触发异常 我的目标是提供一致的异常处理方式 我们的团队可以在 Flink 作业中使用这种方式 而不会导致生产中出现任何停机 重启策略似乎不适用于此处 因为 简单的重启无法解决问题 我
  • Flink中为什么DataStream不支持聚合

    我是 Flink 的新手 有时 我想在 DataStream 上进行聚合 而不需要先执行 keyBy 为什么 Flink 不支持 DataStream 上的聚合 sum min max 等 谢谢你 艾哈迈德 Flink 支持非 keyed
  • 当我重新运行 Flink 消费者时,Kafka 再次消费最新消息

    我在用 Scala 编写的 Apache Flink API 中创建了一个 Kafka 消费者 每当我从某个主题传递一些消息时 它就会及时接收它们 但是 当我重新启动使用者时 它不会接收新的或未使用的消息 而是使用发送到该主题的最新消息 这
  • 将 Apache Flink 与 Lagom 结合使用时出现 java.io.NotSerializedException

    我正在 Lagom 的微服务实现中编写 Flink CEP 程序 我的 FLINK CEP 程序在简单的 scala 应用程序中运行得非常好 但是当我在 Lagom 服务实现中使用此代码时 我收到以下异常 拉戈姆服务实施 override
  • Flink从hdfs读取数据

    我是 Flink 的新生 我想知道如何从 hdfs 读取数据 有人可以给我一些建议或一些简单的例子吗 谢谢你们 如果您的文件采用文本文件格式 则可以使用 ExecutionEnvironment 对象中的 readTextFile 方法 这
  • Flink 在 Kubernetes 上的部署和 Native Kubernetes 有什么不同

    黑白的主要区别是什么原生 Kubernetes https ci apache org projects flink flink docs stable ops deployment native kubernetes html and 库

随机推荐