您不会显式创建消费者组,而是构建始终属于某个消费者组的消费者。无论您使用哪种技术(Spark、Spring、Flink...),每个 Kafka Consumer 都会有一个 Consumer Group。消费者组可以为每个单独的消费者进行配置。
似乎它在某个时间点加载conf/consumer.properties,此外,它在通过 kafka-console-consumer.sh 连接时隐式创建消费者组(在我的例子中是 console-consumer-67807)
如果您没有告诉您的控制台用户实际使用该文件,则不会考虑该文件。
有以下几种方法可以提供消费者组的名称:
带有属性文件的控制台 Consumer (--consumer.config)
文件是这样的config/consumer.properties
应该看起来像
# consumer group id
group.id=my-created-consumer-group
这就是你如何确保控制台消费者接受这个group.id
考虑到:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --consumer.config /path/to/config/consumer.properties
使用 --group 控制台消费者
对于控制台消费者,消费者组会自动创建,带有前缀“console-consumer”和后缀(例如 PID),除非您通过添加来提供自己的消费者组--group
:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --group my-created-consumer-group
基于标准代码的消费者 API
当使用标准 JAVA/Scala/... Consumer API 时,您可以通过以下属性提供 Consumer Group:
Properties settings = new Properties();
settings.put(ConsumerConfig.GROUP_ID_CONFIG, "basic-consumer");
// set more properties
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
consumer.subscribe(Arrays.asList("test-topic")