将消息定向给消费者

2023-12-11

我的客户端正在尝试向接收者发送消息。但是我注意到接收者有时没有收到客户端发送的所有消息,因此丢失了一些消息(不确定问题出在哪里?客户端还是接收者)。 关于为什么会发生这种情况的任何建议。这就是我目前正在做的事情

在接收方,这就是我正在做的事情。

这是事件处理器

        async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            foreach (var eventData in messages)
            {
                var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
            }
        }

这是客户端连接到事件中心的方式

var StrBuilder = new EventHubsConnectionStringBuilder(eventHubConnectionString)
{
 EntityPath = eventHubName,
};
this.eventHubClient = EventHubClient.CreateFromConnectionString(StrBuilder.ToString());

如何将消息定向给特定消费者


我正在使用来自 eventhub 官方文档的示例代码,用于sending and 接收.

我有两个消费者群体:$Default and newcg。假设您有 2 个客户端,client_1 使用默认消费者组($Default),client_2 使用另一个消费者组(newcg)

首先,创建发送客户端后,在SendMessagesToEventHub方法中,我们需要添加一个具有值的属性。该值应该是消费者组名称。示例代码如下:

    private static async Task SendMessagesToEventHub(int numMessagesToSend)
    {
        for (var i = 0; i < numMessagesToSend; i++)
        {
            try
            {
                var message = "444 Message";
                Console.WriteLine($"Sending message: {message}");
                EventData mydata = new EventData(Encoding.UTF8.GetBytes(message));

                //here, we add a property named "cg", it's value is the consumer group. By setting this property, then we can read this message via this specified consumer group.
                mydata.Properties.Add("cg", "newcg");

                await eventHubClient.SendAsync(mydata);

            }
            catch (Exception exception)
            {
                Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
            }

            await Task.Delay(10);
        }

        Console.WriteLine($"{numMessagesToSend} messages sent.");
    }

然后在client_1中,创建接收器项目后,使用默认消费者组($Default)-> 在SimpleEventProcessor类->ProcessEventsAsync方法,我们可以过滤掉不必要的事件数据。示例代码为ProcessEventsAsync method:

        public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            foreach (var eventData in messages)
            {
                //filter the data here
                if (eventData.Properties["cg"].ToString() == "$Default")
                {                    
                    var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);

                    Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
                    Console.WriteLine(context.ConsumerGroupName);
                }
            }

            return context.CheckpointAsync();
        }

在另一个客户端中,例如 client_2,它使用另一个消费者组,就像它的名称一样newcg,我们可以按照client_1中的步骤进行,只需稍加改动ProcessEventsAsync方法,如下:

            public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
            {
                foreach (var eventData in messages)
                {
                    //filter the data here, using another consumer group name
                    if (eventData.Properties["cg"].ToString() == "newcg")
                    {  
                       //other code
                    }
                   }

                 return context.CheckpointAsync();
               }
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

将消息定向给消费者 的相关文章

  • 为什么opencv videowriter这么慢?

    你好 stackoverflow 社区 我有一个棘手的问题 我需要你的帮助来了解这里发生了什么 我的程序从视频采集卡 Blackmagic 捕获帧 到目前为止 它工作得很好 同时我用 opencv cv imshow 显示捕获的图像 它也工
  • 为什么派生类不使用基类的operator=(赋值运算符)?

    以下是实际问题的简化版本 而不是打电话Base operator int 代码似乎生成了一个临时的Derived对象并复制它 既然函数签名似乎完美匹配 为什么不使用基本赋值运算符 这个简化的示例没有显示任何不良影响 但原始代码在析构函数中有
  • 如何将pdf页面设置设置为打印属性对话框?

    大家好 我想知道如何设置 pdf 页面设置到打印属性对话框 例如 如果我的 PDF 页面设置为横向 则布局会自动显示横向而不是纵向 如果我的 PDF 页面设置为纵向 则布局会自动显示纵向 我在这个主题上做了很多研发 但没有找到任何满意的链接
  • 通过单个 GPIO 引脚转储闪存

    我正在使用 Infineon 的 XMC4500 Relax Kit 并尝试通过单个 GPIO 引脚提取固件 我非常天真的想法是通过 GPIO 引脚一次转储一位 然后用逻辑分析仪以某种方式 嗅探 数据 伪代码 while word by w
  • 如何使用汇编获取BIOS时间?

    我正在从头开始实现一个小型操作系统 用于教育目的 现在 我想使用汇编来获取 BIOS 时间 我对此进行了很多搜索 但找不到任何代码示例来执行此操作 如果有人可以提供任何参考或代码示例或与此相关的任何内容 我将非常感激 See 时钟中断 1a
  • 通过引用传递时取消引用指针

    当通过引用传递给函数时取消引用指针时会发生什么 这是一个简单的例子 int returnSame int example return example int main int inum 3 int pinum inum std cout
  • 为什么假设 send 可能返回的数据少于在阻塞套接字上传输的请求数据?

    在流套接字上发送数据的标准方法始终是调用 send 并写入一大块数据 检查返回值以查看是否发送了所有数据 然后再次调用 send 直到整个消息被接受 例如 这是一个常见方案的简单示例 int send all int sock unsign
  • C 中的模仿函数重写

    具体来说 函数重写能够调用基本重写方法 这有两部分 一个是预编译的库代码 1 另一个是库的用户代码 2 我在这里实现了一个尽可能最小的经典 Person 和 Employee 示例 非常感谢了解 OOP 概念的铁杆 C 开发人员的回应 我正
  • 使用 openssl 检查服务器安全协议

    我有一个框架应用程序 它根据使用方式连接到不同的服务器 对于 https 连接 使用 openssl 我的问题是 我需要知道我连接的服务器是否使用 SSL 还是 TLS 以便我可以创建正确的 SSL 上下文 目前 如果我使用错误的上下文尝试
  • 使用scanf()时如何区分整数和字符

    我只是使用该功能scanf 代码如下 scanf d a printf d a 当我输入1时 它会像我想要的那样打印1 但即使我输入 1a 它也会像以前一样打印 1 当用户输入非整数时 例如 2 3 12ab 1 a 我想向用户显示 输入整
  • QThread - 使用槽 quit() 退出线程

    我想在线程完成运行时通知对象 但是 我无法让线程正确退出 我有以下代码 处理器 cpp thread new QThread tw new ThreadWorker connect tw SIGNAL updateStatus QStrin
  • 从包含大量文件的目录中检索文件

    我的目录包含近 14 000 000 个 wav 格式的音频样本 所有普通存储 没有子目录 我想循环浏览文件 但是当我使用DirectoryInfo GetFiles 在该文件夹上 整个应用程序冻结了几分钟 可以用另一种方式完成吗 也许读取
  • 为什么WCF中不允许方法重载?

    假设这是一个ServiceContract ServiceContract public interface MyService OperationContract int Sum int x int y OperationContract
  • 当在 Repository/UnitOrWork 之上使用 Service 类时,我应该在哪里放置逻辑不适合 Repository 的常用数据访问代码?

    In my 先前的问题 https stackoverflow com questions 24906548 using the generic repository unit of work pattern in large projec
  • 无法通过 LINQ to Entities 使用某些功能?

    我正在尝试使用 LINQ 查询在项目上实现搜索功能 由于数据有时包含带有重音符号和其他符号的字符 因此我创建了一种方法来删除这些字符以进行搜索 这是我的代码 var addresses from a in db Addresses join
  • 为什么C语言中可以使用多个分号?

    在 C 中我可以执行以下操作 int main printf HELLO WORLD 它有效 这是为什么 我个人的想法 分号是一个 NO OPERATION 来自维基百科 指示符 拥有一大串分号与拥有一个分号并告诉 C 语句已结束具有相同的
  • c# 替代方案中 cfusion_encrypt 中填充的密钥是什么?

    我找到了从这里复制 C 中的 cfusion encrypt 函数的答案 ColdFusion cfusion encrypt 和 cfusion decrypt C 替代方案 https stackoverflow com questio
  • 在何处将 CFLAG(例如 -std=gnu99)添加到 (Eclipse CDT) 自动工具项目中

    我有一个简单的 Autotools C 项目 不是 C 其框架是由 Eclipse CDT Juno 为我创建的 CFLAG 通过检查 似乎是 g O2 我希望所有生成的 make 文件也具有 std gnu99附加到 CFLAG 因为我使
  • 通过 cmake 链接作为外部项目包含的 opencv 库[重复]

    这个问题在这里已经有答案了 我对 cmake 比较陌生 经过几天的努力无法弄清楚以下事情 我有一个依赖于 opencv 的项目 它本身就是一个 cmake 项目 我想静态链接 opencv 库 我正在做的是我的项目中有一份 opencv 源
  • 使用 python 将 CSV 文件上传到 Microsoft Azure 存储帐户

    我正在尝试上传一个 csv使用 python 将文件写入 Microsoft Azure 存储帐户 我已经发现C sharp https blogs msdn microsoft com jmstall 2012 08 03 convert

随机推荐

  • 如何使用 Google Apps 脚本将表格置于 Google 文档页面的中心

    我已使用 Google Apps 脚本功能在 google 文档中插入了一个表格 var grg body appendTable griglia 我可以设置表格中文本的格式 字体大小 粗细 对齐方式 也可以设置单个单元格的格式 背景 前景
  • 我应该如何使用 HttpRequest.GetBufferlessInputStream?

    我在 WCF 服务中接收发布数据时遇到问题 如果我尝试使用 InputStream 则会出现异常 调用 HttpRequest GetBufferlessInputStream 后不支持此方法或属性 我相信我明白为什么会抛出这个错误 但我还
  • OpenGL 中的厚贝塞尔曲线

    我正在使用 jogl opengl 绑定在 java 中编写一个程序 我需要创建一条厚度沿曲线变化的贝塞尔曲线 到目前为止 我只管理了一条细的单点贝塞尔曲线 我很确定这不是一件容易的事 但我不知道从哪里开始寻找解决方案 如果有人能指出我如何
  • 当屏幕关闭时,MediaPlayer 在 Lollipop 上过早切断播放

    我在 Lollipop 设备上遇到了 MediaPlayer 的问题 基本上 当设备屏幕关闭 即用户锁定设备 时 播放会继续 但提前结束约 1 2 秒 但屏幕打开时不会发生这种情况 我在 MediaPlayer 上有一个 onComplet
  • 计算素数时堆栈空间溢出

    我正在学习 Real World Haskell 我在第 4 章 为了进行一些课外练习 我创建了以下程序来计算第 n 个素数 import System Environment isPrime primes test loop primes
  • Android 2.1:如何在 GridView 上放大/缩小和滚动

    背景 我的工作应用程序包含一个 GridView 它有 5 行 11 列 并带有一个用于显示的覆盖适配器 它非常适合我对大显示屏平板电脑的需求 移植到小型智能手机后 我意识到网格由于尺寸小而无法使用 我决定使用缩放功能 而不是实现横向 问题
  • 当使用 tcp 套接字执行 async_write 时,何时调用处理程序?

    这只是 async write 如何与 tcp 套接字配合的简单问题 基本上 当使用 tcp 套接字时 当数据写入套接字时 或者从目标接收到 ack 时 写入处理程序是否会被调用 AFAIK 一旦数据写入套接字的内核缓冲区 处理程序就会被调
  • Access 2007 SQL 中的 Group By 聚合函数中的不同计数

    您好 我浏览论坛有一段时间了 在这里问我的第一个问题 我有点陷入困境 想知道是否可以获得一些帮助 我正在使用 Access 2007 尚未在网上找到该问题的良好答案 我的数据是诊断代码和客户 ID 我正在寻找的是为什么要查找每个诊断代码的客
  • Django 表单未提交

    我有一个在模板中正确呈现的 Django 模型 视图 表单 但它没有提交输入到数据库的数据 任何对此的帮助将不胜感激 models py from django db import models from django forms impo
  • 汇编程序可以在 Linux 发行版之间移植吗?

    以汇编程序格式提供的程序是否可以在 Linux 发行版之间移植 模 CPU 架构差异 这是我的问题的背景 我正在开发一种新的编程语言 名为 Aklo 其操作方式将是经典的编译为 s 并将结果提供给 GNU 汇编器 显然最终最好能自己编写实现
  • 如何在javascript中创建txt文件

    if window XMLHttpRequest xmlhttp new XMLHttpRequest else xmlhttp new ActiveXObject Microsoft XMLHTTP xmlhttp open GET t1
  • 如何访问动态列表中的项目?

    我试图弄清楚如何枚举动态 LINQ 的结果 Select string selectors 在 NET 4 5 中 动态 linq 来自System Linq Dynamic命名空间 Edit 我还包括System Linq 我有一个看起来
  • 双向 WeakMap 保持对象存活?

    假设我有两个 WeakMap a2b new WeakMap
  • java.lang.NoSuchMethodError: org.hibernate.cfg.Configuration.addAnnotatedClass

    当我尝试这个时 我是 JPA 和 hibernate 的新手tutorial 我在 persistence xml 中添加了以下提供程序
  • 类型不匹配无法从元素类型对象转换为字符串

    在我的代码中创建搜索方法来搜索字符串时 我不断收到此错误 我已经通过很多例子试图解决这个问题 但我找不到任何例子 感谢您提供的任何帮助和建议 public class runNote public static void main Stri
  • 尝试针对 ManagementObjectNotFoundException 和 ActiveDirectory/Outlook 进行 Catch/Exception

    这可能是一个非常基本的问题 但我还没有在表格上看到它 请耐心等待 我是 powershell 新手 当我们的 Active Directory 数据库中找不到用户名时 我试图捕获此异常 ManagementObjectNotFoundExc
  • 如何使用 Selenium 允许位置访问?

    我试图在Java中使用Selenium来获取用户的地理坐标 但是使用IP地址不够准确 所以我想使用这个网站http www whataremycooperatives com 但它不起作用 我猜这是因为你必须允许位置使用 所以无论如何我可以
  • 是什么导致了“Base-64 字符数组的长度无效”

    我在这里没什么可说的 我无法在本地重现此问题 但是当用户收到错误时 我会收到自动电子邮件异常通知 Invalid length for a Base 64 char array at System Convert FromBase64Str
  • 为什么spring-boot-starter项目的github项目是空的?

    看着spring boot 启动器 web spring boot starter 安全性github 上的项目 我发现它们是空的 只有一个 build gradle 文件存在 我希望这符合预期 但这让我了解在哪里可以找到实际的源代码 而且
  • 将消息定向给消费者

    我的客户端正在尝试向接收者发送消息 但是我注意到接收者有时没有收到客户端发送的所有消息 因此丢失了一些消息 不确定问题出在哪里 客户端还是接收者 关于为什么会发生这种情况的任何建议 这就是我目前正在做的事情 在接收方 这就是我正在做的事情