Spring 集成 Java DSL - 动态创建 IntegrationFlows

2024-04-13

我正在使用 Spring Boot 1.5.13.RELEASE 和 Spring Integration 4.3.16.RELEASE 开发一个应用程序。

我对 Spring Integration 还很陌生,并且遇到了一个问题。

所以基本的想法是,在一些外部触发器(可能是 HTTP 调用)上,我需要创建一个 IntegrationFlow ,它将使用来自rabbitMQ队列的消息,对它们进行一些处理,然后(可能)生成到另一个rabbitMQ端点。

现在这应该会发生很多次,所以我必须创建multiple集成流程。

我在用集成流程上下文像这样注册每个 IntegrationFlow:

IntegrationFlowContext flowContext;
...
IntegrationFlow integrationFlow = myFlowFactory.makeFlow(uuid);
...
flowContext.registration(integrationFlow).id(callUUID).register();

我必须澄清这可以同时发生,创建multiple集成同时进行。

因此,每次我尝试创建 IntegrationFlow 时,我的“源”都是一个网关,如下所示:

MessagingGatewaySupport sourceGateway = Amqp
        .inboundGateway(rabbitTemplate.getConnectionFactory(), rabbitTemplate, dynamicQueuePrefix+uuid)
        .concurrentConsumers(1)
        .adviceChain(retryInterceptor)
        .autoStartup(false)
        .id("sgX-" + uuid)
        .get();

它还不是 @Bean,但我希望它在注册每个 IntegrationFlow 时注册。

我的“目标”是 Amqp OutBoundAdapter,如下所示:

@Bean
public AmqpOutboundEndpoint outboundAdapter(
        RabbitTemplate rabbitTemplate,
        ApplicationMessagingProperties applicationMessagingProperties
) {
    return Amqp.outboundAdapter(rabbitTemplate)
            .exchangeName("someStandardExchange")
            .routingKeyExpression("headers.get('rabbitmq.ROUTING_KEY')")
            .get();
}

现在这个IS已经有一个 bean,每次我尝试创建流时都会注入它。

我的流程如下所示:

public IntegrationFlow configure() {
    return IntegrationFlows
            .from(sourceGateway)
            .transform(Transformers.fromJson(HashMap.class, jsonObjectMapper))
            .filter(injectedGenericSelectorFilter)
            .<HashMap<String, String>>handle((payload, headers) -> {

                String uuid = payload.get("uuid");

                boolean shouldForwardMessage = myInjectedApplicationService.isForForwarding(payload);
                myInjectedApplicationService.handlePayload(payload);

                return MessageBuilder
                        .withPayload(payload)
                        .setHeader("shouldForward", shouldForwardMessage)
                        .setHeader("rabbitmq.ROUTING_KEY", uuid)
                        .build();
            })
            .filter("headers.get('shouldForward').equals(true)")
            .transform(Transformers.toJson(jsonObjectMapper))
            .handle(outboundAdapter)
            .get();
}

我的问题是,虽然应用程序启动正常并创建第一个 IntegrationFlows 等。后来,我遇到了这种异常:

java.lang.IllegalStateException:无法在bean名称“org.springframework.integration.transformer.MessageTransformingHandler#872”下注册对象[org.springframework.integration.transformer.MessageTransformingHandler#872]:已经存在对象[org.springframework.integration .transformer.MessageTransformingHandler#872] 绑定

我什至尝试为每个使用的组件设置一个 id,它应该用作 beanName ,如下所示:

.transform(Transformers.fromJson(HashMap.class, jsonObjectMapper), tf -> tf.id("tf1-"+uuid))

但是,即使像 .filter 这样的组件的 bean 名称问题已经解决,我仍然得到关于 MessageTransformingHandler 的相同异常。


UPDATE

我没有提到这样一个事实:每一次IntegrationFlow工作完成后,使用以下命令将其删除IntegrationFlowContext像这样:

flowContext.remove(flowId);

所以似乎(有点)有效的是同步两个流量登记块和去流通过使用相同的对象作为锁来阻止。

所以我负责注册和删除流的类如下所示:

...
private final Object lockA = new Object();
...

public void appendNewFlow(String callUUID){
    IntegrationFlow integrationFlow = myFlowFactory.makeFlow(callUUID);

    synchronized (lockA) {
        flowContext.registration(integrationFlow).id(callUUID).register();
    }
}

public void removeFlow(String flowId){

    synchronized (lockA) {
        flowContext.remove(flowId); 
    }

}
...

我现在的问题是这种锁对于应用程序来说有点重,因为我得到了很多:

...Waiting for workers to finish.
...
...Successfully waited for workers to finish.

这并没有我想要的那么快发生。

但我想这是预期的,因为每次线程获取锁时,都需要一些时间register流程及其所有组成部分或注销流程及其所有组成部分。


你也有这个:

.transform(Transformers.toJson(jsonObjectMapper))

如果添加一个,它会如何工作.id()那里也有?

另一方面,既然你说这是同时发生的,我想知道你是否可以编写一些代码synchonized,例如包裹那个flowContext.registration(integrationFlow).id(callUUID).register();.

bean 定义和注册过程实际上不是线程安全的,并且只能在应用程序生命周期开始时的初始化线程中使用。

我们可能真的需要做一个IntegrationFlowContext作为线程安全的register(IntegrationFlowRegistrationBuilder builder)函数,或者至少是它的registerBean(Object bean, String beanName, String parentName)因为这正是我们生成 bean 名称并注册它的地方。

欢迎就此事提出 JIRA。

不幸的是,Spring Integration Java DSL 扩展项目已经不再支持,我们只能对当前的项目添加修复5.x一代。尽管如此我相信synchonized解决方法应该在这里起作用,因此不需要将其向后移植到 Spring Integration Java DSL 扩展中。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spring 集成 Java DSL - 动态创建 IntegrationFlows 的相关文章

随机推荐