PyKafka Producer.get_delivery_report 当 block=false 时抛出 Queue.empty

2024-01-08

我目前正在使用 Python 进行 Kafka 集成,并且我是来自 PHP 背景的 Kafka 和 Python 新手。

我已经设法让生产者工作,但由于等待来自 Kafka 的确认,它处理每条消息的速度不够快。

在 GitHub 页面上(https://github.com/Parsely/pykafka https://github.com/Parsely/pykafka)下面的示例应该异步处理消息并且仍然允许发送报告:

>>> with topic.get_producer(delivery_reports=True) as producer:
...     count = 0
...     while True:
...         count += 1
...         producer.produce('test msg', partition_key='{}'.format(count))
...         if count % 10**5 == 0:  # adjust this or bring lots of RAM ;)
...             while True:
...                 try:
...                     msg, exc = producer.get_delivery_report(block=False)
...                     if exc is not None:
...                         print 'Failed to deliver msg {}: {}'.format(
...                             msg.partition_key, repr(exc))
...                     else:
...                         print 'Successfully delivered msg {}'.format(
...                         msg.partition_key)
...                 except Queue.Empty:
...                     break

我修改了示例,但是从测试中我可以看到第一条消息已成功发送,但抛出了 Queue.empty 异常。

这是我修改后的代码:

from pykafka import KafkaClient
import Queue
import json

client = KafkaClient(hosts='1.1.1.1:9092')
topic = client.topics['test']


sync = False
# sync = True

if sync:
    with topic.get_sync_producer() as producer:
        count = 0
        while True:
            count += 1
            producer.produce('Test message ' + str(count))
            print 'Sent message ' + str(count)
else:
    with topic.get_producer(delivery_reports=True) as producer:
        count = 0
        while True:
            count += 1
            if count >= 100:
                print 'Processed 100 messages'
                break
            producer.produce('Test message ' + str(count))
            while True:
                try:
                    msg, exc = producer.get_delivery_report(block=False)
                    if exc is not None:
                        print 'Failed to deliver msg {}: {}'.format(msg.offset, repr(exc))
                    else:
                        print 'Successfully delivered msg {}'.format(msg.offset)
                except Queue.Empty:
                    print 'Queue.empty'
                    break

和输出:

/Users/jim/Projects/kafka_test/env/bin/python /Users/jim/Projects/kafka_test/producer.py
Queue.empty
...
... x100
Processed 100 messages

通过检查我的消费者,我可以看到所有 100 条消息都已成功发送,但我无法从生产者那里得知这一点。

您对如何改进此实现有什么建议,更具体地说,如何在保持检查消息是否成功的能力的同时提高吞吐量?


我发现了一个与此相关的 GitHub 问题:https://github.com/Parsely/pykafka/issues/291 https://github.com/Parsely/pykafka/issues/291

我通过将 min_queued_messages 降低到 1 来修复此问题。

with topic.get_sync_producer(min_queued_messages=1) as producer:
        count = 0
        while True:
            count += 1
            time_start = time.time()
            producer.produce('Test message ' + str(count))
            time_end = time.time()

            print 'Sent message %d, %ss duration' % (count, (time_end - time_start))
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

PyKafka Producer.get_delivery_report 当 block=false 时抛出 Queue.empty 的相关文章

随机推荐