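I am using the following configuration to simulate a consumer shutdown / session timeout. How can we capture the message that the client logs to the console: SESSTMOUT|rdkafka#consumer-1| [thrd:main]?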
consumed message None: msg1: 0: first_topic: 0: None
consumed message None: msg2: 1: first_topic: 0: None
no message received by consumer
no message received by consumer
%4|1603348021.170|SESSTMOUT|rdkafka#consumer-1| [thrd:main]: Consumer group session timed out (in join-state started) after 10005 ms without a successful response from the group coordinator (broker 0, last error was Success): revoking assignment and rejoining group
no message received by consumer
no message received by consumer
no message received by consumer
%4|1603276138.721|MAXPOLL|rdkafka#consumer-1| [thrd:main]: Application maximum poll interval (30000ms) exceeded by 7ms (adjust max.poll.interval.ms for long-running message processing): leaving group
error from consumer KafkaError{code=_MAX_POLL_EXCEEDED,val=-147,str="Application maximum poll interval (30000ms) exceeded by 7ms"}
import time

from confluent_kafka import Consumer


def consume():
    c = Consumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": "group1",
        "enable.auto.commit": False,
        "auto.offset.reset": "earliest",
        "max.poll.interval.ms": 30000,
        "session.timeout.ms": 10000,
        # note: this value exceeds session.timeout.ms
        "heartbeat.interval.ms": 15000,
    })
    c.subscribe(["first_topic"])
    while True:
        message = c.poll(1.0)  # wait up to 1 second for a message
        if message is None:
            print("no message received by consumer")
        elif message.error() is not None:
            print(f"error from consumer {message.error()}")
        else:
            print(f"consumed message {message.key()}: {message.value().decode('utf-8')}: {message.offset()}: {message.topic()}: {message.partition()}: {message.headers()}")
        time.sleep(10)  # pause between polls
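One way to capture the SESSTMOUT line in code, rather than only seeing it on the console, is to hand the consumer a Python logger. The sketch below assumes that the confluent_kafka Consumer accepts a logger= keyword argument and forwards client log lines to it while poll() is being called, and that the facility name (such as SESSTMOUT or MAXPOLL) appears in the forwarded text. FacilityCapture, build_logger and make_consumer are hypothetical names used only for illustration.

```python
import logging


class FacilityCapture(logging.Handler):
    """Collect log records whose text mentions one of the given facilities."""

    def __init__(self, facilities):
        super().__init__()
        self.facilities = tuple(facilities)
        self.captured = []

    def emit(self, record):
        text = record.getMessage()
        if any(fac in text for fac in self.facilities):
            self.captured.append(text)


def build_logger(handler, name="kafka.consumer"):
    """Create a logger that sends everything to the capturing handler."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
    logger.propagate = False  # keep captured lines out of the root logger
    return logger


def make_consumer(conf, logger):
    """Build a consumer whose client logs go to `logger` (assumed API)."""
    # Imported lazily so the capture logic can be tried without a broker.
    from confluent_kafka import Consumer

    # Assumption: logger= routes client log lines to `logger` during poll().
    return Consumer(conf, logger=logger)
```

With this in place, the handler's captured list could be inspected after each poll() call to react to a session timeout, for example by raising an alert or restarting the consumer.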
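heartbeat.interval.ms must be lower than session.timeout.ms, and is typically set no higher than 1/3 of that value (session.timeout.ms * 1/3): https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#heartbeat.interval.ms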
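The rule above can be checked before the consumer is created. This is a minimal sketch, assuming the configuration is a plain dict with millisecond values as in the question; check_heartbeat is a hypothetical helper and not part of confluent_kafka.

```python
def check_heartbeat(conf):
    """Return a list of warnings about heartbeat/session timeout settings."""
    session = conf["session.timeout.ms"]
    heartbeat = conf["heartbeat.interval.ms"]
    problems = []
    if heartbeat >= session:
        # Hard rule: the heartbeat interval must be below the session timeout.
        problems.append(
            f"heartbeat.interval.ms ({heartbeat}) must be lower than "
            f"session.timeout.ms ({session})"
        )
    elif heartbeat * 3 > session:
        # Soft rule: the usual guidance is at most 1/3 of the session timeout.
        problems.append(
            f"heartbeat.interval.ms ({heartbeat}) is above 1/3 of "
            f"session.timeout.ms ({session})"
        )
    return problems


# The values from the question break the hard rule.
question_conf = {"session.timeout.ms": 10000, "heartbeat.interval.ms": 15000}
for problem in check_heartbeat(question_conf):
    print(problem)
```

For the question's session.timeout.ms of 10000, a heartbeat.interval.ms of about 3000 would satisfy both checks.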