我正在尝试在 kafka-console- Producer 上启用“幂等”选项。
参考以下链接:
- https://gerardnico.com/dit/kafka/ Producer#idempot https://gerardnico.com/dit/kafka/producer
- https://gerardnico.com/dit/kafka/kafka-console- Producer https://gerardnico.com/dit/kafka/kafka-console-producer
使用的命令:
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list node1.com:6667 --topic my_topic --security-protocol SASL_PLAINTEXT --producer-property acks=all --producer-property retries=Integer.MAX_VALUE --producer-property enable.idempotence=true
观察到以下异常:
org.apache.kafka.common.KafkaException:构建kafka失败
制片人
在org.apache.kafka.clients. Producer.KafkaProducer。(KafkaProducer.java:433)
在org.apache.kafka.clients. Producer.KafkaProducer。(KafkaProducer.java:291)
在 kafka.生产者.NewShinyProducer。(BaseProducer.scala:40)
在 kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:50)
在 kafka.tools.ConsoleProducer.main(ConsoleProducer.scala) 引起:org.apache.kafka.common.config.ConfigException:必须设置
确认所有内容以便使用幂等生产者。否则我们
不能保证幂等性。
在 org.apache.kafka.clients. Producer.KafkaProducer.configureAcks(KafkaProducer.java:510)
在org.apache.kafka.clients. Producer.KafkaProducer。(KafkaProducer.java:375)
尽管 acks 已设置为“all”,但我们观察到此异常。
我缺少什么?
以下是使用的版本:
- 经纪人 - 1.0.0
- client - 与broker 1.0.0捆绑的控制台生产者
Update
我可以使用以下命令在控制台生成器上启用幂等性--request-required-acks -1
回复中建议的选项。
但是,我收到 ClusterAuthorizationException。
bash$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list borker1:6667 --topic my_topic --producer-property enable.idempotence=true --request-required-acks -1 --security-protocol SASL_PLAINTEXT --property "parse.key=true" --property "key.separator=:"
>key1:value1
>[2018-12-26 04:00:56,074] ERROR [Producer clientId=console-producer] Aborting producer batches due to fatal error (org.apache.kafka.clients.producer.internals.Sender)
org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
[2018-12-26 04:00:56,080] ERROR Error when sending message to topic orm_c1_prv_non_sepa_ci with key: 4 bytes, value: 6 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
仅当启用幂等选项时才会发生此异常。没有此选项也可以生成消息。
bash$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list broker1:6667 --topic my_topic --security-protocol SASL_PLAINTEXT --property "parse.key=true" --property "key.separator=:"
>key1:value1
>key2:value2
我缺少什么?