RabbitMq基本概念 = >实践DirectExchange和TopicExchange交换机模式

2023-11-10

AMQP 和IM的区别:

AMQP: 高级消息队列

1、可以一对多广播,也可以一对一广播

2、生产者和消费者不知道对方是谁

IM:

1、只能一对一广播

2、生产者和消费者知道对方是谁

RabbitMQ:只是消息代理

我们不生产消息,我们只是消息的搬运工

每条消息只会发送给一个订阅者(一个萝卜一个坑)

消息的传递过程:

生产者==>MQ==>消费者

消息

有效载荷:需要传输的数据内容本身

标签:用来告诉Rabbit消息应该投递给谁

MQ的相关概念

交换器:用于接收生产者发送过来的消息    

队列:用来存储生产者发送过来的消息,供消费者消费,可以理解为装消息的容器

绑定:用来决策交换器将消息发送到哪个队列上



如果用乘坐火车打比方,可以这样理解RabbitMQ

乘客是消息,火车站进站安检口相当于交换器,负责接收乘客,并告诉乘客自己应该到哪个检票口等候

检票口排队的乘客相当于队列,乘坐同一班次列车的乘客在同一个检票口检票进站

乘客的手上的火车票相当于绑定(路由键),用来告诉乘客该坐哪一班次的列车,该从哪一个检票口进站。

火车车厢里的每一个座位就相当于消费者,一个座位只能给一个乘客坐

队列

什么是队列?

队列(Queues)是你的消息(messages)的终点,可以理解成装消息的容器。消息就一直在里面, 
直到有客户端(也就是消费者,Consumer)连接到这个队列并且将其取走为止

队列相关命令:

basic.consume:持续订阅消息,将信道设置为接受模式。消息一到达队列,消费者就接收消息,可以持续接收多条消息

basic.get:订阅单条消息,接收消息后,取消订阅。这样做是为了让消费者只接收下一条消息,如果想获得更多消息,需要再次发送basic.get。

basic.ack:消费者接收消息后,向RabbitMQ发送确认,表示已经收到消息

auto_ack:消费者在订阅队列是如果设置为auto_ack,表示消费者一旦接收消息后,RabbitMQ自动默认为消费者确认收到了消息。

basic.reject:消费者拒绝接收MQ发送过来的消息。RabbitMQ2.0版本后新增的命令

requeue:

true:如果requeue参数设置为true,则RabbitMQ将消息重新发送给下一位消费者

false:如果设置为false,则RabbitMQ将消息从队列移除

queue.declare:创建队列,如果不指定队列名称,则Rabbit会分配一个随机名称,并在命令的响应中返回

exclusive:如果设置为true,列队变为私有的,即一个队列只有一个消费者

aotu-delete:当最后一个消费者取消订阅后,队列会自动移除。如果需要临时创建一个队列,可以结合exclusive使用。消费者断开连接时,队列自动移除

队列工作原理

多个消费者订阅同一队列时,怎么发送消息?

消费者A和B同时订阅了seed_bin队列
1、消息Message_A到达队列seed_bin

2、RabbitMQ把Message_A发送给A

3、A确认接收到了消息Mesage_A

4、RabbitMQ将消息Message_A从队列seed_bin删除

5、消息Message_B到达队列seed_bin

6、RabbitMQ把Message_B发送给B

7、B确认接收到了消息Mesage_B

8、RabbitMQ将消息Message_B从队列seed_bin删除

注意:

如果消费者收到一条消息后,从RabbiMQ队列断开了(或者说取消了订阅),一直没有发送确认。那么MQ认为这条消息没有发送,
然后会重新将该消息发送给下一位消费者。这样做的好处是如果接收消息的程序出现问题,或者有bug,导致没有发送确认,
这样RabbitMQ不会再给该应用发送消息。因为RabbitMQ会认为你的程序还没有准备好接收下一条消息,这样可以防止消息
源源不断的涌向你的应用,到导致过载。

交换器

什么是交换器?
交换器是服务器接收到生产者发送过来的消息后,根据不同的规则,将消息投递到不同的队列的工具。这些规则被称为路由键。
队列是通过路由键绑定到交换器的。当生产者将消息发给RabbitMQ代理服务器时,消息将拥有一个路由键,
注: Rabbit会将其与绑定的路由键践行匹配,如果匹配则投递到该队列,如果匹配不到任何绑定模式,则消息进入黑洞同时rabbitMQ管理端Queues队列中对应的State是空闲状态如下图。

在这里插入图片描述

交换器的类型
direct:如果路由键匹配,则直接投递到对应的队列

fanout:不处理路由键,向所有与之绑定的队列投递消息

topic:处理路由键,按模式匹配,向符合规则的队列投递消息

headers:允许匹配消息的header,而非路由键,除此之外,direct完全一致,但性能差很多,基本不用了。
direct:
 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。
 如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,
 也不会转发dog.guard,只会转发dog。
fanout:
不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与
该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。
Fanout交换机转发消息是最快的。
topic:
将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。
因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。请参考下图

在这里插入图片描述

代码体现 ( 采用 DirectExchange 交换机模式 一对一)

注:DirectExchange 实践中此模式启动多个项目则会( 服务轮询策略 )起到均衡负载效果
生产者配置
/*
 * Copyright 2019 Wicrenet, Inc. All rights reserved.
 */
package com.xy.pay.main.config;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
/**
 * rabbitMq 配置
 *
 * @author YJX
 * Created on 2019-04-30 14:25
 */
@Configuration
@EnableRabbit
public class RabbitMQConfigurer {

    @Autowired
    private ConnectionFactory connectionFactory;


    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        //必须是prototype类型
        return new RabbitTemplate(this.connectionFactory);
    }

    /**
     * 【对账异常事件】交换机
     *
     * @author YJX
     * @date 2019-04-30 15:05
     **/
    @Bean
    @Qualifier("noticeReconciliation")
    public DirectExchange noticeReconciliationExchange() {
        return new DirectExchange("noticeReconciliation", true, false);
    }

}

生产者发送消息

/*
 * Copyright 2019 Wicrenet, Inc. All rights reserved.
 */
package com.xy.pay.main.amqp;

import com.alibaba.fastjson.JSON;
import com.xy.pay.main.PayMainLogs;
import com.xy.pay.main.amqp.model.ReconciliationPubData;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

/**
 * 利用RabbitMQ  对账异常通知
 *
 * @author YJX
 * Created on 2019-04-30 15:41
 */
@Component
@Primary
@ConditionalOnProperty(name = "pay.reconciliation-notice.channel", havingValue = "rabbit", matchIfMissing = true)
public class RabbitReconciliationNoticePub extends AbsReconciliationNoticePub implements RabbitTemplate.ConfirmCallback {

    private static final AtomicLong COUNTER = new AtomicLong(1L);
    private static final Set<Long>  SEEDS   = new HashSet<>();

    private final RabbitTemplate rabbitTemplate;

    @Autowired
    @Qualifier("noticeReconciliation")
    private       DirectExchange noticeReconciliation;


    @Autowired
    public RabbitReconciliationNoticePub(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        Long correlationId = Long.valueOf(correlationData.getId());
        if (ack) {
            //消息已经被队列接收了,rabbitMQ内部没有发生异常
            SEEDS.remove(correlationId);
            return;
        }

        if (SEEDS.contains(correlationId)) {
            // 如果丢失可以重发 2019/4/25 15:14
            PayMainLogs.MQ_LOG.error("【对账异常事件】rabbitMQ消息丢失:{}", StringUtils.abbreviate(cause, 200));
        } else {
            PayMainLogs.MQ_LOG.warn("【对账异常事件】:{}", StringUtils.abbreviate(cause, 200));
        }

    }

    /*
     * 【执行发布】
     *
     * @author YJX
     * @date 2019/04/30 22:01
     **/
    @Override
    void sendMsg(ReconciliationPubData notice) {
        final CorrelationData correlationId = new CorrelationData(getCorrelationId());
        //发送消息
        this.rabbitTemplate.convertAndSend(this.noticeReconciliation.getName(),
                "noticeReconciliation",
                JSON.toJSONString(notice), correlationId);
    }


    private synchronized String getCorrelationId() {
        long seed = COUNTER.getAndIncrement();
        SEEDS.add(seed);

        if (SEEDS.size() > 30000) {
            PayMainLogs.MQ_LOG.error("【对账异常事件】事件已经累计超过{}未处理了", SEEDS.size());

            //如果队列长于30000,要修剪了。移除最晚的10000条记录
            SEEDS.removeAll(SEEDS.stream()
                    .sorted(Long::compareTo)
                    .limit(10000)
                    .collect(Collectors.toList()));
        }
        return String.valueOf(seed);
    }
}

消费者配置

package com.xy.pay.risk.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

/**
 * rabbitMq 配置
 *
 * @author Canaan
 * @date 2019/4/24 10:46
 */
@Configuration
@EnableRabbit
public class RabbitMQConfigurer {

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        //必须是prototype类型
        return new RabbitTemplate(this.connectionFactory);
    }

    /**
     * 【交易事件】交换机
     *
     * @author Canaan
     * @date 2019/4/24 21:04
     */
    @Bean
    @Qualifier("tradeEventExchange")
    public DirectExchange tradeEventExchange() {
        return new DirectExchange("tradeEvent", true, false);
    }

    @Bean
    public Queue tradeEventQueue() {
        return new Queue("tradeEvent");
    }

    @Bean
    public Binding tradeEventQueueBinding() {
        return BindingBuilder.bind(tradeEventQueue()).to(tradeEventExchange()).with("tradeEvent");
    }

    /**
     * 【对账异常事件】交换机
     *
     * @author YJX
     * @date 2019-04-30 15:05
     **/
    @Bean
    @Qualifier("noticeReconciliationExchange")
    public DirectExchange noticeReconciliationExchange() {
        return new DirectExchange("noticeReconciliation", true, false);
    }

    @Bean
    public Queue noticeReconciliationQueue() {
        return new Queue("noticeReconciliation");
    }

    @Bean
    public Binding noticeReconciliationBinding() {
        return BindingBuilder.bind(noticeReconciliationQueue()).to(noticeReconciliationExchange()).with("noticeReconciliation");
    }

}

消费者消费队列里的消息

/*
 * Copyright 2019 Wicrenet, Inc. All rights reserved.
 */
package com.xy.pay.risk.amqp;

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.xy.common.verify.Asserts;
import com.xy.pay.dao.risk.BriefNoticeDao;
import com.xy.pay.dao.risk.BriefNoticeOwnerDao;
import com.xy.pay.dao.risk.ReconciliationNoticeDao;
import com.xy.pay.dao.risk.dict.NoticeStatusEnum;
import com.xy.pay.dao.risk.dict.NoticeTypeEnum;
import com.xy.pay.dao.risk.entity.BriefNotice;
import com.xy.pay.dao.risk.entity.BriefNoticeOwner;
import com.xy.pay.dao.risk.entity.ReconciliationNotice;
import com.xy.pay.risk.amqp.model.ReconciliationSubData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * 对账异常消息订阅
 *
 * @author YJX
 * Created on 2019/04/30 23:07
 */
@Component
@RabbitListener(queues = "#{noticeReconciliationQueue}")
public class RabbitReconciliationNoticeSub {
    private static final Logger logger = LoggerFactory.getLogger(RabbitReconciliationNoticeSub.class);

    @Autowired
    private BriefNoticeDao          briefNoticeDao;
    @Autowired
    private BriefNoticeOwnerDao     noticeOwnerDao;
    @Autowired
    private ReconciliationNoticeDao reconciliationNoticeDao;
    
    @RabbitHandler
    public void process(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {

        ReconciliationSubData subData;
        try {
            subData = JSON.parseObject(payload, ReconciliationSubData.class);
            this.persistent(subData);
        } catch (Exception e) {
            logger.error("对账异常消息持久化失败:{},{}", payload, e);
        }
    }

    private void persistent(ReconciliationSubData data) {
        Asserts.notNull(data);
        BriefNotice briefNotice = briefNoticeDao.selectByNoticeKey(data.getNoticeKey());
        //消息重复
        if (briefNotice != null) {
            return;
        }

        /*添加消息摘要*/
        briefNotice = new BriefNotice();
        briefNotice.setTitle(data.getTitle());
        briefNotice.setNoticeStatus(NoticeStatusEnum.NEW);
        briefNotice.setNoticeKey(data.getNoticeKey());
        briefNotice.setPower(data.getPower());
        briefNotice.setNoticeType(NoticeTypeEnum.RECONCILIATION);
        if (data.getDiffType() != null) {
            briefNotice.setSubType(data.getDiffType().toString());
        }
        int effectRow = this.briefNoticeDao.insert(briefNotice);

        if (effectRow == 0) {
            logger.error("对账异常消息持久化失败");
            return;
        }

        /*消息归属添加*/
        BriefNoticeOwner briefNoticeOwner = new BriefNoticeOwner();
        briefNoticeOwner.setNotice(briefNotice);
        briefNoticeOwner.setBrandId(data.getBrandId());
        briefNoticeOwner.setPointId(data.getStoreId());
        briefNoticeOwner.setRole(32);
        briefNoticeOwner.setPower(data.getPower());
        this.noticeOwnerDao.insert(briefNoticeOwner);

        /*添加对账详情消息*/
        ReconciliationNotice reconciliationNotice = new ReconciliationNotice();
        reconciliationNotice.setDiffType(data.getDiffType());
        reconciliationNotice.setLocalTradeInfo(data.getLocalTradeInfo());
        reconciliationNotice.setReconciliationTime(data.getReconciliationTime());
        reconciliationNotice.setStoreId(data.getStoreId());
        reconciliationNotice.setNotice(briefNotice);
        reconciliationNotice.setProvider(data.getProvider());
        reconciliationNotice.setRemoteTradeInfo(data.getRemoteTradeInfo());
        reconciliationNotice.setNoticeKey(data.getNoticeKey());
        this.reconciliationNoticeDao.insert(reconciliationNotice);
    }

}

TopicExchange 模式 一对多 ( 该模式为通知效果 )

生产者
package com.xy.pay.api.config;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

/**
 * rabbitMq 配置
 *
 * @author Canaan
 * @date 2019/4/24 10:46
 */
@Configuration
@EnableRabbit
public class RabbitMQConfigurer {

    @Autowired
    private ConnectionFactory connectionFactory;


    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        //必须是prototype类型
        return new RabbitTemplate(this.connectionFactory);
    }

    /**
     * 【交易事件】交换机
     *
     * @author Canaan
     * @date 2019/4/24 21:04
     */
    @Bean
    @Qualifier("tradeEventExchange")
    public DirectExchange tradeEventExchange() {
        return new DirectExchange("tradeEvent", true, false);
    }

    /**
     * 【交易通知】交换机,当用户调用接口时带有【notifyUrl】 推送地址时,进行异步推送
     *
     * @author Canaan
     * @date 2019/4/24 21:07
     */
    @Bean
    @Qualifier("tradeNotifyExchange")
    public TopicExchange tradeNotifyExchange() {
        return new TopicExchange("tradeNotify", true, false);
    }


}

生产者发消息

package com.xy.pay.api.amqp;

import com.alibaba.fastjson.JSON;
import com.xy.common.verify.Asserts;
import com.xy.pay.api.PayApiLogs;
import com.xy.pay.api.amqp.notify.TradeNotify;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/**
 * rabbit 交易通知
 *
 * @author Canaan
 * @date 2019/4/26 11:30
 */
@Component
public class RabbitTradeNotifyPub implements TradeNotifyPubEngine, RabbitTemplate.ConfirmCallback {

    private static final Map<String, TradeNotify> POOL = new ConcurrentHashMap<>(1000);

    private final RabbitTemplate rabbitTemplate;

    @Autowired
    @Qualifier("tradeNotifyExchange")
    private TopicExchange tradeNotifyExchange;

    @Autowired
    @Qualifier("notifyExecutor")
    private Executor notifyExecutor;

    @Autowired
    private Environment environment;

    @Autowired
    public RabbitTradeNotifyPub(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 发布通知
     *
     * @param notify
     */
    @Override
    public void pub(final TradeNotify notify) {
        Asserts.notNull(notify);

        if (environment.acceptsProfiles("dev", "test")) {
            notify.verifyThrow();
        }

        Asserts.notNull(notify.getJointProvider());
        String routeKey = notify.getAct().routeKey.toString() + "." + notify.getJointProvider();

        POOL.putIfAbsent(notify.getNotifyId(), notify);
        //异步发送
        try {
            this.notifyExecutor.execute(() -> {
                this.clipPool(10000);        //通知池里的个数最多10000个
                this.rabbitTemplate.convertAndSend(
                        this.tradeNotifyExchange.getName(),
                        routeKey,
                        JSON.toJSONString(notify),
                        new CorrelationData(notify.getNotifyId()));
            });
        } catch (Exception e) {
            PayApiLogs.MQ_LOG.error("【交易通知发布】发送失败", StringUtils.abbreviate(e.getMessage(), 100));
        }

    }

    /**
     * 根据函数计算后返回要发布的内容
     *
     * @param supplier
     * @author Canaan
     * @date 2019/5/21 20:06
     */
    @Override
    public void computePub(Supplier<TradeNotify> supplier) {
        Asserts.notNull(supplier);

        try {
            this.notifyExecutor.execute(() -> {
                TradeNotify notify = supplier.get();
                if (notify == null) {
                    return;
                }
                if (environment.acceptsProfiles("dev", "test")) {
                    notify.verifyThrow();
                }

                Asserts.notNull(notify.getJointProvider());
                String routeKey = notify.getAct().routeKey.toString() + "." + notify.getJointProvider();

                POOL.putIfAbsent(notify.getNotifyId(), notify);

                this.clipPool(10000);        //通知池里的个数最多10000个
                this.rabbitTemplate.convertAndSend(
                        this.tradeNotifyExchange.getName(),
                        routeKey,
                        JSON.toJSONString(notify),
                        new CorrelationData(notify.getNotifyId()));
            });
        } catch (Exception e) {
            PayApiLogs.MQ_LOG.error("【交易通知发布】发送失败", StringUtils.abbreviate(e.getMessage(), 100));
        }

    }

    /**
     * 修剪池。防止过多
     *
     * @author Canaan
     * @date 2019/4/26 16:24
     */
    private void clipPool(int maxSize) {
        if (POOL.size() < maxSize) {
            return;
        }

        PayApiLogs.MQ_LOG.error("【交易通知发布】 POOL 数据超过{}个", POOL.size());

        //删除第1000个开始的数据,共1000个
        POOL.keySet().stream()
                .skip(1000)
                .limit(1000)
                .forEach(POOL::remove);
    }


    /**
     * Confirmation callback.
     *
     * @param correlationData correlation data for the callback.
     * @param ack             true for ack, false for nack
     * @param cause           An optional cause, for nack, when available, otherwise null.
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String noticeId = correlationData.getId();

        if (ack) {
            //消息已经被队列接收了,rabbitMQ内部没有发生异常
            POOL.remove(noticeId);
            return;
        }

        if (POOL.containsKey(noticeId)) {
            this.pub(POOL.get(noticeId));       //重新发送
            PayApiLogs.MQ_LOG.warn("【交易通知发布】消息重新发送:{}", StringUtils.abbreviate(cause, 200));
        } else {
            PayApiLogs.MQ_LOG.warn("【交易通知发布】可能丢失-{}", StringUtils.abbreviate(cause, 200));
        }

    }


}

消费者配置

/*
 * Copyright 2019 Wicrenet, Inc. All rights reserved.
 */
package com.xy.pay.transfer.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.core.env.Environment;

import java.util.Objects;

/**
 * rabbitMq 配置
 *
 * @author YJX
 * Created on 2019-04-30 14:25
 */
@Configuration
@EnableRabbit
public class RabbitMQConfigurer {

    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private Environment environment;

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        //必须是prototype类型
        return new RabbitTemplate(this.connectionFactory);
    }


    /**
     * 【交易通知】交换机,当用户调用接口时带有【notifyUrl】 推送地址时,进行异步推送
     *
     * @author Canaan
     * @date 2019/4/24 21:07
     */
    @Bean
    @Qualifier("tradeNotifyExchange")
    public TopicExchange tradeNotifyExchange() {
        return new TopicExchange("tradeNotify", true, false);
    }


    @Bean
    public Queue tradeNotifyQueue() {
        String profiles = environment.getActiveProfiles()[0];
        boolean isPro = Objects.equals("pro", profiles.toLowerCase());
        return new Queue("tradeNotify-" + profiles, isPro, false, !isPro);
    }


    @Bean
    public Binding tradeEventQueueBinding() {
        return BindingBuilder.bind(tradeNotifyQueue()).to(tradeNotifyExchange()).with("tradeNotify.#");
    }


    /**
     * 交易通知广播
     *
     * @author Canaan
     * @date 2019/5/6 16:16
     */
    @Bean
    @Qualifier("transferTradeExchange")
    public DirectExchange transferTradeExchange() {
        return new DirectExchange("transferTrade", true, false);
    }


}

消费者消费队列消息

package com.xy.pay.api.amqp;

import com.alibaba.fastjson.JSON;
import com.xy.common.verify.Asserts;
import com.xy.pay.api.PayApiLogs;
import com.xy.pay.api.amqp.notify.TradeNotify;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/**
 * rabbit 交易通知
 *
 * @author Canaan
 * @date 2019/4/26 11:30
 */
@Component
public class RabbitTradeNotifyPub implements TradeNotifyPubEngine, RabbitTemplate.ConfirmCallback {

    private static final Map<String, TradeNotify> POOL = new ConcurrentHashMap<>(1000);

    private final RabbitTemplate rabbitTemplate;

    @Autowired
    @Qualifier("tradeNotifyExchange")
    private TopicExchange tradeNotifyExchange;

    @Autowired
    @Qualifier("notifyExecutor")
    private Executor notifyExecutor;

    @Autowired
    private Environment environment;

    @Autowired
    public RabbitTradeNotifyPub(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 发布通知
     *
     * @param notify
     */
    @Override
    public void pub(final TradeNotify notify) {
        Asserts.notNull(notify);

        if (environment.acceptsProfiles("dev", "test")) {
            notify.verifyThrow();
        }

        Asserts.notNull(notify.getJointProvider());
        String routeKey = notify.getAct().routeKey.toString() + "." + notify.getJointProvider();

        POOL.putIfAbsent(notify.getNotifyId(), notify);
        //异步发送
        try {
            this.notifyExecutor.execute(() -> {
                this.clipPool(10000);        //通知池里的个数最多10000个
                this.rabbitTemplate.convertAndSend(
                        this.tradeNotifyExchange.getName(),
                        routeKey,
                        JSON.toJSONString(notify),
                        new CorrelationData(notify.getNotifyId()));
            });
        } catch (Exception e) {
            PayApiLogs.MQ_LOG.error("【交易通知发布】发送失败", StringUtils.abbreviate(e.getMessage(), 100));
        }

    }

    /**
     * 根据函数计算后返回要发布的内容
     *
     * @param supplier
     * @author Canaan
     * @date 2019/5/21 20:06
     */
    @Override
    public void computePub(Supplier<TradeNotify> supplier) {
        Asserts.notNull(supplier);

        try {
            this.notifyExecutor.execute(() -> {
                TradeNotify notify = supplier.get();
                if (notify == null) {
                    return;
                }
                if (environment.acceptsProfiles("dev", "test")) {
                    notify.verifyThrow();
                }

                Asserts.notNull(notify.getJointProvider());
                String routeKey = notify.getAct().routeKey.toString() + "." + notify.getJointProvider();

                POOL.putIfAbsent(notify.getNotifyId(), notify);

                this.clipPool(10000);        //通知池里的个数最多10000个
                this.rabbitTemplate.convertAndSend(
                        this.tradeNotifyExchange.getName(),
                        routeKey,
                        JSON.toJSONString(notify),
                        new CorrelationData(notify.getNotifyId()));
            });
        } catch (Exception e) {
            PayApiLogs.MQ_LOG.error("【交易通知发布】发送失败", StringUtils.abbreviate(e.getMessage(), 100));
        }

    }

    /**
     * 修剪池。防止过多
     *
     * @author Canaan
     * @date 2019/4/26 16:24
     */
    private void clipPool(int maxSize) {
        if (POOL.size() < maxSize) {
            return;
        }

        PayApiLogs.MQ_LOG.error("【交易通知发布】 POOL 数据超过{}个", POOL.size());

        //删除第1000个开始的数据,共1000个
        POOL.keySet().stream()
                .skip(1000)
                .limit(1000)
                .forEach(POOL::remove);
    }


    /**
     * Confirmation callback.
     *
     * @param correlationData correlation data for the callback.
     * @param ack             true for ack, false for nack
     * @param cause           An optional cause, for nack, when available, otherwise null.
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String noticeId = correlationData.getId();

        if (ack) {
            //消息已经被队列接收了,rabbitMQ内部没有发生异常
            POOL.remove(noticeId);
            return;
        }

        if (POOL.containsKey(noticeId)) {
            this.pub(POOL.get(noticeId));       //重新发送
            PayApiLogs.MQ_LOG.warn("【交易通知发布】消息重新发送:{}", StringUtils.abbreviate(cause, 200));
        } else {
            PayApiLogs.MQ_LOG.warn("【交易通知发布】可能丢失-{}", StringUtils.abbreviate(cause, 200));
        }

    }


}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMq基本概念 = >实践DirectExchange和TopicExchange交换机模式 的相关文章

随机推荐

  • 深度学习实现缺陷检测

    深度学习实现缺陷检测 在工业生产过程中 缺陷检测是一个重要的环节 传统的缺陷检测方法通常依赖于人工提取特征和设计分类器 这种方式需要大量的人力和时间 并且对于复杂的缺陷类型可能不够有效 而深度学习技术通过利用神经网络自动学习特征和进行分类
  • Authing 正式发布应用集成网关 - Authing Gateway

    2023 年 2月 Authing 推出了身份领域的 PaaS化应用集成网关 Authing Gateway Authing Gateway 提供将原有应用快速集成到 Authing 身份云产品的能力 在扩充身份认证方式的同时 提高资源的安
  • 【OSGI】Error osgi xx Invalid value for DynamicImport-Package dynamic.import.pack

    1 背景 git下载项目 siddhi test suite 然后运行里面的测试类报错 Error osgi siddhi test suite Invalid value for DynamicImport Package dynamic
  • 计算机组成原理——存储器(一)

    存储器 一 一 存储器概述 二 存储器的分级结构 三 主存储器的技术指标 四 存储器与CPU的联系 地址总线 CPU与存储器的联系 编址方式 1 按字编址 M N 方式 2 按字节编址 五 SRAM存储器 cache 1 存储元基本结构 2
  • vue-element-admin 后台管理系统

    文章目录 前言 一 vue element admin 是什么 二 使用步骤 下载和部署 浏览模板项目代码 官网 启动 三 技术提炼 1 vue全家桶都有什么 2 vuex 什么时候用文件 什么时候直接写vue页面 导航守卫 3 组件之间的
  • 机器学习期末考试

    机器学习期末考试 一 机器学习链接 1 机器学习期末复习试卷 zhouyan2000的博客 CSDN博客 机器学习期末考试 2 机器学习笔试题 4条消息 机器学习笔试题目 北冥有小鱼 CSDN博客 机器学习题目 3 机器学习面试题 4 一天
  • 【Kubernetes理论篇】Kubernetes核心组件及资源介绍

    文章目录 一 Kubernetes架构 二 Kubernetes核心组件 三 Kubernetes核心资源 四 拓展 1 Service和Ingress的区别是什么 2 Replicaset和Deployment的区别是什么 3 Deplo
  • Flink 错误:找不到参数 evidence$ 的隐式值:TypeInformation 大数据

    Flink 是一个流处理和批处理框架 被广泛应用于大数据领域 在使用 Flink 进行开发时 有时会遇到各种各样的错误和异常 本文将详细介绍一种常见的错误情况 即在 Flink 中遇到的 No implicits found for par
  • Pycharm最新版如何设置自动换行

    1 代码编辑区自动换行 对所有文件有效 1 File gt Settings gt Editor gt General 2 找到Soft Wraps 勾选Soft wrap files 3 在输入框中添加 py 如下图所示 2 在图示位置点
  • 使用python和pyqt5轻松上手人脸识别系统(含代码)

    使用python和pyqt5轻松上手人脸识别系统 含代码 一 环境配置 1 1 python环境配置 1 1 1 安装 anaconda 1 1 2 安装pycharm 1 1 3 配置pip源 1 2 mysql数据库安装 1 3 相关依
  • Windows下,Eclipse的Android NDK(r8e) 配置

    一 关于NDK Android NDK全称 Native Development Kit 即本地开发包 1 NDK是一系列工具的集合 NDK提供了一系列的工具 这些工具对开发者的帮助是巨大的 它们能帮助开发者快速开发C 或C 的动态库 并能
  • windows怎么部署项目到云服务器

    要将项目部署到云服务器 可以按照以下步骤进行操作 1 在云服务提供商上创建一个云服务器实例 并确保已经将其配置和启动 2 在本地开发环境中将项目打包成可执行文件或者jar包 并确保项目能够正确运行 3 使用远程连接工具 如SSH RDP等
  • ctfshow萌新web17

    c传参过滤掉php 思路 include文件包含 利用日志文件包含 访问日志文件 c var log nginx access log 发现日志文件记录了user agent头 于是在该头中插入一句话木马 再访问日志文件 看日
  • AMBA总线协议AHB、APB、AXI对比分析

    一 AMBA概述 AMBA Advanced Microcontroller Bus Architecture 高级处理器总线架构 AHB Advanced High performance Bus 高级高性能总线 ASB Advanced
  • Uniapp中vueX实现登录状态功能

    uniapp使用Vuex实现登录状态的判断 退出登录 使用action commit实现登录功能 Vue use Vuex export default new Vuex Store state token userid username
  • 了解CMS(Concurrent Mark-Sweep)垃圾回收器

    原文地址为 了解CMS Concurrent Mark Sweep 垃圾回收器 一字不差的贴的人家的 就是感觉写的比较好 贴出来了 羞羞 1 总体介绍 CMS Concurrent Mark Sweep 是以牺牲吞吐量为代价来获得最短回收停
  • 使用git和maven过程一些命令

    git常用命令 1 git status 查看文件在工作目录与缓存的状态 2 git add 添加所有的文件到缓存 3 git commit m 提交的描述信息 如果我们这里不用 m参数的话 git将调到一个文本编译器 通常是vim 来让你
  • mysql5.7 leftjoin group by(获取关联表最新数据)

    用户表 users 日志表 logs 外键 user id 创建时间 created at 获取所有用户最新的日志 SELECT b from SELECT user id max created at as maxtime from lo
  • nafxcwd.lib(afxmem.obj) error lnk2005

    最近写程序突然遇到个错误 nafxcwd lib afxmem obj error lnk2005 查了下msdn发现主要原因是同时使用了CRT中的new delete和MFC中的new delete重载导致的 参考 http suppor
  • RabbitMq基本概念 = >实践DirectExchange和TopicExchange交换机模式

    AMQP 和IM的区别 AMQP 高级消息队列 1 可以一对多广播 也可以一对一广播 2 生产者和消费者不知道对方是谁 IM 1 只能一对一广播 2 生产者和消费者知道对方是谁 RabbitMQ 只是消息代理 我们不生产消息 我们只是消息的