Rabbitmq - 连接/通道/消费者的恢复

2023-12-22

我正在创建一个以无限循环运行的消费者,以从队列中读取消息。我正在寻找有关如何在无限循环中恢复 abd continue 的建议/示例代码,即使存在网络中断也是如此。消费者必须保持运行,因为它将作为 WindowsService 安装。

1)有人可以解释一下如何正确使用这些设置吗?它们之间有什么区别?

NetworkRecoveryInterval 
AutomaticRecoveryEnabled
RequestedHeartbeat

2)请参阅我当前为消费者提供的示例代码。我正在使用 .Net RabbitMQ 客户端 v3.5.6。

上述设置将如何为我进行“恢复”? 例如Consumer.Queue.Dequeue 会阻塞直到恢复吗? 这似乎不对 所以...

我必须手动为此编写代码吗?例如Consumer.Queue.Dequeue 是否会抛出异常,我必须检测并手动重新创建我的连接、通道和消费者?或者只是消费者,因为“自动恢复”将为我恢复频道?

这是否意味着我应该将消费者创建移到 while 循环内?频道创建怎么样?以及连接的创建?

3)假设我必须手动执行一些恢复代码,是否有事件回调(以及如何注册它们)来告诉我存在网络问题?

Thanks!

public void StartConsumer(string queue)
{
            using (IModel channel = this.Connection.CreateModel())
            {
                var consumer = new QueueingBasicConsumer(channel);
                const bool noAck = false;
                channel.BasicConsume(queue, noAck, consumer);

                // do I need these conditions? or should I just do while(true)???
                while (channel.IsOpen &&        
                       Connection.IsOpen &&     
                       consumer.IsRunning)
                {
                    try
                    {
                        BasicDeliverEventArgs item;
                        if (consumer.Queue.Dequeue(Timeout, out item))
                        {
                            string message = System.Text.Encoding.UTF8.GetString(item.Body);
                            DoSomethingMethod(message);
                            channel.BasicAck(item.DeliveryTag, false);
                        }
                    }
                    catch (EndOfStreamException ex)
                    {   
                        // this is likely due to some connection issue -- what am I to do?
                    }
                    catch (Exception ex)
                    {   
                        // should never happen, but lets say my DoSomethingMethod(message); throws an exception
                        // presumably, I'll just log the error and keep on going
                    }
                }
            }
}

        public IConnection Connection
        {
            get
            {
                if (_connection == null) // _connection defined in class -- private static IConnection _connection;
                {
                     _connection = CreateConnection();
                }
                return _connection;
            }
        }

        private IConnection CreateConnection()
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "RabbitMqHostName",
                UserName = "RabbitMqUserName",
                Password = "RabbitMqPassword",
            };

            // why do we need to set this explicitly? shouldn't this be the default?
            factory.AutomaticRecoveryEnabled = true;

            // what is a good value to use?
            factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(5); 

            // what is a good value to use? How is this different from NetworkRecoveryInterval?
            factory.RequestedHeartbeat = 5; 

            IConnection connection = factory.CreateConnection();
            return connection;
        }

RabbitMQ 功能

The RabbitMQ 网站上的文档 https://www.rabbitmq.com/dotnet-api-guide.html其实真的很好。如果你想恢复队列、交换器和消费者,你需要寻找拓扑恢复,默认情况下启用。自动恢复(即默认启用 https://github.com/rabbitmq/rabbitmq-dotnet-client/blob/master/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs) 包括:

  • 重新连接
  • 恢复连接监听器
  • 重新开放渠道
  • 恢复通道侦听器
  • 恢复频道basic.qos设置、发布者确认和交易设置

The NetworkRecoveryInterval是执行自动恢复重试之前的时间量(默认为 5 秒)。

Heartbeat还有另一个目的,即识别失效的TCP连接。有更多内容请阅读 https://www.rabbitmq.com/heartbeats.html在 RabbitMQ 的网站上。

代码示例

编写可靠的恢复代码是很棘手的。这EndOfStreamException(正如您怀疑的那样)最有可能是由于网络问题。如果您使用管理插件 https://www.rabbitmq.com/management.html,您可以通过从那里关闭连接来重现此情况,并查看是否触发了异常。对于类似生产的应用程序,您可能需要一组代理,以便在连接失败时在它们之间轮流使用。如果您有多个 RabbitMQ 代理,您可能还希望防止一台或多台服务器上出现长期服务器故障。您可能想要实施错误策略,例如重新排队消息或使用死信交换。

我一直在思考这些事情并编写了一个瘦客户端,生兔肉 https://github.com/pardahlman/RawRabbit,处理其中一些事情。也许这对你有好处?如果没有的话我建议你改变一下QueueingBasicConsumer to an EventingBasicConsumer。它是事件驱动的,而不是线程阻塞的。

var eventConsumer = new EventingBasicConsumer(channel);
eventConsumer.Received += (sender, args) =>
{
    var body = args.Body;
    eventConsumer.Model.BasicAck(args.DeliveryTag, false);
};
channel.BasicConsume(queue, false, eventConsumer);

如果您激活了拓扑恢复,消费者将由 RabbitMQ 客户端恢复并再次开始接收消息。 为了进行更精细的控制,请连接事件处理程序ConsumerCancelled and Shutdown检测连接问题并Registered了解消费者何时可以再次使用。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Rabbitmq - 连接/通道/消费者的恢复 的相关文章

  • asp:repeater 折叠表行 - 已更新

    我想知道是否有人对我的问题有创造性的解决方案 我有一个从我的数据库填充的转发器 如下所示
  • Qt 和 Sqlite 示例

    我正在寻找一些使用 Qt 的示例代码 它是带有 Sqlite 驱动程序的 SQL 模块 我需要示例的主要原因是我之前有 Qt 数据库接口的经验 并且 Sqlite 在字段类型方面有一些奇怪的行为 类型是按字段存储的 而不是按列存储的 The
  • C++ STL 映射,std::pair 作为键

    这就是我通过地图定义的方式 std map
  • StackExchange Redis 删除所有以以下开头的键

    我有一个格式的密钥 Error 1 Error 24 Error 32 Using StackExchange Redis 我该怎么办KeyDelete在与格式匹配的所有键上Error 在另一个答案中我看到了 LUA 脚本 EVAL ret
  • MVVM:来自 FileOpenPicker 的图像绑定源

    我将 OnActivated 添加到 app xaml cs 中 它可以正常工作 protected async override void OnActivated IActivatedEventArgs args var continua
  • 如何将 dll 中包含的组件嵌入到 exe 中,以便它可以从内存运行?

    我正在尝试制作一个必须从内存运行的程序 通过Assembly Load bin 如上所述here http www codeproject com Articles 13897 Load an EXE File and Run It fro
  • 为什么 xcode IDE 认为 `friend` 是保留字

    我一直在开发一个个人项目 并在我创建的新类中包含以下代码 property readonly getter isFriend BOOL friend 它似乎没有任何问题 当我构建它时 它可以编译得很好 但是当我们在xcode IDE看起来像
  • 仅使用一个 #include 表达式一次包含多个头文件?

    是否有任何表达式可以使语法一次包含多个标头 而无需为每个新文件编写 include 表达式 例如 include
  • 在 C++ 中,为什么 const 也可以工作时编译器选择非常量函数? [复制]

    这个问题在这里已经有答案了 例如 假设我有一堂课 class Foo public std string Name m maybe modified true return m name const std string Name cons
  • Qt 多重继承和信号

    由于 QObject 我在 QT 中遇到了有关多重继承的问题 我知道很多人也有同样的问题 但我不知道该如何解决 class NavigatableItem public QObject Q OBJECT signals void desel
  • 对象变空似乎是 Hangfire 中的反序列化问题

    Hangfire 似乎无法反序列化我的原始版本Scheduler对象及其所有状态 我正在调用其 Execute 方法BackgroundJob Enqueue 如下所示 Scheduler new FileInFileOut FileIn
  • 向客户端发送状态码 500 时页面未呈现

    我有一个页面 通用处理程序 我想在该页面上向客户端返回状态代码 500 以指示出现问题 我这样做 Response StatusCode 500 Response StatusDescription Internal Server Erro
  • 批量插入,asp.net

    我需要获取与会员相对应的 ID 号列表 在任何给定时间处理的数量可能在 10 到 10 000 之间 我可以毫无问题地收集数据 解析数据并将其加载到 DataTable 或任何内容 C 中 但我想在数据库中执行一些操作 将所有这些数据插入表
  • 如何通过分解 y 轴来减小 mschart 的高度

    如何降低 mschart 的高度 如下所示 编辑 就我而言 我不想查看中断图表 this chart1 ChartAreas 0 AxisY ScaleBreakStyle Enabled false 您似乎正在寻找AxisY ScaleB
  • Qt - 添加超链接到对话框

    有没有办法在 Qt 对话框中添加可点击的超链接 IE 它应该看起来像一个超链接 蓝色文本 当您单击它时 它应该在浏览器中打开该超链接 像这样的东西 Use QLabel setOpenExternalLinks bool 并在标签上设置文本
  • ArrayList 有什么问题?

    最近我问了一个关于 SO 的问题 其中提到了可能使用 c ArrayList 来解决问题 有人评论说使用数组列表不好 我想了解更多有关此的信息 我以前从未听说过关于数组列表的这种说法 有人可以带我了解使用数组列表可能出现的性能问题吗 C n
  • Microsoft Visual Studio 2017 中的 wxWidgets 设置

    我花了大约 20 个小时试图弄清楚如何在 Microsoft Visual Studio 2017 中设置 wxWidgets 我遵循 https wiki wxwidgets org Microsoft Visual C 2B 2B Gu
  • TCP/IP 传输期间套接字数据损坏

    当我通过预连接的 TCP IP 套接字发送数据时 我发现数据已损坏 Example Station1 正在向 Station2 发送数据 我已经在发送之前 在 S1 和接收之后 在 S2 打印了数据 以下是消息 S1 发送的数据是ACKS2
  • 使用属性和性能

    我正在优化我的代码 我注意到使用属性 甚至自动属性 对执行时间有深远的影响 请参阅下面的示例 Test public void GetterVsField PropertyTest propertyTest new PropertyTest
  • 将二进制长字符串转换为十六进制 C#

    我正在寻找一种将长二进制字符串转换为十六进制字符串的方法 二进制字符串看起来像这样 0110011010010111001001110101011100110100001101101000011001010110001101101011 我

随机推荐