我需要捕获异步发送到 Kafka 时的异常。 Kafka生产者API带有一个函数send(ProducerRecord record, Callback回调)。但是当我针对以下两种情况进行测试时:
- 卡夫卡经纪人宕机
- 主题未预先创建
回调没有被调用。相反,我在代码中收到发送失败的警告(如下所示)。
问题 :
Kafka 警告图像 https://i.stack.imgur.com/t8R39.png
注意:我还使用 linger.ms 设置 25 秒来批量发送我的记录。
public class ProducerDemo {
static KafkaProducer<String, String> producer;
public static void main(String[] args) throws IOException {
final Logger logger = LoggerFactory.getLogger(ProducerDemo.class);
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "30000");
producer = new KafkaProducer<String, String>(properties);
String topic = "first_topic";
for (int i = 0; i < 5; i++) {
String value = "hello world " + Integer.toString(i);
String key = "id_" + Integer.toString(i);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
//execute everytime a record is successfully sent or exception is thrown
if(e == null){
// No Exception
}else{
//Exception Handling
}
}
});
}
producer.close();
}
You will get those warning for non-existing topic as a resilience mechanism provided with KafkaProducer
. If you wait a bit longer(should be 60 seconds by default), the callback will be called eventually:
Here's my snippet:
So, when something goes wrong and async send is not successful, it will eventually fail with a failed future or/and a callback with exception.
If you are not running it transactionally, it can still mean that some messages from the batch have found their way to the broker, while others haven't.
It will most certainly be a problem if you need a blocking-style acknowledgement to the upstream system(like http ingestion interface, etc.) per every message that is sent to Kafka. The only way to do that is by blocking every message with the future's get
, as described in the documentation https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html:
总的来说,我注意到很多与 KafkaProducer 交付语义和保证相关的问题。绝对可以更好地记录它。
还有一件事,既然你提到了徘徊者 https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html:
请注意,及时到达的记录通常会
即使 linger.ms=0 也可以一起批处理,因此在重负载下批处理将
无论延迟配置如何都会发生
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)