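I'm currently working on a Kafka integration in Python. I'm new to both Kafka and Python, coming from a PHP background.
I've managed to get the producer working, but it doesn't process messages fast enough, because it waits for an acknowledgement from Kafka for each message.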
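To make the throughput cost of waiting for every acknowledgement concrete, here is a small stdlib-only simulation. It assumes a fixed, hypothetical 10 ms acknowledgement latency per message. `ACK_LATENCY`, `send_and_ack`, `produce_sync` and `produce_async` are illustrative names, not pykafka APIs. The thread pool only models "many requests in flight at once". pykafka's async producer instead queues messages and sends them in batches from a background worker, so treat this as an analogy rather than a description of its internals.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

ACK_LATENCY = 0.01  # hypothetical broker round-trip per message, in seconds


def send_and_ack(n):
    """Stand-in for one produce request that waits for the broker's ack."""
    time.sleep(ACK_LATENCY)
    return n


def produce_sync(count):
    # One request in flight at a time: total time is roughly count * ACK_LATENCY.
    start = time.perf_counter()
    for n in range(count):
        send_and_ack(n)
    return time.perf_counter() - start


def produce_async(count, workers=20):
    # Many requests in flight: the acknowledgement waits overlap instead of adding up.
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(send_and_ack, n) for n in range(count)]
        wait(futures)
    return time.perf_counter() - start


if __name__ == "__main__":
    n = 20
    print("sync:  %.3f s" % produce_sync(n))
    print("async: %.3f s" % produce_async(n))
```

The exact timings depend on the machine, but the synchronous loop pays the latency once per message, while the pipelined version pays it roughly once per wave of in-flight requests.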
The following example from the GitHub page (https://github.com/Parsely/pykafka) is supposed to process messages asynchronously while still allowing delivery reports:
```python
>>> with topic.get_producer(delivery_reports=True) as producer:
...     count = 0
...     while True:
...         count += 1
...         producer.produce('test msg', partition_key='{}'.format(count))
...         if count % 10**5 == 0:  # adjust this or bring lots of RAM ;)
...             while True:
...                 try:
...                     msg, exc = producer.get_delivery_report(block=False)
...                     if exc is not None:
...                         print 'Failed to deliver msg {}: {}'.format(
...                             msg.partition_key, repr(exc))
...                     else:
...                         print 'Successfully delivered msg {}'.format(
...                             msg.partition_key)
...                 except Queue.Empty:
...                     break
```
I adapted the example, but from testing I can see that the first message is sent successfully, yet a Queue.Empty exception is raised.
Here is my modified code:
```python
from pykafka import KafkaClient
import Queue
import json

client = KafkaClient(hosts='1.1.1.1:9092')
topic = client.topics['test']

sync = False
# sync = True

if sync:
    # Synchronous producer: each produce() call waits for Kafka's acknowledgement
    with topic.get_sync_producer() as producer:
        count = 0
        while True:
            count += 1
            producer.produce('Test message ' + str(count))
            print 'Sent message ' + str(count)
else:
    # Asynchronous producer with delivery reports enabled
    with topic.get_producer(delivery_reports=True) as producer:
        count = 0
        while True:
            count += 1
            if count >= 100:
                print 'Processed 100 messages'
                break
            producer.produce('Test message ' + str(count))
            # Poll for delivery reports right after each message, without blocking
            while True:
                try:
                    msg, exc = producer.get_delivery_report(block=False)
                    if exc is not None:
                        print 'Failed to deliver msg {}: {}'.format(msg.offset, repr(exc))
                    else:
                        print 'Successfully delivered msg {}'.format(msg.offset)
                except Queue.Empty:
                    print 'Queue.empty'
                    break
```
And the output:
```
/Users/jim/Projects/kafka_test/env/bin/python /Users/jim/Projects/kafka_test/producer.py
Queue.empty
...
... x100
Processed 100 messages
```
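One plausible reading of this output (an assumption, not confirmed in this post) is a timing issue. With an async producer, `produce()` returns before the broker has answered, so a non-blocking poll issued immediately afterwards finds the report queue still empty. The sketch below reproduces that pattern with the standard library. `FakeAsyncProducer` is a hypothetical stand-in, not pykafka, and its `get_delivery_report(block, timeout)` simply forwards to `queue.Queue.get`. It is written for Python 3. In Python 2, as in the code above, the module is named `Queue`.

```python
import queue
import threading
import time


class FakeAsyncProducer:
    """Hypothetical stand-in: the delivery report arrives some time after produce()."""

    def __init__(self, delay=0.1):
        self.delay = delay
        self._reports = queue.Queue()

    def produce(self, msg):
        def deliver():
            time.sleep(self.delay)          # simulated broker round-trip
            self._reports.put((msg, None))  # None means no exception: delivered
        threading.Thread(target=deliver, daemon=True).start()

    def get_delivery_report(self, block=True, timeout=None):
        # Same semantics as queue.Queue.get: raises queue.Empty if nothing is available.
        return self._reports.get(block=block, timeout=timeout)


producer = FakeAsyncProducer()
producer.produce("Test message 1")

try:
    producer.get_delivery_report(block=False)  # polled before the report exists
    immediate = "got a report"
except queue.Empty:
    immediate = "Queue.Empty"

msg, exc = producer.get_delivery_report(block=True, timeout=1.0)  # waits up to 1 s
print(immediate)  # Queue.Empty
print(msg, exc)   # Test message 1 None
```

The same report that the non-blocking poll missed is returned once the caller is willing to wait for it.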
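By checking my consumer, I can see that all 100 messages were delivered successfully, but I have no way of telling that from the producer.
Do you have any suggestions for improving this implementation? More specifically, how can I increase throughput while keeping the ability to check whether each message was delivered successfully?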
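A common pattern for keeping throughput while still checking every message is to produce a whole batch without waiting, then block on the report queue until one report per message has come back or a timeout expires. This is close in spirit to the README example, which drains reports only every `10**5` messages; its "bring lots of RAM" comment hints that undrained reports accumulate in memory. Below is a stdlib-only sketch of the pattern. `FakeAsyncProducer`, `produce_batch` and `report_timeout` are hypothetical. With pykafka, the assumption is that you would pass the producer from `topic.get_producer(delivery_reports=True)` and that its `get_delivery_report` accepts `block`/`timeout` as used here. Its reports carry `Message` objects rather than plain strings. Check the docs for your pykafka version before relying on this.

```python
import queue
import threading
import time


class FakeAsyncProducer:
    """Hypothetical stand-in for an async producer with delivery reports.

    produce() returns at once; a background thread later puts (msg, exc) on
    a report queue, where exc is None on success.
    """

    def __init__(self, delay=0.01, fail_every=0):
        self.delay = delay
        self.fail_every = fail_every  # 0: never fail; N: every Nth message fails
        self._reports = queue.Queue()
        self._sent = 0

    def produce(self, msg):
        self._sent += 1
        fail = bool(self.fail_every) and self._sent % self.fail_every == 0

        def deliver():
            time.sleep(self.delay)  # simulated broker round-trip
            exc = RuntimeError("simulated delivery failure") if fail else None
            self._reports.put((msg, exc))

        threading.Thread(target=deliver, daemon=True).start()

    def get_delivery_report(self, block=True, timeout=None):
        return self._reports.get(block=block, timeout=timeout)


def produce_batch(producer, messages, report_timeout=5.0):
    """Send the whole batch without waiting, then collect one report per message."""
    for m in messages:
        producer.produce(m)  # returns immediately, no per-message wait

    delivered, failed = [], []
    for _ in range(len(messages)):
        try:
            msg, exc = producer.get_delivery_report(block=True, timeout=report_timeout)
        except queue.Empty:
            break  # no report within report_timeout; the rest count as missing
        if exc is not None:
            failed.append(msg)
        else:
            delivered.append(msg)
    missing = len(messages) - len(delivered) - len(failed)
    return delivered, failed, missing


if __name__ == "__main__":
    producer = FakeAsyncProducer(fail_every=25)
    batch = ["Test message %d" % i for i in range(1, 101)]
    ok, bad, missing = produce_batch(producer, batch)
    print(len(ok), len(bad), missing)  # 96 4 0
```

Blocking during the drain phase does not slow down sending, because every message in the batch has already been handed off. Choosing the batch size trades memory held by pending reports against how often the loop pauses to collect them.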