下沉 kafka 流时看不到消息,并且在 flink 1.2 中看不到打印消息

2024-01-09

我的目标是使用kafka读取json格式的字符串,对字符串进行过滤,然后将消息接收出来(仍然是json字符串格式)。

出于测试目的,我的输入字符串消息如下所示:

{"a":1,"b":2}

我的实现代码是:

def main(args: Array[String]): Unit = {

// parse input arguments
val params = ParameterTool.fromArgs(args)

if (params.getNumberOfParameters < 4) {
  println("Missing parameters!\n"
    + "Usage: Kafka --input-topic <topic> --output-topic <topic> "
    + "--bootstrap.servers <kafka brokers> "
    + "--zookeeper.connect <zk quorum> --group.id <some id> [--prefix <prefix>]")
  return
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.disableSysoutLogging
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
// create a checkpoint every 5 seconds
env.enableCheckpointing(5000)
// make parameters available in the web interface
env.getConfig.setGlobalJobParameters(params)

// create a Kafka streaming source consumer for Kafka 0.10.x
val kafkaConsumer = new FlinkKafkaConsumer010(
  params.getRequired("input-topic"),
  new JSONKeyValueDeserializationSchema(false),
  params.getProperties)

val messageStream = env.addSource(kafkaConsumer)

val filteredStream: DataStream[ObjectNode] = messageStream.filter(node => node.get("a").asText.equals("1")
                      && node.get("b").asText.equals("2"))

messageStream.print()
// Refer to: https://stackoverflow.com/documentation/apache-flink/9004/how-to-define-a-custom-deserialization-schema#t=201708080802319255857
filteredStream.addSink(new FlinkKafkaProducer010[ObjectNode](
  params.getRequired("output-topic"),
  new SerializationSchema[ObjectNode] {
    override def serialize(element: ObjectNode): Array[Byte] = element.toString.getBytes()
  }, params.getProperties
))

env.execute("Kafka 0.10 Example")
}

可以看出,我想将消息流打印到控制台并将过滤后的消息接收到kafka。然而,我看不到他们两个。

有趣的是,如果我将 KafkaConsumer 的模式从 JSONKeyValueDeserializationSchema 修改为 SimpleStringSchema,我可以看到 messageStream 打印到控制台。代码如下图:

 val kafkaConsumer = new FlinkKafkaConsumer010(
  params.getRequired("input-topic"),
  new SimpleStringSchema,
  params.getProperties)

val messageStream = env.addSource(kafkaConsumer)
messageStream.print()

这让我想到如果我使用 JSONKeyValueDeserializationSchema,我的输入消息实际上不会被 Kafka 接受。但这看起来很奇怪,与在线文档有很大不同(https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html)

希望有人可以帮助我!


The JSONKeyValue反序列化Schema()每个 kafka 消息都需要消息密钥,并且我假设在生成 JSON 消息并通过 kafka 主题发送时没有提供密钥。

因此,要解决该问题,请尝试使用JSONDeserializationSchema()它只需要消息并根据收到的消息创建一个对象节点。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

下沉 kafka 流时看不到消息,并且在 flink 1.2 中看不到打印消息 的相关文章

  • 使用 commons-exec 流式输出?

    谁能给我一个例子来说明如何流式传输外部程序的输出DefaultExecutor 我没有找到任何描述如何执行此操作的文档 我的外部进程将运行几个小时 因此仅获取所有输出数据是不可行的 它必须被流式传输 注意 此解决方案是同步的 因此它不会流式
  • HTML 5 视频流 .ism 文件?

    我有一个带有媒体服务 4 0 的 IIS 7 0 服务器设置 我创建了一个非常简单的 html 5 页面 其中包含video以其source指向一个 ism文件 是否可以使用 html 5 中的 ism 文件的清单来播放视频 就像在 sil
  • FFmpeg RTP 流媒体错误 [关闭]

    Closed 这个问题是无关 help closed questions 目前不接受答案 我想通过 FFmpeg 播放视频文件 但出现此错误 RTP 复用器仅支持一种流 当我写这个时 我得到了这个错误 ffmpeg exe i SomeVi
  • Flink Logging 获取作业名称或作业 ID

    我正在尝试设置 logback xml 以便它将包含与日志记录关联的 JobName 或 JobId 我还没有找到一种方法来做到这一点 是否可以 最终我想要实现的是能够将日志发送到 ElasticSearch 并用消息标记 JobName
  • Spark Streaming以Parquet格式附加到S3,小分区太多

    我正在构建一个使用 Spark Streaming 从 AWS EMR 上的 Kinesis 流接收数据的应用程序 目标之一是将数据持久保存到 S3 EMRFS 中 为此我使用 2 分钟的非重叠窗口 我的做法 Kinesis Stream
  • 在android studio中使用java解析m3u文件

    我正在寻找用java解析m3u文件 频道名称然后是它的链接我已经用谷歌搜索过这个但无法找到解决方案 m3u 文件如下所示 EXTM3U EXTINF 1 VIP AR Bein Max 1 HD http portal onlineiptv
  • 使用 Servlet 启动 VLC HTTP Stream 时出现问题

    我正在为自己开发一个 VLC 项目 我的目标是创建一个 HTML 前端来启动流 我通过使用 Java Servlet 来完成此操作 概述 乌班图13 04 Java 7 21 冰茶 2 3 9 Eclipse JAVAEE IDE 雄猫7
  • 我应该使用哪种协议来传输音频(非直播)? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 流媒体视频文件?

    我需要流式传输 flv 文件 流媒体应该看起来像直播 我应该有一种方法可以更改目标文件 抱歉我的英语不好 如果 流式传输 的意思是 显示 Flash 视频剪辑 则 flv streaming 并不是真正的流式传输 而是正常的文件传输 即使文
  • 如何使用 Angular/Ionic/JS 显示 Motion JPEG 二进制数据流?

    我正在为设备编写应用程序 此类设备将收到 POST 请求 并发回multipart x mixed replace二进制数据流 我必须在我的应用程序主页的一部分上显示此类流 我查了一下 这种情况的资源非常有限 到目前为止 我发现如果 Mot
  • 对 Parquet 批量格式使用压缩

    从 Apache Flink 1 15 版本开始 您可以使用压缩功能将多个文件合并为一个 https nightlies apache org flink flink docs master docs connectors datastre
  • Apache Flink - “keyBy”中的异常处理

    由于代码错误或缺乏验证 进入 Flink 作业的数据可能会触发异常 我的目标是提供一致的异常处理方式 我们的团队可以在 Flink 作业中使用这种方式 而不会导致生产中出现任何停机 重启策略似乎不适用于此处 因为 简单的重启无法解决问题 我
  • Twitter Streaming API 使用的官方编码?是UTF-8吗?

    Twitter 流 API 的官方编码是什么 根据我所看到的 我最好的猜测是 UTF 8 但我想避免做出假设 我见过的 Twitter 网站上唯一暗示他们使用什么作为官方编码的部分是在这里 Twitter 不想因为我们使用 UTF 8 或相
  • 黑莓上的视频流

    有没有办法从服务器流式传输和播放视频文件 黑莓是否提供可以播放流视频的内置视频播放器 是的你可以 在 bb 设备上串流视频有两种方法 使用 jsr 135 中的 javax microedition media Player 使用标准媒体应
  • WCF流模式确实很慢

    我想知道为什么流模式下的 WCF 与缓冲模式相比确实很慢 基本上 我从服务器读取大量数据 数据库访问 然后通过 WCF 将大量数据传输到其他客户端 我通过比较两种不同的传输模式进行了一些测试和基准测试 我创建了 2 个端点 第一个正在使用t
  • 如何获取 tokio-io 的 async_read 文件句柄

    我想从文件句柄中流式传输行 但我不知道如何满足File has async read use std fs File use std io BufReader BufRead use tokio core reactor Handle us
  • 使用 Android Exoplayer 调整 Dash 流音量

    我正在尝试设置一个搜索栏来控制 exoplayer 流式破折号实例的级别 我正在使用的设置是演示项目的修改版本 并且无法确定我应该尝试影响搜索栏输出的哪个元素 即如何正确使用 MSG SET VOLUME 等 任何意见将不胜感激 我正在寻找
  • React Native:相机流的多个预览

    我有一个要在 Android 和 iOS 中开发的 React Native 应用程序 我想在其中获取相机流 进行一些处理 然后多次渲染它 想象一下像 Instagram 这样的应用程序 您可以在其中实时向相机添加滤镜 并在实时预览中显示应
  • Flink任务管理器内存不足和内存配置

    我们使用 Flink 流在单个集群上运行一些作业 我们的工作是使用rocksDB 来保存状态 该集群配置为在 3 个独立的 VM 上使用单个 Jobmanager 和 3 个 Taskmanager 运行 每个 TM 均配置为运行 14GB
  • android 媒体播放器 - 如何禁用范围请求? (Nexus 7 上的音频流中断)

    我有一个音频流应用程序 它运行本地代理服务器 本地代理服务器与互联网流媒体源建立 http 连接 在本地获取并缓冲流数据 然后 在应用程序内部 我使用 MediaPlayer 连接到本地代理服务器 使用方法 mediaPlayer setD

随机推荐

  • 是否可以编译 linq-to-objects 的查询

    我有一个递归循环中的 linq to 对象查询 担心当对象接近 1000 个以上并且网站上的用户超过 100 个时 我的网站就会崩溃 那么是否可以编译 linq to object 查询 linq 查询只是查找节点的直接子节点 看看为什么这
  • 在另一个 Case 语句中使用 Case 语句的结果

    我有相当长的SELECT查询 但我已将相关部分粘贴到此处 我需要使用我的结果CASE在另一个语句中使用CASE陈述 我正在 SQL Server 中执行此操作 将非常感谢您的帮助 SELECT CompanyContact Name AS
  • Logstash可以直接读取远程日志吗?

    我是 Logstash 的新手 几天来我一直在阅读有关它的内容 和大多数人一样 我试图拥有一个集中式日志系统并将数据存储在 elasticsearch 中 然后使用 kibana 来可视化数据 我的应用程序部署在许多服务器中 因此我需要从所
  • 如何在 then 语句中返回一系列承诺

    因此 在过去的几个小时里 我一直在研究异步内容并使用 Promise 我正在使用测试框架量角器 并且有一些异步问题我遇到了麻烦 在此保存函数中 我异步调用 cm org1 all 然后使用 then 获取响应 我循环响应 并且需要对响应中的
  • hibernate hbm 文件中的 @Convert 相当于什么?

    我写了一个属性转换器 我想将其应用到一个实体中 到目前为止 我正在遵循纯粹的 XML 方法 我找不到相当于 Convert in hbm符号 举个例子将不胜感激 当我搜索这个时 可以理解的是 Google 返回了很多有关 自动将 hbm 文
  • VS2015中c#类的每个方法添加断点

    我有一个类 我希望调试器在调用其任何方法时停止 我尝试使用 New function breakpoint 但我找不到可用的通配符 Function Name 例如 我尝试了 MyNamespace MyClass 有 c 的示例 但它们似
  • 如何检测硬件按键点击?

    我现在需要检测当 flutter 应用程序位于前台或打开时是否按下了任何硬件按钮 例如 当有人按下音量或另一个按钮 即使是关闭电源的按钮 时 我想在我的应用程序中执行某些操作 我知道 当打开一个 flutter 应用程序并且我正在查看应用程
  • 无法在 groovy 中传递闭包

    我正在尝试运行 Geb 库的基本示例 http www gebish org manual current intro html introduction 这是代码 import geb Browser Browser drive go h
  • 环形包裹地图上一组点之间的“质心”,可最小化到所有点的平均距离

    edit 正如有人指出的那样 我正在寻找的实际上是最小化所有其他点之间的总测地距离的点 我的地图在地形上与 吃豆人 和 小行星 中的地图相似 越过顶部将使您扭曲到底部 越过左侧将使您扭曲到右侧 假设我在地图上有两个点 质量相同 我想找到它们
  • 开发面板中的本地化错误

    我购买了一个应用程序 尝试更新开发面板上的信息 当我尝试保存时收到以下错误 您的 1 个本地化内容有错误 它显示了错误位置 但我不知道问题是什么 如果您在选中媒体管理器中的复选框以使用新的 较大的屏幕尺寸屏幕截图之前没有删除较小 较旧尺寸的
  • Web.config保存问题

    我想通过 Web 应用程序的前端向用户公开一些 web config 设置 我可以毫无问题地检索设置 但是当我保存时 我要么收到错误 要么更改不会保留到 web config 文件中 我是在VS中调试的 如果我运行这个 private vo
  • 根据颜色图绘制条形图中的 y 值

    我已经在论坛上搜索过 发现this https stackoverflow com questions 42656585 barplot colored according a colormap 但我的问题有点不同 正如您从代码和下面的图像
  • 自定义 DataGridView 重复列

    我通过从 DataGridView 子类化创建了一个自定义 Winforms 控件 自定义 datagridview 定义自己的列和映射 但是 每当我将其从工具箱拖到窗体的设计图面上时 窗体都会为自定义控件中的每一列重新创建一个列控件 Da
  • Keras ML 库:梯度更新后如何进行权重裁剪? TensorFlow 后端

    我正在尝试使用 Keras 来实现需要权重裁剪的算法的一部分 即限制梯度更新后的权重值 到目前为止 我还没有通过网络搜索找到任何解决方案 作为背景 这与 WGAN 算法有关 https arxiv org pdf 1701 07875 pd
  • Spring Security OAuth 与 JWK 示例

    有人有一个带有 JWT 和非对称密钥的 Spring Security OAuth 2 资源服务器 SP 示例 该示例使用带有 JWKS 端点的 JWK 吗 多谢 散文 Spring Security OAuth 2 资源服务器可以配置为使
  • 什么会导致无法计算 UDP 数据报的 IP 标头校验和?

    我试图将 UDP 数据报从 Windows XP 上的 UdpClient 发送到设备 但它没有响应 当我在 Wireshark 中查看该流量时 我发现出站数据包很糟糕 因为它们的所有 IP 标头校验和都是 0x0000 该机器有两个网卡
  • 执行 kubeadm Reset 后 Kubernetes 无法为 pod 设置网络

    我用以下命令初始化了 Kuberneteskubeadm init 并且在我使用之后kubeadm reset重置它我发现 pod network cidr错了 更正后我尝试使用kubeadm像这样再次初始化 Kubernetes kube
  • Logstash 的 Django 日志记录格式

    我正在尝试将 django 应用程序配置为以 Logstash 易于使用的格式写入日志 受到 Node 的 Winston 日志记录包的启发 Logstash 需要一个 JSON 对象 其中包含键 message 和时间戳 timestam
  • 未找到“Mage_Googlecheckout_Helper_Data”类

    我们刚刚从 Magento 版本 1 8 0 0 升级到 1 8 1 0 现在当我们转到站点的配置部分时 我们会收到以下消息 Fatal error Class Mage Googlecheckout Helper Data not fou
  • 下沉 kafka 流时看不到消息,并且在 flink 1.2 中看不到打印消息

    我的目标是使用kafka读取json格式的字符串 对字符串进行过滤 然后将消息接收出来 仍然是json字符串格式 出于测试目的 我的输入字符串消息如下所示 a 1 b 2 我的实现代码是 def main args Array String