我对 Spring Integration 配置进行了集成测试,该测试使用来自具有持久订阅的 JMS 主题的消息。为了进行测试,我使用 ActiveMQ 而不是 Tibco EMS。
我遇到的问题是,我必须在测试方法开始时使用睡眠调用来延迟将第一条消息发送到端点。否则该消息将被丢弃。
如果我删除持久订阅和选择器的设置,那么第一条消息可以立即发送,不会有延迟。
我想摆脱不可靠的睡眠。有没有办法在发送消息之前检查端点是否已完全设置?
下面是配置。
感谢您的帮助!
<int-jms:message-driven-channel-adapter
id="myConsumer" connection-factory="myCachedConnectionFactory"
destination="myTopic" channel="myChannel" error-channel="errorChannel"
pub-sub-domain="true" subscription-durable="true"
durable-subscription-name="testDurable"
selector="..."
transaction-manager="emsTransactionManager" auto-startup="false"/>
如果您使用干净的嵌入式 activemq 进行测试,则在建立订阅之前,订阅的持久性是无关紧要的。所以你别无选择,只能等到那一天发生。
您可以通过发送一系列启动消息来避免睡眠,并且仅在收到最后一条消息时才开始真正的测试。
EDIT
我忘了还有一个方法isRegisteredWithDestination()
on the DefaultMessageListenerContainer
.
Java文档...
/**
* Return whether at least one consumer has entered a fixed registration with the
* target destination. This is particularly interesting for the pub-sub case where
* it might be important to have an actual consumer registered that is guaranteed
* not to miss any messages that are just about to be published.
* <p>This method may be polled after a {@link #start()} call, until asynchronous
* registration of consumers has happened which is when the method will start returning
* {@code true} – provided that the listener container ever actually establishes
* a fixed registration. It will then keep returning {@code true} until shutdown,
* since the container will hold on to at least one consumer registration thereafter.
* <p>Note that a listener container is not bound to having a fixed registration in
* the first place. It may also keep recreating consumers for every invoker execution.
* This particularly depends on the {@link #setCacheLevel cache level} setting:
* only {@link #CACHE_CONSUMER} will lead to a fixed registration.
*/
我们用它来一些通道测试,我们使用反射获取容器,然后轮询该方法,直到我们订阅该主题。
/**
* Blocks until the listener container has subscribed; if the container does not support
* this test, or the caching mode is incompatible, true is returned. Otherwise blocks
* until timeout milliseconds have passed, or the consumer has registered.
* @see DefaultMessageListenerContainer#isRegisteredWithDestination()
* @param timeout Timeout in milliseconds.
* @return True if a subscriber has connected or the container/attributes does not support
* the test. False if a valid container does not have a registered consumer within
* timeout milliseconds.
*/
private static boolean waitUntilRegisteredWithDestination(SubscribableJmsChannel channel, long timeout) {
AbstractMessageListenerContainer container =
(AbstractMessageListenerContainer) new DirectFieldAccessor(channel).getPropertyValue("container");
if (container instanceof DefaultMessageListenerContainer) {
DefaultMessageListenerContainer listenerContainer =
(DefaultMessageListenerContainer) container;
if (listenerContainer.getCacheLevel() != DefaultMessageListenerContainer.CACHE_CONSUMER) {
return true;
}
while (timeout > 0) {
if (listenerContainer.isRegisteredWithDestination()) {
return true;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) { }
timeout -= 100;
}
return false;
}
return true;
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)