Springboot RabbitMq源码解析之消费者容器SimpleMessageListenerContainer

2023-11-07

Springboot RabbitMq源码解析之配置类
Springboot RabbitMq源码解析之消息发送

一、MessageListenerContainer

在Springboot RabbitMq中,接口MessageListenerContainer负责接收并处理消息。

public interface MessageListenerContainer extends SmartLifecycle {

	/**
	 * Setup the message listener to use. Throws an {@link IllegalArgumentException}
	 * if that message listener type is not supported.
	 * @param messageListener the {@code object} to wrapped to the {@code MessageListener}.
	 */
	void setupMessageListener(Object messageListener);

	/**
	 * @return the {@link MessageConverter} that can be used to
	 * convert {@link org.springframework.amqp.core.Message}, if any.
	 */
	MessageConverter getMessageConverter();

}

从前面的配置类RabbitAnnotationDrivenConfiguration中也能看到,springboot主要支持simple和direct两种方式进行消息的消费,在这里,以SimpleMessageListenerContainer为例查看消息是如何被接收处理的。

二、SimpleMessageListenerContainer启动

1. AbstractMessageListenerContainer#start

首先,MessageListenerContainer继承了SmartLifecycle,因此我们从start方法开始作为介入点进入源码进行查看。

/**
 * Start this container.
 * @see #doStart
 */
@Override
public void start() {
	if (isRunning()) {
		return;
	}
	if (!this.initialized) {
		synchronized (this.lifecycleMonitor) {
			if (!this.initialized) {
				afterPropertiesSet();
				this.initialized = true;
			}
		}
	}
	try {
		if (logger.isDebugEnabled()) {
			logger.debug("Starting Rabbit listener container.");
		}
		configureAdminIfNeeded();
		checkMismatchedQueues();
		doStart();
	}
	catch (Exception ex) {
		throw convertRabbitAccessException(ex);
	}
}

SimpleMessageListenerContainer本身并没有重写start方法,其start方法从父类AbstractMessageListenerContainer继承而来,在AbstractMessageContainer#start方法中,主要进行了包括afterPropertiesSet, configureAdminIfNeeded, checkMismatchedQueues方法在内的一些初始化工作,剩下了工作委托给doStart方法进行处理,方便扩展。

2. SimpleMessageListenerContainer#doStart

SimpleMessageListenerContainer#start方法的目的是校验通过后将消费者consumer提交到线程池taskExecutors,主要逻辑分为3步:
(1)messageListener是ListenerContainerAware时的场景的处理,检查queueNames是否和messsgeListener的expectedQueueNames一致。

if (getMessageListener() instanceof ListenerContainerAware) {
	Collection<String> expectedQueueNames = ((ListenerContainerAware) getMessageListener()).expectedQueueNames();
	if (expectedQueueNames != null) {
		String[] queueNames = getQueueNames();
		Assert.state(expectedQueueNames.size() == queueNames.length,
				"Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
						+ Arrays.asList(queueNames));
		boolean found = false;
		for (String queueName : queueNames) {
			if (expectedQueueNames.contains(queueName)) {
				found = true;
			}
			else {
				found = false;
				break;
			}
		}
		Assert.state(found, () -> "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
				+ Arrays.asList(queueNames));
	}
}

(2)通过super.doStart();初始化active和running属性,并唤醒所有相关线程。

/**
 * Start this container, and notify all invoker tasks.
 * @throws Exception if thrown by Rabbit API methods
 */
protected void doStart() throws Exception {
	// Reschedule paused tasks, if any.
	synchronized (this.lifecycleMonitor) {
		this.active = true;
		this.running = true;
		this.lifecycleMonitor.notifyAll();
	}
}

(3)初始化consumer并提交

synchronized (this.consumersMonitor) {
	if (this.consumers != null) {
		throw new IllegalStateException("A stopped container should not have consumers");
	}
	int newConsumers = initializeConsumers();
	if (this.consumers == null) {
		logger.info("Consumers were initialized and then cleared " +
				"(presumably the container was stopped concurrently)");
		return;
	}
	if (newConsumers <= 0) {
		if (logger.isInfoEnabled()) {
			logger.info("Consumers are already running");
		}
		return;
	}
	Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
	for (BlockingQueueConsumer consumer : this.consumers) {
		AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
		processors.add(processor);
		getTaskExecutor().execute(processor);
		if (getApplicationEventPublisher() != null) {
			getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
		}
	}
	for (AsyncMessageProcessingConsumer processor : processors) {
		FatalListenerStartupException startupException = processor.getStartupException();
		if (startupException != null) {
			throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
		}
	}
}

(3.1)初始化consumer在这儿初始化的consumer为BlockingQueueConsumer类型,consumer数量根据concurrentConsumers而来,默认为1。每个consumer的属性根据SimpleMessageListenerContainer而来。

protected int initializeConsumers() {
	int count = 0;
	synchronized (this.consumersMonitor) {
		if (this.consumers == null) {
			this.cancellationLock.reset();
			this.consumers = new HashSet<BlockingQueueConsumer>(this.concurrentConsumers);
			for (int i = 0; i < this.concurrentConsumers; i++) {
				BlockingQueueConsumer consumer = createBlockingQueueConsumer();
				this.consumers.add(consumer);
				count++;
			}
		}
	}
	return count;
}

protected BlockingQueueConsumer createBlockingQueueConsumer() {
	BlockingQueueConsumer consumer;
	String[] queues = getQueueNames();
	// There's no point prefetching less than the tx size, otherwise the consumer will stall because the broker
	// didn't get an ack for delivered messages
	int actualPrefetchCount = getPrefetchCount() > this.txSize ? getPrefetchCount() : this.txSize;
	consumer = new BlockingQueueConsumer(getConnectionFactory(), getMessagePropertiesConverter(),
			this.cancellationLock, getAcknowledgeMode(), isChannelTransacted(), actualPrefetchCount,
			isDefaultRequeueRejected(), getConsumerArguments(), isNoLocal(), isExclusive(), queues);
	if (this.declarationRetries != null) {
		consumer.setDeclarationRetries(this.declarationRetries);
	}
	if (getFa
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Springboot RabbitMq源码解析之消费者容器SimpleMessageListenerContainer 的相关文章