我正在使用 Spring 和 Websphere MQ 进行以下消息传递配置。
我需要实现一个场景的重试逻辑,其中我从队列接收消息并将消息数据放到弹性搜索服务器(搜索服务器是非事务性的),如果搜索服务器关闭,我必须回滚消息再次放入队列并在一段时间间隔(例如:30秒)后处理消息。此重试必须进行 5 次。 5 次后,消息必须放入死信队列。我们使用 Tomcat 作为服务器。
我们使用 spring Integration jms:message-driven-channel-adapter 来接收消息
如何使用 spring 和 Websphere MQ 实现此行为?
我爬遍了许多站点,可以找到对 Active MQ 的支持,但找不到对 IBM MQ 的支持。
<bean id="mqConnectionFactory" class="com.ibm.mq.jms.MQQueueConnectionFactory">
<property name="hostName">
<value>${queue_hostname}</value>
</property>
<property name="port">
<value>${queue_port}</value>
</property>
<property name="queueManager">
<value>${queue_manager}</value>
</property>
<property name="transportType">
<value>1</value>
</property>
</bean>
<!-- JMS Queue Connection Factory -->
<bean id="jmsQueueConnectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory102">
<property name="targetConnectionFactory">
<ref bean="mqConnectionFactory" />
</property>
<property name="pubSubDomain">
<value>false</value>
</property>
</bean>
<!-- JMS Destination Resolver -->
<bean id="jmsDestinationResolver"
class="org.springframework.jms.support.destination.DynamicDestinationResolver">
</bean>
<!-- JMS Queue Template -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate102">
<property name="connectionFactory">
<ref bean="jmsQueueConnectionFactory" />
</property>
<property name="destinationResolver">
<ref bean="jmsDestinationResolver" />
</property>
<property name="pubSubDomain">
<value>false</value>
</property>
<property name="receiveTimeout">
<value>20000</value>
</property>
</bean>
JMS 规范中没有任何关于延迟重新传递的内容。一些经纪商有自定义机制/策略来实施它;您必须查看经纪人文档。
正如之前所说,您可以使用传递计数标头在重试一定次数后放弃。
EDIT
回应您在下面的评论...
仅当您使用支持 JMS 2.0 的 MQ 版本时 - 看起来您使用的是非常旧的版本,需要JmsTemplate102
。 1.0.2 是古老的; 1.1 已经推出多年了; Spring支持JMS 2.0已经近3年了。如果您有 JMS 2.0 代理(和客户端库),请设置JmsTemplate bean 上的传递延迟 http://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/core/JmsTemplate.html#setDeliveryDelay-long-。然后,通过以下方式配置出站通道适配器以使用该模板jms-template
财产。
为了了解重新传递计数,请将整个消息传递到您的服务中,或者使用 POJO 方法将其配置为获取该标头...
public MyReply process(@Payload MyObject foo,
@Header("JMSXRedeliveryCount") int redeliveryCOunt) {
...
}
同样,这是 JMS 2.0 功能(标头),尽管一些代理在 1.1 中提供了它。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)