Springboot RabbitMq源码解析之配置类
Springboot RabbitMq源码解析之消息发送
一、MessageListenerContainer
在Springboot RabbitMq中,接口MessageListenerContainer负责接收并处理消息。
public interface MessageListenerContainer extends SmartLifecycle {
/**
* Setup the message listener to use. Throws an {@link IllegalArgumentException}
* if that message listener type is not supported.
* @param messageListener the {@code object} to wrapped to the {@code MessageListener}.
*/
void setupMessageListener(Object messageListener);
/**
* @return the {@link MessageConverter} that can be used to
* convert {@link org.springframework.amqp.core.Message}, if any.
*/
MessageConverter getMessageConverter();
}
从前面的配置类RabbitAnnotationDrivenConfiguration中也能看到,springboot主要支持simple和direct两种方式进行消息的消费,在这里,以SimpleMessageListenerContainer为例查看消息是如何被接收处理的。
二、SimpleMessageListenerContainer启动
1. AbstractMessageListenerContainer#start
首先,MessageListenerContainer继承了SmartLifecycle,因此我们从start方法开始作为介入点进入源码进行查看。
/**
* Start this container.
* @see #doStart
*/
@Override
public void start() {
if (isRunning()) {
return;
}
if (!this.initialized) {
synchronized (this.lifecycleMonitor) {
if (!this.initialized) {
afterPropertiesSet();
this.initialized = true;
}
}
}
try {
if (logger.isDebugEnabled()) {
logger.debug("Starting Rabbit listener container.");
}
configureAdminIfNeeded();
checkMismatchedQueues();
doStart();
}
catch (Exception ex) {
throw convertRabbitAccessException(ex);
}
}
SimpleMessageListenerContainer本身并没有重写start方法,其start方法从父类AbstractMessageListenerContainer继承而来,在AbstractMessageContainer#start方法中,主要进行了包括afterPropertiesSet, configureAdminIfNeeded, checkMismatchedQueues方法在内的一些初始化工作,剩下了工作委托给doStart方法进行处理,方便扩展。
2. SimpleMessageListenerContainer#doStart
SimpleMessageListenerContainer#start方法的目的是校验通过后将消费者consumer提交到线程池taskExecutors,主要逻辑分为3步:
(1)messageListener是ListenerContainerAware时的场景的处理,检查queueNames是否和messsgeListener的expectedQueueNames一致。
if (getMessageListener() instanceof ListenerContainerAware) {
Collection<String> expectedQueueNames = ((ListenerContainerAware) getMessageListener()).expectedQueueNames();
if (expectedQueueNames != null) {
String[] queueNames = getQueueNames();
Assert.state(expectedQueueNames.size() == queueNames.length,
"Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
+ Arrays.asList(queueNames));
boolean found = false;
for (String queueName : queueNames) {
if (expectedQueueNames.contains(queueName)) {
found = true;
}
else {
found = false;
break;
}
}
Assert.state(found, () -> "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
+ Arrays.asList(queueNames));
}
}
(2)通过super.doStart();
初始化active和running属性,并唤醒所有相关线程。
/**
* Start this container, and notify all invoker tasks.
* @throws Exception if thrown by Rabbit API methods
*/
protected void doStart() throws Exception {
// Reschedule paused tasks, if any.
synchronized (this.lifecycleMonitor) {
this.active = true;
this.running = true;
this.lifecycleMonitor.notifyAll();
}
}
(3)初始化consumer并提交
synchronized (this.consumersMonitor) {
if (this.consumers != null) {
throw new IllegalStateException("A stopped container should not have consumers");
}
int newConsumers = initializeConsumers();
if (this.consumers == null) {
logger.info("Consumers were initialized and then cleared " +
"(presumably the container was stopped concurrently)");
return;
}
if (newConsumers <= 0) {
if (logger.isInfoEnabled()) {
logger.info("Consumers are already running");
}
return;
}
Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
for (BlockingQueueConsumer consumer : this.consumers) {
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
getTaskExecutor().execute(processor);
if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}
}
for (AsyncMessageProcessingConsumer processor : processors) {
FatalListenerStartupException startupException = processor.getStartupException();
if (startupException != null) {
throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
}
}
}
(3.1)初始化consumer在这儿初始化的consumer为BlockingQueueConsumer类型,consumer数量根据concurrentConsumers而来,默认为1。每个consumer的属性根据SimpleMessageListenerContainer而来。
protected int initializeConsumers() {
int count = 0;
synchronized (this.consumersMonitor) {
if (this.consumers == null) {
this.cancellationLock.reset();
this.consumers = new HashSet<BlockingQueueConsumer>(this.concurrentConsumers);
for (int i = 0; i < this.concurrentConsumers; i++) {
BlockingQueueConsumer consumer = createBlockingQueueConsumer();
this.consumers.add(consumer);
count++;
}
}
}
return count;
}
protected BlockingQueueConsumer createBlockingQueueConsumer() {
BlockingQueueConsumer consumer;
String[] queues = getQueueNames();
// There's no point prefetching less than the tx size, otherwise the consumer will stall because the broker
// didn't get an ack for delivered messages
int actualPrefetchCount = getPrefetchCount() > this.txSize ? getPrefetchCount() : this.txSize;
consumer = new BlockingQueueConsumer(getConnectionFactory(), getMessagePropertiesConverter(),
this.cancellationLock, getAcknowledgeMode(), isChannelTransacted(), actualPrefetchCount,
isDefaultRequeueRejected(), getConsumerArguments(), isNoLocal(), isExclusive(), queues);
if (this.declarationRetries != null) {
consumer.setDeclarationRetries(this.declarationRetries);
}
if (getFa