如何在其他流的基础上过滤Apache flink流?

2024-02-20

我有两个流,一个是 Int ,另一个是 json 。在 json Schema 中,有一个键是一些 int 。所以我需要通过与另一个整数流的键比较来过滤 json 流,那么在 Flink 中是否可能?


是的,您可以使用 Flink 进行这种流处理。 Flink 所需的基本构建块是连接的流和有状态函数——以下是使用 RichCoFlatMap 的示例:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class Connect {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Event> control = env.fromElements(
                new Event(17),
                new Event(42))
                .keyBy("key");

        DataStream<Event> data = env.fromElements(
                new Event(2),
                new Event(42),
                new Event(6),
                new Event(17),
                new Event(8),
                new Event(42)
                )
                .keyBy("key");

        DataStream<Event> result = control
                .connect(data)
                .flatMap(new MyConnectedStreams());

        result.print();

        env.execute();
    }

    static final class MyConnectedStreams
            extends RichCoFlatMapFunction<Event, Event, Event> {

        private ValueState<Boolean> seen = null;

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>(
                    // state name
                    "have-seen-key",
                    // type information of state
                    TypeInformation.of(new TypeHint<Boolean>() {
                    }));
            seen = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void flatMap1(Event control, Collector<Event> out) throws Exception {
            seen.update(Boolean.TRUE);
        }

        @Override
        public void flatMap2(Event data, Collector<Event> out) throws Exception {
            if (seen.value() == Boolean.TRUE) {
                out.collect(data);
            }
        }
    }


    public static final class Event {
        public Event() {
        }

        public Event(int key) {
            this.key = key;
        }

        public int key;

        public String toString() {
            return String.valueOf(key);
        }
    }
}

在此示例中,只有那些在控制流上看到的键才会通过数据流传递——所有其他事件都被过滤掉。我已经利用了Flink 的托管键控状态 https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html and 连接的流 https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/scala/ConnectedStreams.html.

为了简单起见,我忽略了您对数据流具有 JSON 的要求,但您可以在其他地方找到如何使用 JSON 和 Flink 的示例。

请注意,您的结果将是不确定的,因为您无法控制两个流相对于彼此的时间。您可以通过向流添加事件时间时间戳,然后使用 RichCoProcessFunction 来管理此问题。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在其他流的基础上过滤Apache flink流? 的相关文章

  • SingleOutputStreamOperator#returns(TypeHint typeHint) 方法的 javadoc

    我正在阅读源代码SingleOutputStreamOperator returns 它的javadoc是 Adds a type information hint about the return type of this operato
  • Kafka - 无法建立与节点-1的连接

    我正在尝试使用 apache flink 流处理 kafka 主题 但我遇到了这个问题 2018 04 10 02 55 59 856 ProducerConfig values acks 1 batch size 16384 bootst
  • 使用 Flink LocalEnvironment 进行生产

    我想了解本地执行环境的局限性以及它是否可以用于在生产中运行 感谢任何帮助 见解 谢谢 LocalExecutionEnvironment 启动一个 Flink MiniCluster 它在单个 JVM 中运行整个 Flink 系统 JobM
  • 如何在其他流的基础上过滤Apache flink流?

    我有两个流 一个是 Int 另一个是 json 在 json Schema 中 有一个键是一些 int 所以我需要通过与另一个整数流的键比较来过滤 json 流 那么在 Flink 中是否可能 是的 您可以使用 Flink 进行这种流处理
  • Flink 中的水印和触发器有什么区别?

    我读到 排序运算符必须缓冲它接收到的所有元素 然后 当它接收到水印时 它可以对时间戳低于水印的所有元素进行排序 并按排序顺序发出它们 这是正确 因为水印表明不能有更多元素到达并与已排序元素混合 https cwiki apache org
  • Python + Beam + Flink

    我一直在尝试让 Apache Beam 可移植性框架与 Python 和 Apache Flink 一起使用 但我似乎找不到一套完整的指令来让环境正常工作 是否有任何参考资料包含使简单的 python 管道正常工作的先决条件和步骤的完整列表
  • 在 Flink 中,我可以在同一个槽中拥有一个算子的多个子任务吗?

    探索Apache Flink几天了 对Task Slot的概念有些疑惑 虽然有人问了几个问题 但有一点我不明白 我正在使用一个玩具应用程序进行测试 运行本地集群 我已禁用运算符链接 我从文档中知道插槽允许内存隔离而不是 CPU 隔离 阅读文
  • 基于流的应用程序中的受控/手动错误/恢复处理

    我正在开发一个基于的应用程序Apache Flink 它利用Apache Kafka用于输入和输出 该应用程序可能会被移植到Apache Spark 所以我也将其添加为标签 问题仍然相同 我要求通过 kafka 接收的所有传入消息必须按顺序
  • Flink 使用 Ceph 作为持久存储

    Flink 文档建议 Ceph 可以用作状态的持久存储 https ci apache org projects flink flink docs release 1 3 dev stream checkpointing html http
  • 对 Parquet 批量格式使用压缩

    从 Apache Flink 1 15 版本开始 您可以使用压缩功能将多个文件合并为一个 https nightlies apache org flink flink docs master docs connectors datastre
  • Cassandra Pojo Sink Flink 中的动态表名称

    我是 Apache Flink 的新手 我正在使用 Pojo Sink 将数据加载到 Cassandra 中 现在 我在以下命令的帮助下指定表和键空间名称 Table注解 现在 我想在运行时动态传递表名称和键空间名称 以便可以将数据加载到用
  • Flink中为什么DataStream不支持聚合

    我是 Flink 的新手 有时 我想在 DataStream 上进行聚合 而不需要先执行 keyBy 为什么 Flink 不支持 DataStream 上的聚合 sum min max 等 谢谢你 艾哈迈德 Flink 支持非 keyed
  • Apache Flink 动态设置 JVM_OPT env.java.opts

    是否可以设置自定义 JVM 选项env java opts提交作业时未在作业中指定conf flink conf yaml file 我问的原因是我想在 log4j 中使用一些自定义变量 我也在 YARN 上运行我的工作 我已经使用 CLI
  • 将 flink 从 1.10 升级到 1.11,遇到错误“找不到执行应用程序的 ExecutorFactory”

    java lang IllegalStateException No ExecutorFactory found to execute the application at org apache flink core execution D
  • Flink 窗口:聚合并输出到接收器

    我们有一个数据流 其中每个元素都是这种类型 id String type Type amount Integer 我们想要聚合这个流并输出总和amount每周一次 目前的解决方案 Flink 管道示例如下所示 stream keyBy ty
  • 当我重新运行 Flink 消费者时,Kafka 再次消费最新消息

    我在用 Scala 编写的 Apache Flink API 中创建了一个 Kafka 消费者 每当我从某个主题传递一些消息时 它就会及时接收它们 但是 当我重新启动使用者时 它不会接收新的或未使用的消息 而是使用发送到该主题的最新消息 这
  • Apache Flink 中的并行度

    我可以为 Flink 程序中任务的不同部分设置不同的并行度吗 例如 Flink 如何解释以下示例代码 两个自定义实践者MyPartitioner1 MyPartitioner2 将输入数据划分为两个4和2个分区 partitionedDat
  • Flink - 无法从检查点恢复

    我使用一个作业管理器和两个任务管理器在 kubernetes 上运行集群 我通过在作业运行时杀死一个任务管理器 Pod 来测试检查点机制 我在作业管理器和重新启动的任务管理器上遇到以下异常 工作经理例外 java lang Exceptio
  • 2022年Flink可以支持什么Java版本?

    假设我开始一个新的 Flink Java 项目 如果我寻找 稳定的 Flink Java 生产体验 我应该使用哪个版本 官方docs https nightlies apache org flink flink docs master do
  • Flink:Jobmanager UI 中设置的并行度与任务槽有何关系?

    假设我有 8 个任务管理器和 16 个任务槽 如果我使用 Jobmanager UI 提交作业并将并行度设置为 8 我是否只使用 8 个任务槽 如果我有 8 个具有 8 个槽位的任务管理器 并以并行度 8 提交相同的作业 该怎么办 是完全一

随机推荐