如何在Golang中创建kafka消费者组?

2024-04-09

可用的库是sarama https://github.com/Shopify/sarama/(或其扩展萨拉玛簇 https://github.com/bsm/sarama-cluster)但是没有提供消费者组示例,不在sarama https://godoc.org/gopkg.in/Shopify/sarama.v1#example-Consumer nor in 萨拉玛簇 https://github.com/bsm/sarama-cluster/issues/105.

我不明白 API。我可以举一个为某个主题创建消费者组的示例吗?


没有必要使用萨拉玛簇图书馆。对于 apache kafka 集成,它已被弃用。Sarama原始库本身提供了一种使用消费者组连接到kafka集群的方法。

我们需要创建客户端,然后初始化消费者组,在其中创建声明并等待消息通道接收消息。

初始化客户端:-

kfversion, err := sarama.ParseKafkaVersion(kafkaVersion) // kafkaVersion is the version of kafka server like 0.11.0.2
if err != nil {
    log.Println(err)
}

config := sarama.NewConfig()
config.Version = kfversion
config.Consumer.Return.Errors = true

// Start with a client
client, err := sarama.NewClient([]string{brokerAddr}, config)
if err != nil {
    log.Println(err)
}
defer func() { _ = client.Close() }()

与消费者组的连接:-

// Start a new consumer group
group, err := sarama.NewConsumerGroupFromClient(consumer_group, client)
if err != nil {
    log.Println(err)
}
defer func() { _ = group.Close() }()

开始使用主题分区中的消息:-

// Iterate over consumer sessions.
ctx := context.Background()
for {
    topics := []string{topicName}
    handler := &Message{}
    err := group.Consume(ctx, topics, handler)
    if err != nil {
        log.Println(err)
    }
}

最后一部分是等待消息通道消费消息。我们需要实现所有要实现的功能(三)ConsumerGroupHandler界面。

func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error   { return nil }
func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
        sess.MarkMessage(msg, "")
    }
    return nil
}

有关使用 golang 检查 kafka 的更多信息sarama https://godoc.org/github.com/Shopify/sarama图书馆。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在Golang中创建kafka消费者组? 的相关文章