Flink 仪表板版本 1.3.2 中无法执行 CEP 模式,这是由 ClassNotFoundException 引起的

2024-01-10

我写了一个像这样的简单模式

   Pattern<JoinedEvent, ?> pattern = Pattern.<JoinedEvent>begin("start")
            .where(new SimpleCondition<JoinedEvent>() {
     @Override
     public boolean filter(JoinedEvent streamEvent) throws Exception {

            return streamEvent.getRRInterval()>= 10 ;
                        }
             }).within(Time.milliseconds(WindowLength));

并且在 IntellijIdea 中执行得很好。我在仪表板和 IntelliJ-Idea 中都使用 Flink 1.3.2。当我从源代码构建 Flink 时,我看到了很多警告消息,这让我相信迭代条件类尚未包含在 jar 中,因为错误也说ClassNotFoundException。下面是错误

Caused by: java.lang.NoClassDefFoundError: org/apache/flink/cep/pattern/conditions/IterativeCondition
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
at java.lang.Class.getMethod0(Class.java:3018)
at java.lang.Class.getMethod(Class.java:1784)
at 

org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:492)
    ... 38 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.cep.pattern.conditions.IterativeCondition
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 44 more

经过我一整天的努力解决这个问题,终于得到了解决方案。问题非常基本,即 Flink CEP 不是二进制发行版的一部分,因此每当我尝试执行模式时,它都会给我一个错误。

解决方案很简单

如您所见,Flink Binary 没有 cep jar。

所以转到你的 IDE(在我的例子中是 IntelliJ)并复制所需的 jar

转到此位置并将此 jar 复制粘贴到二进制版本中的 lib 文件夹中。

亚拉,问题解决了

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink 仪表板版本 1.3.2 中无法执行 CEP 模式,这是由 ClassNotFoundException 引起的 的相关文章

  • 根据 Flink 的模式使用 GCS 文件

    由于 Flink 支持 Hadoop 文件系统抽象 并且有一个GCS连接器 https github com GoogleCloudPlatform bigdata interop 在 Google Cloud Storage 之上实现它的
  • SingleOutputStreamOperator#returns(TypeHint typeHint) 方法的 javadoc

    我正在阅读源代码SingleOutputStreamOperator returns 它的javadoc是 Adds a type information hint about the return type of this operato
  • Kafka - 无法建立与节点-1的连接

    我正在尝试使用 apache flink 流处理 kafka 主题 但我遇到了这个问题 2018 04 10 02 55 59 856 ProducerConfig values acks 1 batch size 16384 bootst
  • 为什么 Flink 在 DataStream join + Global window 上发出重复记录?

    我正在学习 试验 Flink 并且观察到 DataStream 连接的一些意外行为 并且想了解发生了什么 假设我有两个流 每个流有 10 条记录 我想将其加入到id场地 假设一个流中的每条记录在另一个流中都有一个匹配的记录 并且 ID 在每
  • Flink 模式演化不适用于 POJO 类

    我有一个类满足被视为 POJO 的要求 这是我的流媒体工作中的主要传输类 它只包含原语和Map
  • 使用 Flink LocalEnvironment 进行生产

    我想了解本地执行环境的局限性以及它是否可以用于在生产中运行 感谢任何帮助 见解 谢谢 LocalExecutionEnvironment 启动一个 Flink MiniCluster 它在单个 JVM 中运行整个 Flink 系统 JobM
  • Flink:处理数据早于应用程序水印的键控流

    我正在使用带有运动源和事件时间键控窗口的 F link 该应用程序将监听实时数据流 窗口 事件时间窗口 并处理每个键控流 我有另一个用例 我还需要能够支持某些关键流的旧数据的回填 这些将是事件时间 鉴于我正在使用水印 这会成为一个问题 因为
  • Apache Beam 计数器/指标在 Flink WebUI 中不可用

    我正在使用 Flink 1 4 1 和 Beam 2 3 0 并且想知道是否可以在 Flink WebUI 或任何地方 中提供可用的指标 如 Dataflow WebUI 中那样 我用过类似的计数器 import org apache be
  • Flink TaskManager 超时?

    我正在运行 Flink 应用程序 通过 Yarn 似乎有时任务管理器会随机超时 这是错误 java util concurrent TimeoutException Heartbeat of TaskManager with id some
  • Apache Flink、JDBC 和 fat jar 是否存在类加载问题?

    使用 Apache Flink 1 8 并尝试运行RichAsyncFunction 我得到No Suitable Driver Found初始化 Hikari 池时出错RichAsyncFunction open 在 IDE 中它运行得很
  • Apache Flink - 作业内部无法识别自定义 java 选项

    我已将以下行添加到 flink conf yaml 中 env java opts Ddy props path PATH TO PROPS FILE 启动 jobmanager jobmanager sh start cluster 时
  • 基于流的应用程序中的受控/手动错误/恢复处理

    我正在开发一个基于的应用程序Apache Flink 它利用Apache Kafka用于输入和输出 该应用程序可能会被移植到Apache Spark 所以我也将其添加为标签 问题仍然相同 我要求通过 kafka 接收的所有传入消息必须按顺序
  • Apache Flink 上的 zipWithIndex

    我想为我的输入的每一行分配一个id 这应该是一个数字0 to N 1 where N是输入中的行数 粗略地说 我希望能够执行以下操作 val data sc textFile textFilePath numPartitions val r
  • 尝试升级到 flink 1.3.1 时出现异常

    我尝试将集群中的 flink 版本升级到 1 3 1 以及 1 3 2 但我的任务管理器中出现以下异常 2018 02 28 12 57 27 120 ERROR org apache flink streaming runtime tas
  • 创建具有通用返回类型的 FlinkSQL UDF

    我想定义函数MAX BY接受类型值T和类型的订购参数Number并根据排序从窗口返回最大元素 类型为T 我试过了 public class MaxBy
  • 如何正确处理自定义MapFunction中的错误?

    我已经实施了MapFunction对于我的 Apache Flink 流程 它正在解析传入元素并将其转换为其他格式 但有时会出现错误 即传入数据无效 我看到两种可能的处理方法 忽略无效元素 但似乎我无法忽略错误 因为对于任何传入元素 我必须
  • 对 Parquet 批量格式使用压缩

    从 Apache Flink 1 15 版本开始 您可以使用压缩功能将多个文件合并为一个 https nightlies apache org flink flink docs master docs connectors datastre
  • Apache Flink - “keyBy”中的异常处理

    由于代码错误或缺乏验证 进入 Flink 作业的数据可能会触发异常 我的目标是提供一致的异常处理方式 我们的团队可以在 Flink 作业中使用这种方式 而不会导致生产中出现任何停机 重启策略似乎不适用于此处 因为 简单的重启无法解决问题 我
  • Flink Kafka - 如何使应用程序并行运行?

    我正在 Flink 中创建一个应用程序 读取某个主题的消息 对其进行一些简单的处理 将结果写入不同的主题 我的代码确实有效 然而它不并行运行我怎么做 看来我的代码只在一个线程 块上运行 在 Flink Web 仪表板上 应用程序进入运行状态
  • 2022年Flink可以支持什么Java版本?

    假设我开始一个新的 Flink Java 项目 如果我寻找 稳定的 Flink Java 生产体验 我应该使用哪个版本 官方docs https nightlies apache org flink flink docs master do

随机推荐