Kafka重启时如何让Kafka Source重新连接

2024-02-06

我创建一个Source使用 Reactive Kafka 的消费者记录如下:

val settings = ConsumerSettings(system, keyDeserializer, valueDeserializer)
.withBootstrapServers(bootstrapServers)
.withGroupId(groupName)
// what offset to begin with if there's no offset for this group
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
// do we want to automatically commit offsets?
.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
// auto-commit offsets every 1 minute, in the background
.withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
// reconnect every 1 second, when disconnected
.withProperty(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, "1000")
// every session lasts 30 seconds
.withProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
// send heartbeat every 10 seconds i.e. 1/3 * session.timeout.ms
.withProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000")
// how many records to fetch in each poll( )
.withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100")

Consumer.atMostOnceSource(settings, Subscriptions.topics(topic)).map(_.value)  

我的本地计算机上运行着 1 个 Kafka 实例。我通过控制台生成器将值推送到主题中并看到它们打印出来。然后我杀死 Kafka,并重新启动它以查看源是否重新连接。

以下是我的日志的处理方式:

* Connection with /192.168.0.1 disconnected
    java.net.ConnectException: Connection refused
* Give up sending metadata request since no node is available
* Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
* Resuming partition test-events-0
* Error while fetching metadata with correlation id 139 : {test-events=INVALID_REPLICATION_FACTOR}
* Sending metadata request (type=MetadataRequest, topics=test-events) to node 0
* Sending GroupCoordinator request for group mytestgroup to broker 192.168.0.1:9092 (id: 0 rack: null)
* Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
* Received GroupCoordinator response ClientResponse(receivedTimeMs=1491797713078, latencyMs=70, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=166,client_id=consumer-1}, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) for group mytestgroup
* Error while fetching metadata with correlation id 169 : {test-events=INVALID_REPLICATION_FACTOR}
* Received GroupCoordinator response ClientResponse(receivedTimeMs=1491797716169, latencyMs=72, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=196,client_id=consumer-1}, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) for group mytestgroup
09:45:16.169 [testsystem-akka.kafka.default-dispatcher-16] DEBUG o.a.k.c.c.i.AbstractCoordinator - Group coordinator lookup for group mytestgroup failed: The group coordinator is not available.
09:45:16.169 [testsystem-akka.kafka.default-dispatcher-16] DEBUG o.a.k.c.c.i.AbstractCoordinator - Coordinator discovery failed for group mytestgroup, refreshing metadata
* Initiating API versions fetch from node 2147483647
* Offset commit for group mytestgroup failed: This is not the correct coordinator for this group.
* Marking the coordinator 192.168.43.25:9092 (id: 2147483647 rack: null) dead for group mytestgroup
* The Kafka consumer has closed.  

如何确保此源重新连接并继续处理日志?


我认为你至少需要有2个经纪人。如果其中一台失败,另一台可以完成这项工作,您可以重新启动另一台。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka重启时如何让Kafka Source重新连接 的相关文章

随机推荐

  • OOD / OOP 练习曲 / 代码练习

    我已经在网上搜索了一段时间了 我正在寻找用于 OOD 实践 以及一些内部 TDD 研讨会 的小样本练习 如果有一个地方可以满足这一需求 请指出它 并关闭此问题 限制条件 与语言无关的现实世界问题 小 最多需要一到两个小时才能解决的问题 或者
  • 无法在Java / C++中为外部应用程序设置always-on-top

    我正在寻找解决方案 使外部应用程序 不是像记事本或 calc exe 这样的 Windows 应用程序 在按下 Java GUI 中的按钮后始终保持在最上面 我在 C 中使用此代码来获取桌面上所有打开的窗口 并将其进程 ID PID 与发送
  • 可更新视图 - SQL Server 2008

    关于可更新数据库视图的问题 我正在阅读有关该主题的一些 MSDN 文档 并且遇到以下限制 任何修改 包括 UPDATE INSERT 和 DELETE 语句 都必须仅引用一个基表中的列 我只是想确保我理解该限制 我想在我的几个媒体评论项目中
  • 多边形中的点

    我正在尝试解决一些 SPOJ 问题https www spoj pl problems FSHEEP https www spoj pl problems FSHEEP 我们必须找出点是否在多边形内部 正如我们所看到的 它不是凸多边形 问题
  • 如何更改pairs()的轴位置?

    默认情况下 pairs 将轴放在图的所有边上 在边之间交替 但是 我将数据集之间的相关性放在上三角形中 所以我想像这样调整轴位置 我需要设置哪些参数 您可以自定义配对功能 如果你看一下代码 就会发现轴是在 2 个嵌套的 for 循环内绘制的
  • 为什么 RGB 使用 6 个十六进制数字?

    据我所知 RGB 用两个十六进制数字编码颜色 对应于红色 绿色和蓝色分量 例如 ff0000 是纯红色 据我了解 每个十六进制数字代表 0 15 之间的数字 或 4 位信息 但是如何用 32 位来表示每种颜色呢 为什么使用两位数字表示红色
  • 如何在 Mercurial 上 git reset --hard HEAD?

    我是一名 Git 用户 正在尝试使用 Mercurial 事情是这样的 我做了一个hg backout在我想恢复的变更集上 这创建了一个新头 因此 hg 指示我合并 我认为回到 默认 合并后 它告诉我我仍然必须提交 然后我注意到在解决合并中
  • Bash 脚本 - 变量内容作为命令运行

    我有一个 Perl 脚本 它给我一个定义的随机数列表 这些随机数对应于文件的行 接下来我想使用从文件中提取这些行sed bin bash count cat last queries txt wc l var perl test pl te
  • 如何通过拖动顶部的 div 来调整其大小?

    我想在拖动两个 div 之间的部分时调整 div 的大小 在搜索中我发现this http jsfiddle net gaby Bek9L 1779 但我不知道如何使这个水平而不是可用的垂直拖动 我的 div 看起来像 div div di
  • Install4j:有没有办法用包含占位符的文本覆盖欢迎消息?

    我需要覆盖install4j欢迎消息 其中包含我需要在运行时解析的占位符文本 将从属性文件中读取替换值 welcomeLabel3 Text 0 another text 1 无法向系统消息添加占位符 您必须指定整个消息 但是 您可以使用安
  • 如何从 javascript 文件(而不是 vue 组件)获取 vuex 状态

    我正在使用 vuex 2 1 1 并让事情在 vue 单文件组件中工作 然而 为了避免 vue 单文件组件中出现太多问题 我将一些函数移至utils js我将其导入到 vue 文件中的模块 在这个utils js我想阅读 vuex 状态 我
  • 初学者:AVR C++ Atmel Studio 6

    我在确定我可以访问哪些库时遇到问题 我知道我可以使用 Atmel Studio 6 IDE 用 C 对微控制器 Atmega328p 进行编程 但是 我无法弄清楚我可以访问哪些库的记录在哪里 例如 我可以使用 STL 例如向量 双端队列 吗
  • Google Maps API V3 -> 利用 MarkerCluster 但簇本身是否特定于绘制的多边形/区域?

    好吧 让我以我已经创建了很多谷歌地图的事实作为这个问题的序言 但它们是严格的标记和表示路线的折线以及一些处理程序交互 现在我希望基本上显示一张世界地图 主要是北美 我想用我拥有的一些纬度 经度将这片大陆分成我预定义的区域 使用这些区域 我想
  • CSS 面包屑箭头指向左侧

    我发现这个 css 面包屑指向右侧 我想指向左侧 相信我 我一遍又一遍地尝试 但没有成功 请有人告诉我该怎么做 div span display inline block position relative background 88b7d
  • 改造 - 更改 BaseUrl

    我有一个场景 我必须使用相同的基本 URL 调用 API 例如www myAPI com但以不同的baseUrl 我有一个 Retrofit 2 的实例 它是通过Builder return new Retrofit Builder bas
  • 将 UL 在 DIV 内垂直居中

    我有以下内容 div style background Red height 100px ul li a href Home a li ul div 我想将 ul 垂直居中在 div 中 但我不知道如何 小提琴演示 http jsfiddl
  • 如何在C++03中用sprintf正确替换sprintf_s?

    sprintf s是该函数的 Microsoft 实现sprintf他们修补了一个缺陷 添加了一个参数来获取函数限制写入的边界值 等效的引入C 11 snprintf 但在这里 我们谈论的是C 03 syntax 签名 count char
  • 为什么 Unity 会忽略非静态公共字段的初始化值?

    我在用着InvokeRepeating http docs unity3d com ScriptReference MonoBehaviour InvokeRepeating html调用游戏中的方法 我打电话InvokeRepeating
  • 在Featuretools中计算多个训练窗口的特征

    我有一张包含客户和交易的表 有没有办法获取过去 3 6 9 12 个月过滤的功能 我想自动生成功能 过去 3 个月的跨性别者数量 过去 12 个月内跨性别者数量 过去 3 个月的平均跨性别者 过去 12 个月的平均跨性别者 我尝试过使用tr
  • Kafka重启时如何让Kafka Source重新连接

    我创建一个Source使用 Reactive Kafka 的消费者记录如下 val settings ConsumerSettings system keyDeserializer valueDeserializer withBootstr