C# 对RabbitMQ使用

2023-05-16

1、安装NuGet包RabbitMQ.Client

 2    生产者-确认机制   

(1). 含义:就是应答模式,生产者发送一条消息之后,Rabbitmq服务器做了个响应,表示收到了。

(2). 特点:异步模式,在响应之前,可以继续发送消息,单条消息、批量消息均可继续发送。

(3). 核心代码:单条消息确认: channel.waitForConfirms()

         批量消息确认: channel.waitForConfirmsOrDie()

大致流程:channel.ConfirmSelect();  开启确认模式→发送消息→提供一个回执方法WaitForConfirms();   返回一个bool 值

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1";//RabbitMQ服务器地址
factory.DispatchConsumersAsync = true;//支持异步发送消息
string exchangeName = "exchange1";//交换机的名字
string eventName = "myEvent";// routingKey的值
using var conn = factory.CreateConnection();
while (true)
{
    string msg = DateTime.Now.TimeOfDay.ToString();//待发送消息
    using (var channel = conn.CreateModel())//创建信道
    {
        try
        {
            var properties = channel.CreateBasicProperties();
            properties.DeliveryMode = 2;//1非持久化、2是持久化


            //交换机 Name:交换机名称。
            // Type: 交换机类型——direct、topic、fanout、headers、sharding
            //Durable:消息代理重启后,交换机是否还存在。
            //Auto-delete :当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它。
            //Arguments:依赖代理本身。
            channel.ExchangeDeclare(exchange: exchangeName, type: "direct", true, false);//声明交换机

            byte[] body = Encoding.UTF8.GetBytes(msg);


            channel.ConfirmSelect();//开启消息确认模式

            //发布消息    
            //exchange:交换机名称
            //mandatory为true时,表示如果消息没有被正确路由,消息将退回消息的生产者 如果设置为false,那么broker端自动删除该消息。
            //routingKey:路由键
            //props:消息属性字段,比如消息头部信息等等
            //body:消息主体部分
            channel.BasicPublish(exchange: exchangeName, routingKey: eventName,
            mandatory: true, basicProperties: properties, body: body);
            Console.WriteLine("发布了消息:" + msg);



            /*首先开启Confirm模式,通知消息生产者成功推送到RabbitMQ中*/
            if (channel.WaitForConfirms())  //单条消息确认
            {
                //表示消息发送成功(已经存入队列)
                Console.WriteLine($"【{msg}】成功发送到RabbitMQ!");
            }
            else
            {
                Console.WriteLine($"【{msg}】发送到RabbitMQ失败!");
            }
            //channel.WaitForConfirmsOrDie();//如果所有消息发送成功 就正常执行, 如果有消息发送失败;就抛出异常;

        }
        catch (Exception ex)
        {
            //表示消息发送失败
            Console.WriteLine($"【{msg}】发送到RabbitMQ失败!");
        }
    }

    Thread.Sleep(1000);
}

运行结果:

2. TX事务模式

(1). 含义:基于AMPQ协议;可以让信道设置成一个带事务的信道,分为三步:开启事务、提交事务、事务回滚

(2). 特点:同步模式,在事务提交之前不能继续发送消息,该模式相比Confirm模式效率差一点。

(3). 核心代码:channel.TxSelect();   开启一个事务

         channel.TxCommit();  提交事务, 这一步成功后,消息才真正的写入队列

       channel.TxRollback();   事务回滚

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1";
factory.DispatchConsumersAsync = true;
string exchangeName = "exchange1";
string eventName = "myEvent";
using var conn = factory.CreateConnection();
while (true)
{
    string msg = DateTime.Now.TimeOfDay.ToString();
    using (var channel = conn.CreateModel())
    {
        try
        {
            var properties = channel.CreateBasicProperties();
            properties.DeliveryMode = 2;
            channel.ExchangeDeclare(exchange: exchangeName, type: "direct", true, false);
            byte[] body = Encoding.UTF8.GetBytes(msg);
            channel.TxSelect(); //开启事务
            channel.BasicPublish(exchange: exchangeName, routingKey: eventName,
            mandatory: true, basicProperties: properties, body: body);
            Console.WriteLine($"【{msg}】成功发送到RabbitMQ!");
            channel.TxCommit(); //只有事务提交成功以后,才会真正的写入到队列里面去
        }
        catch (Exception ex)
        {
            //表示消息发送失败
            Console.WriteLine($"【{msg}】发送到RabbitMQ失败!");
            channel.TxRollback();
        }
    }

    Thread.Sleep(1000);
}

3、消费者(手动确认)

(1) 含义:消费者消费一条,回执给RabbitMq一条消息,Rabbitmq 只删除当前这一条消息;相当于是一条消费了,删除一条消息,性能稍微低一些;

(2) 特点:消费1条应答一次,可以告诉RabbitMq消费成功or失败,消费成功,服务器删除该条消息,消费失败,可以删除也可以重新写入。

(3) 核心代码:autoAck: false,表示不自动确认

 然后:channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);    表示消费成功

 channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);         表示消费失败, 可以配置:requeue: true:重新写入到队列里去; false: 删除消息

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1";//服务器地址
factory.DispatchConsumersAsync = true;//支持异步接受
string exchangeName = "exchange1";//交换机的名称
string eventName = "myEvent";//路由键
using var conn = factory.CreateConnection();
using var channel = conn.CreateModel();//创建信道
string queueName = "queue1";//队列名称

//声明了交换机
channel.ExchangeDeclare(exchange: exchangeName, type: "direct",true,false);

//声明一个队列
//queuename: 队列的名称
//durable 是否持久化。true消息会持久化到本地,保证重启服务后消息不会丢失
//exclusive :表示独占方式,设置为true 在某些情景下有必要,例如:顺序消费。表示只有一个channel可以去监听,其他channel都不能够监听。目的就是为了保证顺序消费。
//autoDelete:队列如果与Exchange未绑定,则自动删除
//arguments:扩展参数
channel.QueueDeclare(queue: queueName, durable: true,
        exclusive: false, autoDelete: false, arguments: null);

//绑定队列
channel.QueueBind(queue: queueName,
    exchange: exchangeName, routingKey: eventName);


//创建一个消费者
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
//注册消费者订阅
//autoAck 是否自动确认消息, true自动确认,false 不自动要手动调用, 建立设置为false
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
Console.ReadLine();


async Task Consumer_Received(object sender, BasicDeliverEventArgs args)
{
    try
    {
        var bytes = args.Body.ToArray();
        string msg = Encoding.UTF8.GetString(bytes);
        Console.WriteLine(DateTime.Now + "收到了消息" + msg);
         
        //DeliveryTag: 唯一的编号
        //multiple:是否批量确认.true:将一次性ack所有小于deliveryTag的消息。
        channel.BasicAck(args.DeliveryTag, multiple: false);手动确认 
        await Task.Delay(800);
    }
    catch (Exception ex)
    {
        //异常重试
        //DeliveryTag: 唯一的编号
        //requeue:如果 requeue 参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者; 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,而不会把它发送给新的消费者。
        channel.BasicReject(args.DeliveryTag, true);
        Console.WriteLine("处理收到的消息出错" + ex);
    }
}

运行结果

参考

【Windows安装RabbitMQ详细教程】_慕之寒的博客-CSDN博客_rabbitmq安装windows

第四节:RabbitMq剖析生产者、消费者的几种消息确认机制(Confirm、事务、自动、手动) - Yaopengfei - 博客园

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

C# 对RabbitMQ使用 的相关文章

  • Spring AMQP Java 客户端中的队列大小

    我使用 Spring amqp 1 1 版本作为我的 java 客户端 我有一个大约有 2000 条消息的队列 我想要一个服务来检查这个队列大小 如果它是空的 它会发出一条消息说 所有项目已处理 我不知道如何获取当前队列大小 请帮忙 我用谷
  • 如何在nodejs中验证rabbitmq?

    错误 握手被服务器终止 403 ACCESS REFUSED 消息 ACCESS REFUSED 使用身份验证拒绝登录 旋转机制平原 有关详细信息 请参阅代理日志文件 我单独尝试了 authMechanism PLAIN AMQPLAIN
  • rabbitmq 通道因 PRECONDITION_FAILED 关闭 - 快速回复消费者不存在

    当我们从 Spring Boot 服务向rabbitmq 发布消息时 出现以下错误 而且这是间歇性的 我们无法重现这一点 AMQP 连接 123 11 xxx xx 5672 错误 org springframework amqp rabb
  • Django 1.6 + RabbitMQ 3.2.3 + Celery 3.1.9 - 为什么我的 celery 工作人员会死掉:WorkerLostError:工作人员过早退出:信号 11 (SIGSEGV)

    这似乎解决了一个非常相似的问题 但并没有给我足够的洞察力 https github com celery billiard issues 101 https github com celery billiard issues 101听起来尝
  • RabbitMQ 启动失败

    RabbitMQ Windows 服务将无法启动 C Program Files x86 RabbitMQ Server rabbitmq server 3 0 4 sbin gt rabbitmq service bat start C
  • Celery 任务状态取决于 CELERY_TASK_RESULT_EXPIRES

    据我所知 任务状态完全取决于 CELERY TASK RESULT EXPIRES 设置的值 如果我在任务完成执行后检查此间隔内的任务状态 则返回的状态为 AsyncResult task id state 是正确的 如果没有 状态将不会更
  • 使用 Celery(RabbitMQ、Django)检索队列长度

    我在 django 项目中使用 Celery 我的代理是 RabbitMQ 我想检索队列的长度 我浏览了 Celery 的代码 但没有找到执行此操作的工具 我在 stackoverflow 上发现了这个问题 从客户端检查 RabbitMQ
  • 死信交换 RabbitMQ 丢弃消息

    我正在尝试在 RabbitMQ 中实现 dlx 队列 场景很简单 我有 2 个队列 1 活着 2 死亡 x dead letter exchange 立即 x message ttl 5000 以及 立即 交换 这必然是 1 活着 我尝试运
  • 定义具有多种消息类型的消息传递域

    到目前为止 我见过的大多数 F 消息传递示例都使用 2 4 种消息类型 并且能够利用模式匹配将每条消息定向到其正确的处理函数 对于我的应用程序 由于处理和所需参数的不同性质 我需要数百种独特的消息类型 到目前为止 每个消息类型都是其自己的记
  • Erl 无法连接到本地 EPMD。为什么?

    Erlang R14B04 erts 5 8 5 source 64 bit rq 1 async threads 0 kernel poll false Eshell V5 8 5 abort with G root ip 10 101
  • 如何重置rabbitmq管理用户

    使用rabbitmq 我们可以安装管理插件 然后我们通过浏览器访问http localhost 55672 使用访客 访客 问题是 我无法再登录 因为我更改了密码并为角色输入了空白 有没有办法重置rabbitmq管理的用户 您可以通过以下方
  • RabbitMQ - 如何死信/处理过期队列中的消息?

    我有一个队列x expires放 我遇到的问题是我需要对队列中的消息进行进一步处理IF队列过期 我最初的想法是设置x dead letter exchange在队列中 但是 当队列过期时 消息就会消失而不会进入死信交换 如何处理死信或以其他
  • RabbitMQ:如何创建和恢复备份

    我是 RabbitMQ 的新手 我需要一些帮助 如何备份和恢复到RabbitMQ 以及我需要保存哪些重要数据 谢谢 如果您安装了管理插件 您可以在Overview页 在底部你会看到导入 导出定义您可以使用它来下载代理的 JSON 表示形式
  • RabbitMQ:无法启动rabbitmq_management插件

    Version gt sudo rabbitmqctl status grep rabbit RabbitMQ rabbit RabbitMQ 3 5 6 Error gt sudo rabbitmq plugins enable rabb
  • 使用 Spring 与 RabbitMQ 集成

    我正在为我们的一个应用程序开发消息传递界面 该应用程序是一种服务 旨在接受 作业 进行一些处理并返回结果 实际上以文件的形式 这个想法是使用 RabbitMQ 作为消息传递基础设施 并使用 Spring AMQP 来处理协议特定的细节 我不
  • Celery 广播 vs RabbitMQ 扇出

    我最近一直在使用 Celery 但我不喜欢它 它的配置很混乱 过于复杂并且文档记录很少 我想用 Celery 从单个生产者向多个消费者发送广播消息 让我困惑的是 Celery 术语和底层传输 RabbitMQ 术语之间的差异 在 Rabbi
  • Spring RabbitMQ - 在具有 @RabbitListener 配置的服务上使用手动通道确认

    如何在不使用自动确认的情况下手动确认消息 有没有办法将其与 RabbitListener and EnableRabbit配置风格 大多数文档告诉我们使用SimpleMessageListenerContainer随着ChannelAwar
  • RabbitMq 和“致命错误:握手失败 -handshake_decode_error”

    我正在使用 Windows Server 2012 Erlang 19 2 和 RabbitMq 3 6 6 我在使用 TLS 配置端点之间的连接时遇到问题 我已经尝试了所有关于 SO 的答案 以及所有 RabbitMq 文档here ht
  • Celery 设计帮助:如何防止并发执行任务

    我对 Celery AMQP 相当陌生 正在尝试提出一个任务 队列 工作人员设计来满足以下要求 我有多种类型的 每用户 任务 例如 TaskA TaskB TaskC 这些 每用户 任务中的每一个都为系统中的一个特定用户读取 写入数据 因此
  • 如何让Spring RabbitMQ创建一个新的队列?

    根据我对rabbit mq的 有限 经验 如果您为尚不存在的队列创建新的侦听器 则会自动创建该队列 我正在尝试将 Spring AMQP 项目与rabbit mq 一起使用来设置侦听器 但出现错误 这是我的 xml 配置

随机推荐

  • C语言中scanf、gets、fgets,C++中cin、getline读取字符串的效率比较

    转自 xff1a https blog csdn net richenyunqi article details 89203826 可以使用C语言中scanf gets fgets xff0c C 43 43 中cin getline函数读
  • Android.mk和Android.bp的语句转换

    编译不同类型的模块 编译成 Native 动态库 Android mk include BUILD SHARED LIBRARY Android bp cc library shared 编译成 Native 静态库 Android mk
  • 学习c/c++ 推荐学习什么书籍?

    学习编程包含以下几个重要方面 xff1a 了解语言的语法 知道那些特性可以使用和何时使用 写出可读性好的代码 编译器可以理解 xff0c 但是下一个人是否可以阅读呢 xff1f 在一个更高层次设计结构良好的程序 为了学习一门语言 xff0c
  • 深入理解 http 反向代理(nginx)

    要理解什么是 反向代理 reverse proxy 自然你得先知道什么是 正向代理 forward proxy 另外需要说的是 一般提到反向代理 通常是指 http 反向代理 但反向代理的范围可以更大 比如 tcp 反向代理 在这里 不打算
  • 01-【Royal TSX】Mac上最好用的SSH工具Royal TSX使用教程

    参考文章 xff1a https www jianshu com p 0206980f529e
  • 数据结构:线性结构(C语言)

    文章目录 线性结构线性表什么是线性表线性表的抽象类型描述线性表的实现 广义表广义表定义多重链表 堆栈什么是堆栈堆栈的抽象数据类型描述堆栈的顺序存储实现堆栈的链式存储实现堆栈的应用 队列什么是队列队列的抽象数据类型描述队列的顺序存储实现队列的
  • 树(C语言)

    文章目录 树树的定义树的一些基本术语树的表示 树 树的定义 树 xff08 Tree xff09 xff1a n n 0
  • 堆(C语言)

    文章目录 堆 heap 什么是堆最小堆的操作操作集的实现 C语言 堆 heap 什么是堆 定义 堆 heap 是计算机科学中一类特殊的数据结构的统称 堆通常是一个可以被看做一棵树的数组对象 性质 结构性 xff1a 用数组表示的完全二叉树
  • VS2017中fopen等函数报错解决方法

    文章目录 VS2017中fopen 函数报错解决方法问题解决方法 VS2017中fopen 函数报错解决方法 问题 用VS2017写C语言代码的时候 xff0c 代码中使用了fopen 函数 xff0c 调试之后报错如下 xff1a err
  • 定位,浮动,BFC

    文章目录 1 xff0c 定位1 margin 与 padding 区别 xff1a 2 定位 xff1a 2 xff0c 元素分类 xff0c 浮动 xff0c BFC1 1 常见块级元素 xff1a 1 2 常见行内元素 xff1a 1
  • 表排序

    文章目录 表排序 表排序 思想 当待排数组中的元素不是简简单单的整数 xff0c 而是复杂的结构体 xff0c 那么移动元素所花费的时间就不能忽略不计了 xff0c 这时候我们要减少元素之间移动的次数了 表排序就是这么一个排序 xff0c
  • 循环链表的实现

    循环链表的实现 说明 参考资料 传智播客扫地僧的数据结构教学视频 线性表基本知识 参考 该实现的说明 C语言实现基于单向链表 xff0c 参考实现算法和数据的分离 实现 circular list h span class token ma
  • Ubuntu20.04安装与配置记录

    Ubuntu20 04安装与配置记录 原文地址 xff1a Ubuntu20 04安装与配置记录 一 Ubuntu系统盘制作 1 1 Windows环境下制作系统盘 下载Ubuntu系统 xff0c 选择桌面版 下载工具系统盘制作工具Ruf
  • C++单例写法记录

    C 43 43 单例写法记录 源码地址 一 饿汉式 1 1 静态指针 静态垃圾回收类 instance h ifndef INSTANCE H define INSTANCE H include lt string gt include l
  • 堆栈应用:表达式求值(C语言)

    文章目录 堆栈应用 xff1a 表达式求值 xff08 C语言 xff09 两个定义大致过程具体代码 堆栈应用 xff1a 表达式求值 xff08 C语言 xff09 两个定义 中缀表达式 xff1a 运算符号位于两个运算数之间 如 xff
  • andrsoid studio 长期编辑

    andrsoid studio 2020 02 08 修改build gradle配置 Top level build file where you can add configuration options common to all s
  • 使用 Maven 搭建 Mybatis 环境

    一 创建项目 1 点击 File gt New gt Module 2 选择左侧的 Maven xff0c 由于只是创建一个普通的项目 xff0c 此处点击 Next 即可 3 输入 GroupId 和 ArtifactId 4 配置 ma
  • ubuntu安装npm命令

    ubuntu安装npm命令 摘要 xff1a npm全称Node Package Manager xff0c 是node的包管理工具 xff0c 是用JavaScript写出来的工具 xff0c 被内置进了node中 xff0c 是随同No
  • zynq操作系统: Linux驱动开发AXIDMA篇

    前言 由于bram形式的速率限制 xff0c 在同样紧急的时间条件下 xff0c 还是改回了axidma的方式来降维打击 xff0c 对于几兆的速率 xff0c 颇有种杀鸡用牛刀的感觉 xff0c 没办法 xff0c 原来的刀就是差一点 x
  • C# 对RabbitMQ使用

    1 安装NuGet包RabbitMQ Client 2 生产者 确认机制 1 含义 xff1a 就是应答模式 xff0c 生产者发送一条消息之后 xff0c Rabbitmq服务器做了个响应 xff0c 表示收到了 2 特点 xff1a 异