Kafka - 如何捕获kafka客户端后台线程生成的消息

2024-04-06

使用以下配置来模拟消费者关闭/会话超时。我们如何捕获客户端记录到控制台的消息 - SESSTMOUT|rdkafka#consumer-1| [第三:主要]

consumed message None: msg1: 0: first_topic: 0: None
consumed message None: msg2: 1: first_topic: 0: None
no message received by consumer
no message received by consumer
%4|1603348021.170|SESSTMOUT|rdkafka#consumer-1| [thrd:main]: Consumer group session timed out (in join-state started) after 10005 ms without a successful response from the group coordinator (broker 0, last error was Success): revoking assignment and rejoining group
no message received by consumer
no message received by consumer
no message received by consumer
%4|1603276138.721|MAXPOLL|rdkafka#consumer-1| [thrd:main]: Application maximum poll interval (30000ms) exceeded by 7ms (adjust max.poll.interval.ms for long-running message processing): leaving group
error from consumer KafkaError{code=_MAX_POLL_EXCEEDED,val=-147,str="Application maximum poll interval (30000ms) exceeded by 7ms"}
from confluent_kafka import Consumer
def consume():
    c = Consumer({"bootstrap.servers": "localhost:9092", 
                  "group.id": "group1",
                  "enable.auto.commit": False,
                  "auto.offset.reset": "earliest",
                  "max.poll.interval.ms": 30000,
                  "session.timeout.ms": 10000,
                  "heartbeat.interval.ms": 15000
                  })
    c.subscribe(["first_topic"])
    while True:
        message = c.poll(1.0)
        if message is None:
            print("no message received by consumer")
        elif message.error() is not None:
            print(f"error from consumer {message.error()}")
        else:
            print(f"consumed message {message.key()}: {message.value().decode('utf-8')}: {message.offset()}: {message.topic()}: {message.partition()}: {message.headers()}")
        time.sleep(10)

心跳间隔.ms必须低于会话超时毫秒 https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#heartbeat.interval.ms

session.timeout.ms * 1/3

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka - 如何捕获kafka客户端后台线程生成的消息 的相关文章

  • Python:在列表理解本身中引用列表理解?

    这个想法刚刚出现在我的脑海中 假设您出于某种原因想要通过 Python 中的列表理解来获取列表的唯一元素 i if i in created comprehension else 0 for i in 1 2 1 2 3 1 2 0 0 3
  • 无法“安装”plpython3u - postgresql

    我正在尝试在 postgresql 中使用 python 语言 像这样的事情 create or replace function test a integer returns integer as if a 2 0 return even
  • 通过 Scrapy 抓取 Google Analytics

    我一直在尝试使用 Scrapy 从 Google Analytics 获取一些数据 尽管我是一个完全的 Python 新手 但我已经取得了一些进展 我现在可以通过 Scrapy 登录 Google Analytics 但我需要发出 AJAX
  • Python 中的 Lanczos 插值与 2D 图像

    我尝试重新缩放 2D 图像 灰度 图像大小为 256x256 所需输出为 224x224 像素值范围从 0 到 1300 我尝试了两种使用 Lanczos 插值来重新调整它们的方法 首先使用PIL图像 import numpy as np
  • 将数据从 python pandas 数据框导出或写入 MS Access 表

    我正在尝试将数据从 python pandas 数据框导出到现有的 MS Access 表 我想用已更新的数据替换 MS Access 表 在 python 中 我尝试使用 pandas to sql 但收到错误消息 我觉得很奇怪 使用 p
  • Django:按钮链接

    我是一名 Django 新手用户 尝试创建一个按钮 单击该按钮会链接到我网站中的另一个页面 我尝试了一些不同的例子 但似乎没有一个对我有用 举个例子 为什么这不起作用
  • 如何使用 Ansible playbook 中的 service_facts 模块检查服务是否存在且未安装在服务器中?

    我用过service facts检查服务是否正在运行并启用 在某些服务器中 未安装特定的软件包 现在 我如何知道这个特定的软件包没有安装在该特定的服务器上service facts module 在 Ansible 剧本中 它显示以下错误
  • 如何从网页中嵌入的 Tableau 图表中抓取工具提示值

    我试图弄清楚是否有一种方法以及如何使用 python 从网页中的 Tableau 嵌入图形中抓取工具提示值 以下是当用户将鼠标悬停在条形上时带有工具提示的图表示例 我从要从中抓取的原始网页中获取了此网址 https covid19 colo
  • 如何使用 OpencV 从 Firebase 读取图像?

    有没有使用 OpenCV 从 Firebase 读取图像的想法 或者我必须先下载图片 然后从本地文件夹执行 cv imread 功能 有什么办法我可以使用cv imread link of picture from firebase 您可以
  • Python 的“zip”内置函数的 Ruby 等价物是什么?

    Ruby 是否有与 Python 内置函数等效的东西zip功能 如果不是 做同样事情的简洁方法是什么 一些背景信息 当我试图找到一种干净的方法来进行涉及两个数组的检查时 出现了这个问题 如果我有zip 我可以写这样的东西 zip a b a
  • Fabric env.roledefs 未按预期运行

    On the 面料网站 http docs fabfile org en 1 10 usage execution html 给出这个例子 from fabric api import env env roledefs web hosts
  • 向 Altair 图表添加背景实心填充

    I like Altair a lot for making graphs in Python As a tribute I wanted to regenerate the Economist graph s in Mistakes we
  • 对年龄列进行分组/分类

    我有一个数据框说df有一个柱子 Ages gt gt gt df Age 0 22 1 38 2 26 3 35 4 35 5 1 6 54 我想对这个年龄段进行分组并创建一个像这样的新专栏 If age gt 0 age lt 2 the
  • 解释 Python 中的数字范围

    在 Pylons Web 应用程序中 我需要获取一个字符串 例如 关于如何做到这一点有什么建议吗 我是 Python 新手 我还没有找到任何可以帮助解决此类问题的东西 该列表将是 1 2 3 45 46 48 49 50 51 77 使用
  • 类型错误:预期单个张量时的张量列表 - 将 const 与 tf.random_normal 一起使用时

    我有以下 TensorFlow 代码 tf constant tf random normal time step batch size 1 1 我正进入 状态TypeError List of Tensors when single Te
  • Conda SafetyError:文件大小不正确

    使用创建 Conda 环境时conda create n env name python 3 6 我收到以下警告 Preparing transaction done Verifying transaction SafetyError Th
  • Scrapy:如何使用元在方法之间传递项目

    我是 scrapy 和 python 的新手 我试图将 parse quotes 中的项目 item author 传递给下一个解析方法 parse bio 我尝试了 request meta 和 response meta 方法 如 sc
  • Python 类继承 - 诡异的动作

    我观察到类继承有一个奇怪的效果 对于我正在处理的项目 我正在创建一个类来充当另一个模块的类的包装器 我正在使用第 3 方 aeidon 模块 用于操作字幕文件 但问题可能不太具体 以下是您通常如何使用该模块 project aeidon P
  • 导入错误:没有名为 site 的模块 - mac

    我已经有这个问题几个月了 每次我想获取一个新的 python 包并使用它时 我都会在终端中收到此错误 ImportError No module named site 我不知道为什么会出现这个错误 实际上 我无法使用任何新软件包 因为每次我
  • 如何将输入读取为数字?

    这个问题的答案是社区努力 help privileges edit community wiki 编辑现有答案以改进这篇文章 目前不接受新的答案或互动 Why are x and y下面的代码中使用字符串而不是整数 注意 在Python 2

随机推荐

  • 如何正确更新 OpenGL Es 2.0 中的顶点数组?

    当我在 OpenGL 2 0 中更新 iOS 上的顶点数组时 原始顶点数据保留在屏幕上 即第一个刷新是持久的 我发送到 GPU 的初始点集每帧都会渲染 但第二个刷新是持久的 我发送到 GPU 的初始点集每帧都会渲染 第 3 次 第 4 次
  • 守护进程在 JVM 垃圾收集器抖动和 JVM 内存耗尽后立即停止

    当我使用 gradle 6 0 构建多模块 java 项目时 当我添加 gt 30 个模块时 抛出此错误 Gradle Daemon started in 2 s 267 ms gt Configure project soa misc o
  • Android - Facebook 集成:无法导入 com.facebook.Session

    我是 Android Facebook 集成的新手 我正在尝试将我的应用程序与 Facebook 集成 因此我按照脸书教程 https developers facebook com docs android 并且一切正常 至少我能够执行登
  • RequireJS 模块/包的相对路径

    我对 RequireJS 还很陌生 并且遇到了一些问题 我使用 RequireJS 编写了一个基于 Backbone 构建的小框架 我希望它可以在不同的项目中重复使用 因此 通过一些搜索 我了解到 require 允许包 这似乎就是我正在寻
  • 带有变量的 jQuery 选择器

    如何将变量与选择器混合使用 我有 ID 变量 我想从 div one 中选择具有此 id 的图像 jQuery one img id 是选择器 我试过了 one img id 但不起作用 编辑 根据您下面的评论 您将使用此 one img
  • 调试时 Visual Studio 不会因未处理的异常而中断

    突然我的visual studio不会因未处理的异常而中断 有时甚至不会在断点处停止 我读过许多其他相关的 SO 帖子 如下所示 如何使 Visual Studio 仅在未处理的异常上中断 https stackoverflow com q
  • 数据类:如何使用 asdict() 忽略 None 值?

    dataclass class Car brand str color str 如何获得忽略 None 值的字典 就像是 gt gt gt car Car brand Audi color None gt gt gt asdict car
  • 在 R 中绘制 x 轴上包含日期的图表

    我正在尝试在 x 轴上绘制日期 间隔为 1 个月 并旋转日期值以确保清晰 r runif 100 d lt as Date 2001 1 1 70 sort r plot d r type l xaxt n axis Date 1 at s
  • 从 Julia 程序执行 >> shell 运算符

    我试图使用反引号从 Julia 内部附加一个文件 run cat file2 gt gt file1 但这行不通 似乎 gt gt 运算符无法正确解释 有没有办法通过管道或其他技巧来做到这一点 如果您尝试以编程方式执行此操作 则主要问题正文
  • Numpy:给定索引,如何以有效的方式消除沿 axis=1 的最小值?

    给定一个形状为 A 的矩阵 1000000 6 我已经弄清楚如何获取每行的最小最右边值并在此函数中实现它 def calculate row minima indices h h is the given matrix Returns th
  • Spring Boot - 非 Web 应用程序的长时间运行应用程序

    我有一个简单的 Spring Boot 应用程序 仅使用 AMQP 依赖项 仅 org springframework boot spring boot starter amqp 例如没有 Web 依赖项 因此 JAR 中不包含应用程序服务
  • Django 过滤器调用返回的列表的默认顺序是什么?

    简短的问题连接到 PostgreSQL 数据库时 Django 过滤器调用返回的列表的默认顺序是什么 背景我自己承认 我had在应用程序层做了一个糟糕的假设 即返回列表的顺序将是恒定的 即不使用 order by 我查询的项目列表不按字母顺
  • 自动化时的 PowerShell 和 Excel 问题

    我面临着一个奇怪的问题 当我运行这段代码时 Excel New Object Com Excel Application book Excel Workbooks Add threading thread CurrentThread Cur
  • SpringBoot Undertow:如何分派到工作线程

    我目前正在查看 springboot undertow 对我来说 不太清楚如何将传入的 http 请求分派到工作线程以阻止操作处理 看着班级Undertow 嵌入式 Servlet Container class 看起来没有办法实现这种行为
  • 如何从PDO PHP 中的prepare() 获取查询错误?

    st db gt prepare SELECT FROM c6ode 在上述情况下 如何检查查询的故意 mysql 错误 需要设置错误模式属性PDO ATTR ERRMODE to PDO ERRMODE EXCEPTION 因为您期望异常
  • C 中 scanf 函数的格式说明符中 %c 规范之前的空格

    当我之间不包含空格时 d and c格式字符串中的规范scanf 在以下程序中运行函数 并在运行时输入 4 h 则输出为 Integer 4 and Character 究竟如何可变 c 在这种情况下接受输入 如果我在之间包含空格 会有什么
  • 在Google搜索时如何从第一页获取图像?

    通常使用Google搜索城市后 右侧会出现维基百科页面的一部分 其中包含图像和地图 谁能告诉我如何访问该图像 我应该知道怎么下载 实际上 主图像 与右侧地图图像一起 很少来自维基百科 因此您无法使用维基百科 API 来获取它 如果您想访问实
  • 在 GridView 或 ListView 底部添加额外空间

    是否可以在 GridView 底部添加额外的空间 有点像空行 我希望当你向下滚动到 GridView 底部时 会有额外的 50dp 的空白空间 我尝试设置paddingBottom到50dp 但似乎没有改变任何东西 如果我理解正确的话 它应
  • Grails - SpringSecurityPlugin 不生成控制器

    我是 Grails 新手 我按照说明安装 SpringSecurityPlugin 版本 2 0 RC2 并执行命令 grails s2 quickstart 用户角色 应该在其他文件中生成 登录控制器 and 注销控制器 但这些控制器不会
  • Kafka - 如何捕获kafka客户端后台线程生成的消息

    使用以下配置来模拟消费者关闭 会话超时 我们如何捕获客户端记录到控制台的消息 SESSTMOUT rdkafka consumer 1 第三 主要 consumed message None msg1 0 first topic 0 Non