Kafka Producer:使用回调处理异步发送中的异常

2024-01-13

我需要捕获异步发送到 Kafka 时的异常。 Kafka生产者API带有一个函数send(ProducerRecord record, Callback回调)。但是当我针对以下两种情况进行测试时:

  • 卡夫卡经纪人宕机
  • 主题未预先创建 回调没有被调用。相反,我在代码中收到发送失败的警告(如下所示)。

问题 :

  • 那么回调函数是否仅针对特定异常调用?

  • 异步发送时,Kafka 客户端何时尝试连接到 Kafka 代理:在每批发送或定期发送?

Kafka 警告图像 https://i.stack.imgur.com/t8R39.png

注意:我还使用 linger.ms 设置 25 秒来批量发送我的记录。


public class ProducerDemo {

    static KafkaProducer<String, String> producer;

    public static void main(String[] args) throws IOException {

         final Logger logger = LoggerFactory.getLogger(ProducerDemo.class);
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "30000");

        producer = new KafkaProducer<String, String>(properties);
        String topic = "first_topic";

        for (int i = 0; i < 5; i++) {
            String value = "hello world " + Integer.toString(i);
            String key = "id_" + Integer.toString(i);

            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);

              producer.send(record, new Callback() {
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        //execute everytime a record is successfully sent or exception is thrown
                        if(e == null){
                           // No Exception
                        }else{
                            //Exception Handling
                        }
                    }
                });
        }
        producer.close();
    }

You will get those warning for non-existing topic as a resilience mechanism provided with KafkaProducer. If you wait a bit longer(should be 60 seconds by default), the callback will be called eventually: Here's my snippet: enter image description here

So, when something goes wrong and async send is not successful, it will eventually fail with a failed future or/and a callback with exception. If you are not running it transactionally, it can still mean that some messages from the batch have found their way to the broker, while others haven't. It will most certainly be a problem if you need a blocking-style acknowledgement to the upstream system(like http ingestion interface, etc.) per every message that is sent to Kafka. The only way to do that is by blocking every message with the future's get, as described in the documentation https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html: enter image description here

总的来说,我注意到很多与 KafkaProducer 交付语义和保证相关的问题。绝对可以更好地记录它。

还有一件事,既然你提到了徘徊者 https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html:

请注意,及时到达的记录通常会 即使 linger.ms=0 也可以一起批处理,因此在重负载下批处理将 无论延迟配置如何都会发生

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka Producer:使用回调处理异步发送中的异常 的相关文章

随机推荐