我想使用 SourcePollingChannelAdapter 来实现轮询时需要事务传播,以便在发生错误时回滚所有操作。 setTransactionSynchronizationFactory 方法没有注释...
非常感谢你的帮助 !
在 XML 中我可以这样做:
<int:poller fixed-rate="5000">
<int:transactional transaction-manager="transactionManager" propagation="REQUIRED" />
</int:poller>
我想通过 SourcePollingChannelAdapter 和 periodicTrigger 以编程方式使用这样的事务,但我不知道如何操作。
我有这个 :
SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
adapter.setSource(source);
adapter.setTrigger(new PeriodicTrigger(5, TimeUnit.SECONDS));
adapter.setOutputChannel(channel);
adapter.setBeanFactory(ctx);
adapter.start();
当bean源被调用时,数据库中的一个元素被删除,一条消息被创建并在outputchannel中发送;但如果我在 ouputchannel 之后的流程中出现错误,我希望数据库恢复并且元素返回......实际上是一个带有传播的简单事务。我不明白怎么做。
输出通道是:
<int:channel id="channel" >
<int:queue />
</int:channel>
<int-http:outbound-gateway request-channel="channel"
url="http://localhost:8081/icopitole-ws/baseactive" http-method="GET"
reply-channel="reresponseVersionChannel" expected-response-type="java.lang.String" />
当 URL 没有响应时,会抛出异常,但不会执行 Rollback,尽管我已经像您所说的那样添加了 DefaultTransactionSynchronizationFactory 和 TransactionInterceptor :(
如果我理解正确的话,你需要使用这个:默认事务同步工厂 https://github.com/spring-projects/spring-integration/blob/master/spring-integration-core/src/main/java/org/springframework/integration/transaction/DefaultTransactionSynchronizationFactory.java
这是如何配置它的快照:
SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor =
new ExpressionEvaluatingTransactionSynchronizationProcessor();
syncProcessor.setBeanFactory(mock(BeanFactory.class));
PollableChannel queueChannel = new QueueChannel();
syncProcessor.setBeforeCommitExpression(new SpelExpressionParser().parseExpression("#bix"));
syncProcessor.setBeforeCommitChannel(queueChannel);
syncProcessor.setAfterCommitChannel(queueChannel);
syncProcessor.setAfterCommitExpression(new SpelExpressionParser().parseExpression("#baz"));
DefaultTransactionSynchronizationFactory syncFactory =
new DefaultTransactionSynchronizationFactory(syncProcessor);
adapter.setTransactionSynchronizationFactory(syncFactory);
交易边界涵盖在SourcePollingChannelAdapter#adviceChain
,所以应该这样配置:
TransactionInterceptor txAdvice =
new TransactionInterceptor(transactionManager,
new MatchAlwaysTransactionAttributeSource(new DefaultTransactionAttribute()));
adapter.setAdviceChain(Collections.singletonList(txAdvice));
因此,现在每个“民意调查”都将包含交易和您的syncFactory
会做的事情。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)