我正在使用来自 eventhub 官方文档的示例代码,用于sending and 接收.
我有两个消费者群体:$Default and newcg。假设您有 2 个客户端,client_1 使用默认消费者组($Default),client_2 使用另一个消费者组(newcg)
首先,创建发送客户端后,在SendMessagesToEventHub
方法中,我们需要添加一个具有值的属性。该值应该是消费者组名称。示例代码如下:
private static async Task SendMessagesToEventHub(int numMessagesToSend)
{
for (var i = 0; i < numMessagesToSend; i++)
{
try
{
var message = "444 Message";
Console.WriteLine($"Sending message: {message}");
EventData mydata = new EventData(Encoding.UTF8.GetBytes(message));
//here, we add a property named "cg", it's value is the consumer group. By setting this property, then we can read this message via this specified consumer group.
mydata.Properties.Add("cg", "newcg");
await eventHubClient.SendAsync(mydata);
}
catch (Exception exception)
{
Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
}
await Task.Delay(10);
}
Console.WriteLine($"{numMessagesToSend} messages sent.");
}
然后在client_1中,创建接收器项目后,使用默认消费者组($Default)-> 在SimpleEventProcessor
类->ProcessEventsAsync
方法,我们可以过滤掉不必要的事件数据。示例代码为ProcessEventsAsync
method:
public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (var eventData in messages)
{
//filter the data here
if (eventData.Properties["cg"].ToString() == "$Default")
{
var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
Console.WriteLine(context.ConsumerGroupName);
}
}
return context.CheckpointAsync();
}
在另一个客户端中,例如 client_2,它使用另一个消费者组,就像它的名称一样newcg,我们可以按照client_1中的步骤进行,只需稍加改动ProcessEventsAsync
方法,如下:
public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (var eventData in messages)
{
//filter the data here, using another consumer group name
if (eventData.Properties["cg"].ToString() == "newcg")
{
//other code
}
}
return context.CheckpointAsync();
}