Flink CEP:对于不同类型的事件,使用哪种方法加入数据流?

2024-03-14

假设我有两种不同类型的数据流,一种提供天气数据,另一种提供车辆数据,我想使用 Flink 对数据进行复杂的事件处理。

Flink 1.3.x 中哪种方法是正确的使用方法?我看到了不同的方法,如 Union、Connect、Window Join。基本上我只是想尝试一个简单的 CEP,如下所示:

IF weather is wet AND vehicle speed > 60 
WITHIN the last 10 seconds
THEN raise alert

Thanks!


在我看来,有两种方法可以解决这个问题:

  1. 对不同类型的事件使用公共父类型,并通过以下方式连接两个流union使用CEP库之前的方法。

  2. 您可以使用flink-siddhi使用 SiddhiCEP 处理流的包提供了同时描述多个数据流的模式(通过 SiddhiQL)的方法。有关更多信息flink-siddhi here: https://haoch.github.io/flink-siddhi/ https://haoch.github.io/flink-siddhi/。源代码可以在 GitHub 上找到:https://github.com/haoch/flink-siddhi https://github.com/haoch/flink-siddhi。 SiddhiCEP 和 SiddhiQL 的文档:https://docs.wso2.com/display/CEP420/SiddhiQL+Guide+3.1 https://docs.wso2.com/display/CEP420/SiddhiQL+Guide+3.1.

希望这些信息会有帮助。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink CEP:对于不同类型的事件,使用哪种方法加入数据流? 的相关文章

  • Apache Flink RollingFileAppender

    我正在使用 Apache Flink v1 2 我想切换到滚动文件附加程序 以避免包含几天数据的巨大日志文件 然而它似乎不起作用 我调整了 log4j 配置 log4j properties 如下 log4j appender file o
  • 基于 ProcessWindowFunction 的 Flink 单元测试

    如何为有状态流程函数创建单元测试 我有这样的事情 private static SingleOutputStreamOperator
  • 在 kubernetes 上持续部署有状态 apache flink 应用程序

    我想在 kubernetes 上运行 apache flink 1 11 1 流应用程序 文件系统状态后端保存到 s3 s3 的检查点正在运行 args standalone job s s3 BUCKET NAME 34619f2862c
  • 我可以将自定义分区器与 group by 一起使用吗?

    假设我知道我的数据集不平衡并且我知道键的分布 我想利用它来编写一个自定义分区器 以充分利用运算符实例 我知道关于数据流 partitionCustom https ci apache org projects flink flink doc
  • 如何构建和使用flink-connector-kinesis?

    我正在尝试将 Apache Flink 与 AWS kinesis 结合使用 这document https ci apache org projects flink flink docs release 1 7 dev connector
  • Apache Flink:设置并行度的指南?

    我正在尝试获取一些简单的规则或指南来设置哪些值 操作员或工作 并行性 在我看来 它应该是一个数字 例如 假设我有 2 台任务管理器机器 每台都有 4 个任务槽 假设集群上没有运行其他作业 我会设置并行度吗 用于操作 喜欢过滤并映射到 8 如
  • Apache Flink 1.3 中的 Elasticsearch 5 连接器

    通过阅读文档 我了解到使用 Apache Flink 1 3 我应该能够使用 Elasticsearch 5 x 但是 在我的 pom xml 中
  • Flink 模式演化不适用于 POJO 类

    我有一个类满足被视为 POJO 的要求 这是我的流媒体工作中的主要传输类 它只包含原语和Map
  • 如何在 Flink 中引用外部 Jar

    每个人 我尝试在所有任务管理器中以将其复制到 FLINK lib 的方式在 Flink 中引用我的公司 jar 但失败了 而且我不想打包一个胖罐子 太重而且浪费时间 我认为第一种方法也不是一个好主意 因为我必须管理整个集群中的jar 有谁知
  • 如何在其他流的基础上过滤Apache flink流?

    我有两个流 一个是 Int 另一个是 json 在 json Schema 中 有一个键是一些 int 所以我需要通过与另一个整数流的键比较来过滤 json 流 那么在 Flink 中是否可能 是的 您可以使用 Flink 进行这种流处理
  • StreamingFileSink 未将数据提取到 s3

    我创建了简单的摄取服务 该服务选择本地文件并使用 StreamingFileSink 摄取到 s3 https ci apache org projects flink flink docs stable dev connectors st
  • 在任务管理器之间均匀分配 Flink 运算符

    我正在 15 台机器的裸机集群上构建 Flink 流应用程序原型 我使用带有 90 个任务槽 15x6 的纱线模式 该应用程序从单个 Kafka 主题读取数据 Kafka主题有15个分区 所以我也将源算子的并行度设置为15 但是 我发现 F
  • Flink:处理数据早于应用程序水印的键控流

    我正在使用带有运动源和事件时间键控窗口的 F link 该应用程序将监听实时数据流 窗口 事件时间窗口 并处理每个键控流 我有另一个用例 我还需要能够支持某些关键流的旧数据的回填 这些将是事件时间 鉴于我正在使用水印 这会成为一个问题 因为
  • 基于流的应用程序中的受控/手动错误/恢复处理

    我正在开发一个基于的应用程序Apache Flink 它利用Apache Kafka用于输入和输出 该应用程序可能会被移植到Apache Spark 所以我也将其添加为标签 问题仍然相同 我要求通过 kafka 接收的所有传入消息必须按顺序
  • Apache Flink 上的 zipWithIndex

    我想为我的输入的每一行分配一个id 这应该是一个数字0 to N 1 where N是输入中的行数 粗略地说 我希望能够执行以下操作 val data sc textFile textFilePath numPartitions val r
  • Flink 使用 Ceph 作为持久存储

    Flink 文档建议 Ceph 可以用作状态的持久存储 https ci apache org projects flink flink docs release 1 3 dev stream checkpointing html http
  • flink - 使用匕首注入 - 不可序列化?

    我使用 Flink 最新通过 git 从 kafka 流式传输到 cassandra 为了简化单元测试 我通过 Dagger 添加依赖注入 ObjectGraph 似乎已正确设置自身 但 内部对象 被 Flink 标记为 不可序列化 如果我
  • 我可以将 flink RocksDB 状态后端与本地文件系统一起使用吗?

    我正在探索使用 FlinkrocksDb 状态后端 文档似乎暗示我可以使用常规文件系统 例如 file data flink checkpoints 但代码 javadoc 仅在此处提到 hdfs 或 s3 选项 我想知道是否可以将本地文件
  • Flink中为什么DataStream不支持聚合

    我是 Flink 的新手 有时 我想在 DataStream 上进行聚合 而不需要先执行 keyBy 为什么 Flink 不支持 DataStream 上的聚合 sum min max 等 谢谢你 艾哈迈德 Flink 支持非 keyed
  • 2022年Flink可以支持什么Java版本?

    假设我开始一个新的 Flink Java 项目 如果我寻找 稳定的 Flink Java 生产体验 我应该使用哪个版本 官方docs https nightlies apache org flink flink docs master do

随机推荐