Apache Flink:Python 流 API 中的 Kafka 连接器,“无法加载用户类”

2023-12-09

我正在尝试 Flink 的新 Python 流 API 并尝试使用以下命令运行我的脚本./flink-1.6.1/bin/pyflink-stream.sh examples/read_from_kafka.py。 python 脚本相当简单,我只是尝试从现有主题中使用并将所有内容发送到 stdout(或输出方法默认发出数据的日志目录中的 *.out 文件)。

import glob
import os
import sys
from java.util import Properties
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.streaming.api.collector.selector import OutputSelector
from org.apache.flink.api.common.serialization import SimpleStringSchema

directories=['/home/user/flink/flink-1.6.1/lib']
for directory in directories:
    for jar in glob.glob(os.path.join(directory,'*.jar')):
                sys.path.append(jar)

from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer09

props = Properties()
config = {"bootstrap_servers": "localhost:9092",
          "group_id": "flink_test",
          "topics": ["TopicCategory-TopicName"]}
props.setProperty("bootstrap.servers", config['bootstrap_servers'])
props.setProperty("group_id", config['group_id'])
props.setProperty("zookeeper.connect", "localhost:2181")

def main(factory):
    consumer = FlinkKafkaConsumer09([config["topics"]], SimpleStringSchema(), props)

    env = factory.get_execution_environment()
    env.add_java_source(consumer) \
        .output()
    env.execute()

我从 Maven 存储库中获取了一些 jar 文件,即flink-connector-kafka-0.9_2.11-1.6.1.jar, flink-connector-kafka-base_2.11-1.6.1.jar and kafka-clients-0.9.0.1.jar并将它们复制到 Flink 中lib目录。除非我误解了文档,否则这应该足以让 Flink 加载 kafka 连接器。事实上,如果我删除这些罐子中的任何一个,导入都会失败,但这似乎不足以实际调用该计划。 添加 for 循环以动态地将它们添加到sys.path也没用。以下是控制台中打印的内容:

Starting execution of program
Failed to run plan: null
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/tmp/flink_streaming_plan_9cfed4d9-0288-429c-99ac-df02c86922ec/read_from_kafka.py", line 32, in main
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
    at org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)

org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: bbcc0cb2c4fe6e3012d228b06b270eba)

The program didn't contain a Flink job. Perhaps you forgot to call execute() on the execution environment.

这是我在日志中看到的:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class:    org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
ClassLoader info: URL ClassLoader:
    file: '/tmp/blobStore-9f6930fa-f1cf-4851-a0bf-2e620391596f/job_ca486746e7feb42d2d162026b74e9935/blob_p-9321896d165fec27a617d44ad50e3ef09c3211d9-405ccc9b490fa1e1348f0a76b1a48887' (valid JAR)
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

有没有办法解决这个问题并使连接器可用于 Python?我怀疑这是 Jython 的类加载器问题,但我不知道如何进一步调查(也考虑到我对 Java 不了解)。非常感谢。


您在这里使用了错误的 Kafka 消费者。在你的代码中,它是FlinkKafkaConsumer09,但是您正在使用的库是flink-connector-kafka-0.11_2.11-1.6.1.jar,这是为了FlinkKafkaConsumer011。尝试更换FlinkKafkaConsumer09有了这个FlinkKafkaConsumer011,或者使用lib文件flink-connector-kafka-0.9_2.11-1.6.1.jar而不是当前的。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Flink:Python 流 API 中的 Kafka 连接器,“无法加载用户类” 的相关文章

  • 在 Python 中解析 TCL 列表

    我需要在双括号上拆分以空格分隔的 TCL 列表 例如 OUTPUT 172 25 50 10 01 01 Ethernet 172 25 50 10 01 02 Ethernet Traffic Item 1 172 25 50 10 01
  • 类的 IPython 表示

    我正在使用我创建的模块尝试 IPython 但它没有显示类对象的实际表示 相反 它显示类似的内容 TheClass module TheClass name I heavily在这个模块中使用元类 我有真正有意义的类表示 应该向用户显示 是
  • 我应该使用 Python 双端队列还是列表作为堆栈? [复制]

    这个问题在这里已经有答案了 我想要一个可以用作堆栈的 Python 对象 使用双端队列还是列表更好 元素数量较少还是数量较多有什么区别 您的情况可能会根据您的应用程序和具体用例而有所不同 但在一般情况下 列表非常适合堆栈 append is
  • 嵌套列表的重叠会产生不必要的间隙

    我有一个包含三个列表的嵌套 这些列表由 for 循环填充 并且填充由 if 条件控制 第一次迭代后 它可能类似于以下示例 a 1 2 0 0 0 0 0 0 4 5 0 0 0 0 0 0 6 7 根据条件 它们不重叠 在第二次迭代之后 新
  • 如何从Python中的函数返回多个值? [复制]

    这个问题在这里已经有答案了 如何从Python中的函数返回多个变量 您可以用逗号分隔要返回的值 def get name you code return first name last name 逗号表示它是一个元组 因此您可以用括号将值括
  • 在 Django Admin 中调整字段大小

    在管理上添加或编辑条目时 Django 倾向于填充水平空间 但在某些情况下 当编辑 8 个字符宽的日期字段或 6 或 8 个字符的 CharField 时 这确实是一种空间浪费 字符宽 然后编辑框最多可容纳 15 或 20 个字符 我如何告
  • PyQt 使用 ctrl+Enter 触发按钮

    我正在尝试在我的应用程序中触发 确定 按钮 我当前尝试的代码是这样的 self okPushButton setShortcut ctrl Enter 然而 它不起作用 这是有道理的 我尝试查找一些按键序列here http ftp ics
  • Pycharm 在 os.path 连接上出现“未解析的引用”

    将pycharm升级到2018 1 并将python升级到3 6 5后 pycharm报告 未解析的引用 join 最新版本的 pycharm 不会显示以下行的任何警告 from os path import join expanduser
  • Python 内置的 super() 是否违反了 DRY?

    显然这是有原因的 但我没有足够的经验来认识到这一点 这是Python中给出的例子docs http docs python org 2 library functions html super class C B def method se
  • Python 3:将字符串转换为变量[重复]

    这个问题在这里已经有答案了 我正在从 txt 文件读取文本 并且需要使用我读取的数据之一作为类实例的变量 class Sports def init self players 0 location name self players pla
  • python的shutil.move()在linux上是原子的吗?

    我想知道python的shutil move在linux上是否是原子的 如果源文件和目标文件位于两个不同的分区上 行为是否不同 或者与它们存在于同一分区上时的行为相同吗 我更关心的是如果源文件和目标文件位于同一分区上 shutil move
  • 通过Python连接到Bigquery:ProjectId和DatasetId必须非空

    我编写了以下脚本来通过 SDK 将 Big Query 连接到 Python 如下所示 from google cloud import bigquery client bigquery Client project My First Pr
  • pandas - 包含时间序列数据的堆积条形图

    我正在尝试使用时间序列数据在 pandas 中创建堆积条形图 DATE TYPE VOL 0 2010 01 01 Heavy 932 612903 1 2010 01 01 Light 370 612903 2 2010 01 01 Me
  • Python GTK+ 画布

    我目前正在通过 PyGobject 学习 GTK 需要画布之类的东西 我已经搜索了文档 发现两个小部件似乎可以完成这项工作 GtkDrawingArea 和 GtkLayout 我需要一些基本函数 如 fillrect 或 drawline
  • 如何使用 AWS Lambda Python 读取 AWS S3 存储的 Word 文档(.doc 和 .docx)文件内容?

    我的场景是 我尝试使用 python 实现从 Aws Lambda 读取 AWS 存储的 S3 word 文档 doc 和 docx 文件内容 下面的代码是我使用的 我的问题是我可以获取文件名 但无法读取内容 def lambda hand
  • 如何使用 Boto3 启动具有 IAM 角色的 EC2 实例?

    我无法弄清楚如何使用指定的 IAM 角色在 Boto3 中启动 EC2 实例 以下是迄今为止我如何成功创建实例的一些示例代码 import boto3 ec2 boto3 resource ec2 region name us west 2
  • 如何将 Django 中的权限添加到模型并使用 shell 进行测试

    我在模型中添加了 Meta 类并同步了数据库 然后在 shell 中创建了一个对象 它返回 false 所以我真的无法理解错误在哪里或者缺少什么是否在其他文件中可能存在某种配置 class Employer User Employer in
  • 如何使用 PrimaryKeyRelatedField 更新多对多关系上的类别

    Django Rest 框架有一个主键相关字段 http www django rest framework org api guide relations primarykeyrelatedfield其中列出了我的 IDmany to m
  • pandas.read_csv 将列名移动一倍

    我正在使用位于的 ALL zip 文件here http www fec gov disclosurep PDownload do 我的目标是用它创建一个 pandas DataFrame 但是 如果我跑 data pd read csv
  • python 对浮点数进行不正确的舍入

    gt gt gt a 0 3135 gt gt gt print 3f a 0 314 gt gt gt a 0 3125 gt gt gt print 3f a 0 312 gt gt gt 我期待 0 313 而不是 0 312 有没有

随机推荐

  • 使用 async 时,bcrypt.hash 函数返回 undefined,但与 .then 一起工作正常

    这是异步代码 返回undefined userService register username password gt return bcrypt hash password saltRounds async err hash gt co
  • 在 Windows 窗体中以编程方式添加新的用户控件

    嘿 首先我想指出 我知道这里还有关于这个主题的其他几个问题 我什至以前自己也做过这件事 我在这里问是因为我不知道我的问题是什么 这是我尝试显示新用户控件的代码 private void ValidationLabel Click objec
  • 使用 jQuery 更改下拉列表的选定值

    我有一个包含已知值的下拉列表 我想做的是将下拉列表设置为我知道存在的特定值jQuery 使用常规JavaScript 我会做类似的事情 ddl document getElementById ID of element goes here
  • 我们可以将 Laravel 项目作为库集成到 CodeIgniter 中吗?

    我想通过集成一些用 laravel 编写的代码来增加 CodeIgniter 项目的功能 我该如何接近 我可以通过 CodeIgniter 库包含代码吗 如果是的话怎么办 我只想将控制器和 ORM 包含到 CI 中 Laravel 代码是一
  • R 中 apply 中的 equal() 行为

    这很奇怪 apply matrix c 1 NA 2 3 NA NA 2 4 ncol 2 1 function x identical x 1 x 2 1 FALSE TRUE TRUE FALSE apply data frame a
  • 在这种情况下,iPhone 上的“发布”是什么意思?

    我想问一个关于 iPhone 应用程序的愚蠢问题 我是iPhone应用程序的绿色 我在Apple网站上阅读了以下代码 MyViewController aViewController MyViewController alloc initW
  • 设计模式 - 理解外观模式

    我是设计模式的新手 正在尝试了解它们通常的样子 现在我正在尝试理解外观模式 我觉得外观模式是一个相当广泛的概念 所以我想知道我的第二个图是否会被视为外观模板的一部分 我知道一个典型的外观模式基本上是这样的 A 级是外观 但是如果我们有一个更
  • 密钥不得包含 . pymongo 中的错误

    我试图通过 pymongo 获取 serverStatus 命令的输出 然后将其插入到 mongodb 集合中 这是字典 u metrics u getLastError u wtime u num 0 u totalMillis 0 u
  • PHP Constant() 不适用于名称空间?

    以下不起作用 use application components auditor AuditLevel public function actionAudit data unserialize POST data message data
  • 如何更新由另一个组合框触发的组合框中的值?

    我的表单中有 2 个组合框 我希望在组合框 2 中的列表更新时更改组合框 1 中的选定值 例如 ComboBox1 包含移动公司的名称 ComboBox2 包含该公司的所有手机列表 假设您有一个将手机型号与其制造商关联起来的字典 Dicti
  • 流星:云中

    我正在尝试上传 Lepozepo cloudinary 的照片 这是我的服务器和客户端配置 server Cloudinary config cloud name api key api secret client cloudinary c
  • UIViewController 的背景到分组表视图颜色

    在 UITableView 分组样式中 表格视图的背景有点像浅灰色的纹理颜色 如何获取该值以便将 UIViewController 的整个背景设置为该颜色 如果您正在为 iPhone 和 iPod touch 进 行开发 UIColor定义
  • 搜索文本文件并插入行

    我想要做的是 以下面的文本为例 在文本文件中搜索字符串 Text2 然后在 Text 2 后两行插入一行 插入文本 文本 2 可以位于文本文件中的任何行 但我知道它会在文本文件中出现一次 所以这是原始文件 Text1 Text2 Text3
  • 从存档导出 ipa 时 Xcode 9 崩溃

    我在 Xcode 9 中为任何项目创建了一个存档 然后我尝试创建一个 ipa 文件 开发文件或临时文件 我首先尝试导出存档 然后我选择 开发 或 临时分发 Xcode 9 崩溃 我什至在应用程序的 info plist 中添加了 编译位码
  • 向函数发送 stderr/stdout 消息并捕获退出信号

    我正在处理错误处理并登录我的 bash 脚本 下面我提供了一个简化的代码片段来举例说明用例 我想在我的脚本中实现以下目标 陷阱退出信号应触发下面代码中的 onexit 函数 stderr 和 stdout 应发送到 log 函数 该函数将确
  • R 中的嵌套 if-else 循环

    我有一个名为 crimes 的数据框 其中包含一个 pre rate 列 表示实施特定法律之前的犯罪率 我想使用嵌套的 if else 循环将每个费率放入 rate category 列中 我有以下代码 crimes rate catego
  • 为什么c++使用memset(addr,0,sizeof(T))来构造一个对象?标准或编译器错误?

    这个问题和我的另一篇文章有 关 为什么 allocate shared 和 make shared 这么慢 在这里我可以更清楚地描述这个问题 考虑下面的代码 struct A char data 0x10000 class C public
  • 在多个 UIView 上添加 Facebook Shimmer

    我正在尝试在具有多个 UIView 的 UICollectionViewCell 上添加 Facebook Shimmer For 一个 UIView 使用下面的代码可以正常工作 let shimmeringView FBShimmerin
  • 执行存储在数据库中的Java代码

    我有定期推送到数据库的 Java 代码 解释它为什么在数据库中太复杂 这只会将焦点从主要问题上移开 在运行时我查询数据库 我可以执行从数据库获取的代码吗 我只将 main 方法的内容存储在代码中 运行数据库的服务器是HTTP 服务器 数据库
  • Apache Flink:Python 流 API 中的 Kafka 连接器,“无法加载用户类”

    我正在尝试 Flink 的新 Python 流 API 并尝试使用以下命令运行我的脚本 flink 1 6 1 bin pyflink stream sh examples read from kafka py python 脚本相当简单