Spring Batch and Spring Integration integration issue - "No poller has been defined for endpoint" exception

2024-02-23

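I went through the Spring Integration guide http://docs.spring.io/spring-integration/reference/html/sftp.html and the example here https://github.com/spring-projects/spring-integration-samples/tree/master/basic/sftp and got a working example of a Spring Integration SFTP program. I already have a working Spring Batch program that reads a bunch of files and dumps them into a database.

I am now trying to integrate the Spring Batch and Spring Integration programs by following the Spring documentation http://docs.spring.io/spring-batch/reference/html/springBatchIntegration.html, and I created the following configuration.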

<bean id="sftpSessionFactory" class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
        <property name="host" value="${host}"/>
        <property name="port" value="${port}"/>
        <property name="user" value="${user}"/>
        <property name="password" value="${password}"/>
    </bean>

    <int:channel id="inboundFileChannel"><int:queue/></int:channel>
    <int:channel id="outboundJobRequestChannel"/>
    <int:channel id="jobLaunchReplyChannel"/>

    <int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
            channel="inboundFileChannel"
            session-factory="sftpSessionFactory"
            local-directory="file:/chofac/data/mex/registry/outbox"
            remote-directory="/chofac/SFTP/MEX/outbox"
            auto-create-local-directory="true"
            delete-remote-files="false"
            filename-pattern="*.txt">
        <int:poller max-messages-per-poll="-1" fixed-rate="1000" />
    </int-sftp:inbound-channel-adapter>

    <int:transformer input-channel="inboundFileChannel"
        output-channel="outboundJobRequestChannel">
        <bean class="com.chofac.mint.integration.FileMessageToJobRequest">
            <property name="job" ref="responseFileReaderJob"/>
            <!-- <property name="fileParameterName" value="input.file.name"/> -->
        </bean>
    </int:transformer>

    <batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
        reply-channel="jobLaunchReplyChannel" job-launcher="jobLauncher">
        <int:poller fixed-rate="1000"/>
    </batch-int:job-launching-gateway>

    <int:logging-channel-adapter channel="jobLaunchReplyChannel"/>

    <batch:job id="responseFileReaderJob">
        <batch:step id="dailyReaderJob">
            <batch:tasklet>
                <batch:chunk reader="dailyRRReader" writer="dailyRRDBWriter" processor="itemProcessor" commit-interval="10"/>
            </batch:tasklet>
        </batch:step>
    </batch:job>

I am running the program with the test case below:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:META-INF/spring/applicationContext.xml","classpath:META-INF/spring/inbound-ResponseReaderJobIntegration.xml"})
public class AAASftpInboundMsgJobTriggerTest {

    @Resource(name="inboundFileChannel")
    PollableChannel localFileChannel;

    @Test
    public void runDemo(){
        System.out.println("Received first file message: " + localFileChannel.receive());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

I get this error:

2014-07-01 10:51:48,987 [main] INFO org.hibernate.impl.SessionFactoryImpl - closing
2014-07-01 10:51:48,988 [main] DEBUG org.springframework.beans.factory.support.DisposableBeanAdapter - Invoking destroy method 'close' on bean with name 'dataSource'
2014-07-01 10:51:48,988 [main] ERROR org.springframework.test.context.TestContextManager - Caught exception while allowing TestExecutionListener [org.springframework.test.context.support.DependencyInjectionTestExecutionListener@2faadcc6] to prepare test instance [com.chofac.mint.integration.AAASftpInboundMsgJobTriggerTest@5d3ad33d]
java.lang.IllegalStateException: Failed to load ApplicationContext
    at org.springframework.test.context.CacheAwareContextLoaderDelegate.loadContext(CacheAwareContextLoaderDelegate.java:99)
    at org.springframework.test.context.DefaultTestContext.getApplicationContext(DefaultTestContext.java:101)
    at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.injectDependencies(DependencyInjectionTestExecutionListener.java:109)
    at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.prepareTestInstance(DependencyInjectionTestExecutionListener.java:75)
    at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:326)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.createTest(SpringJUnit4ClassRunner.java:212)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner$1.runReflectiveCall(SpringJUnit4ClassRunner.java:289)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.methodBlock(SpringJUnit4ClassRunner.java:291)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:232)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:89)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:71)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:175)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0': Invocation of init method failed; nested exception is java.lang.IllegalArgumentException: No poller has been defined for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0', and no default poller is available within the context.
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1553)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:539)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:475)
    at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:304)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:228)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:300)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:195)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:681)
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:760)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:482)
    at org.springframework.test.context.support.AbstractGenericContextLoader.loadContext(AbstractGenericContextLoader.java:121)
    at org.springframework.test.context.support.AbstractGenericContextLoader.loadContext(AbstractGenericContextLoader.java:60)
    at org.springframework.test.context.support.AbstractDelegatingSmartContextLoader.delegateLoading(AbstractDelegatingSmartContextLoader.java:100)
    at org.springframework.test.context.support.AbstractDelegatingSmartContextLoader.loadContext(AbstractDelegatingSmartContextLoader.java:250)
    at org.springframework.test.context.CacheAwareContextLoaderDelegate.loadContextInternal(CacheAwareContextLoaderDelegate.java:64)
    at org.springframework.test.context.CacheAwareContextLoaderDelegate.loadContext(CacheAwareContextLoaderDelegate.java:91)
    ... 25 more
Caused by: java.lang.IllegalArgumentException: No poller has been defined for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0', and no default poller is available within the context.
    at org.springframework.util.Assert.notNull(Assert.java:112)
    at org.springframework.integration.config.ConsumerEndpointFactoryBean.initializeEndpoint(ConsumerEndpointFactoryBean.java:238)
    at org.springframework.integration.config.ConsumerEndpointFactoryBean.afterPropertiesSet(ConsumerEndpointFactoryBean.java:187)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1612)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1549)
    ... 40 more
2014-07-01 10:51:48,992 [main] DEBUG org.springframework.test.context.support.DirtiesContextTestExecutionListener - After test class: context [DefaultTestContext@ac44b88 testClass = SftpInboundMsgJobTriggerTest, testInstance = [null], testMethod = [null], testException = [null], mergedContextConfiguration = [MergedContextConfiguration@4102799c testClass = SftpInboundMsgJobTriggerTest, locations = '{classpath:META-INF/spring/applicationContext.xml, classpath:META-INF/spring/inbound-ResponseReaderJobIntegration.xml.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]], dirtiesContext [false].

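I tried most of the top answers to this question http://goo.gl/rYfymI, the suggestions from Google, and the suggestions Stack Overflow offered while I was typing this question. All of them led to other, different errors, and I seem to be drifting away from the main problem.

The most common suggestion is to add a global poller, but that results in the following error: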

<int:poller default="true" fixed-delay="50"/>

Caused by: java.lang.IllegalArgumentException: A poller should not be specified for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1', since 'outboundJobRequestChannel' is a SubscribableChannel (not pollable).

(I am new to all of this: Spring, Spring Batch, and Spring Integration.) Any help would be greatly appreciated. Thanks in advance.

Update 1

I removed the poller in #2 (the one on the job-launching gateway), as shown below

<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
    reply-channel="jobLaunchReplyChannel" job-launcher="jobLauncher">
</batch-int:job-launching-gateway>

and removed the global poller

<int:poller default="true" fixed-delay="50"/> 

I get the following exception:

Exception in thread "main" org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0': Invocation of init method failed; nested exception is java.lang.IllegalArgumentException: No poller has been defined for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0', and no default poller is available within the context.
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1553)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:539)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:475)
    at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:304)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:228)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:300)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:195)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:681)
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:760)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:482)
    at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139)
    at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:93)
    at com.chofac.mint.integration.DownloadFileRunBatch.main(DownloadFileRunBatch.java:15)
Caused by: java.lang.IllegalArgumentException: No poller has been defined for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0', and no default poller is available within the context.
    at org.springframework.util.Assert.notNull(Assert.java:112)
    at org.springframework.integration.config.ConsumerEndpointFactoryBean.initializeEndpoint(ConsumerEndpointFactoryBean.java:238)
    at org.springframework.integration.config.ConsumerEndpointFactoryBean.afterPropertiesSet(ConsumerEndpointFactoryBean.java:187)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1612)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1549)
    ... 12 more

However, if I leave the global poller in place, the SFTP transfer happens and the job is triggered:

<int:poller default="true" fixed-delay="50"/>

Update 2

If I remove it, I get the following exception:

Exception in thread "main" org.springframework.beans.factory.BeanNotOfRequiredTypeException: Bean named 'inboundFileChannel' must be of type [org.springframework.messaging.PollableChannel], but was actually of type [org.springframework.integration.channel.DirectChannel]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:376)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)
    at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:979)
    at com.chofac.mint.batchintegration.SftpInboundMsgJobTriggerMain.main(SftpInboundMsgJobTriggerMain.java:16)

Here is my configuration:

<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
        channel="inboundFileChannel"
        session-factory="sftpSessionFactory"
        local-directory="file:${inbound.local.directory}"
        remote-directory="${inbound.remote.directory}"
        auto-create-local-directory="true"
        delete-remote-files="false"
        filename-pattern="*.*">
    <int:poller max-messages-per-poll="-1" fixed-rate="45000" />
    <!-- <int:poller max-messages-per-poll="-1" cron="${MEX.CRON.PATTERN}"/> -->
    <!-- 0 15 10 ? * MON-FRI -->
</int-sftp:inbound-channel-adapter>

<int:channel id="inboundFileChannel"></int:channel>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>

<!-- <int:poller default="true" cron="${MEX.CRON.PATTERN}"/> -->

<int:transformer input-channel="inboundFileChannel"
    output-channel="outboundJobRequestChannel">
    <bean class="com.chofac.mint.integration.FileMessageToJobRequest">
        <property name="job" ref="responseFileReaderJob"/>
    </bean>
</int:transformer>

<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
    reply-channel="jobLaunchReplyChannel" job-launcher="jobLauncher">
</batch-int:job-launching-gateway>

<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>

I trigger this from a main program, as follows:

ApplicationContext context = new ClassPathXmlApplicationContext("classpath:META-INF/spring/applicationContext.xml","classpath:META-INF/spring/batchintegration/inboundSFTPJob.xml");
PollableChannel pollableFileChannel = context.getBean("inboundFileChannel", PollableChannel.class);
System.out.println("Received first file message: " + pollableFileChannel.receive());

Update 3

The sample configuration https://github.com/spring-projects/spring-integration-samples/blob/master/basic/sftp/src/test/resources/META-INF/spring/integration/SftpInboundReceiveSample-context.xml, the JUnit test https://github.com/spring-projects/spring-integration-samples/blob/master/basic/sftp/src/test/java/org/springframework/integration/samples/sftp/SftpInboundReceiveSample.java, and the Spring sample http://docs.spring.io/spring-batch/trunk/reference/html/springBatchIntegration.html are available online.

Update 4

<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
        channel="inboundFileChannel"
        session-factory="sftpSessionFactory"
        local-directory="file:${inbound.local.directory}"
        remote-directory="${inbound.remote.directory}"
        auto-create-local-directory="true"
        delete-remote-files="false" 
        filename-pattern ="*.*"
        local-filter="acceptAllFileListFilter">
    <int:poller max-messages-per-poll="-1" fixed-rate="45000" />
</int-sftp:inbound-channel-adapter>

<bean id="acceptAllFileListFilter" class="org.springframework.integration.file.filters.AcceptAllFileListFilter"/>

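The program runs in a continuous loop, but with exceptions. Although I specified 45 seconds for the poller, it looks like polling happens much more frequently (every second). The full configuration and log are here: https://docs.google.com/document/d/1Nyo0nc6S46iNNCiLH5vgIj6H4wncYVtz0uU_MZX6qDs/edit?usp=sharing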


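outboundJobRequestChannel is a SubscribableChannel, so you can't have a <poller/> on the batch-int:job-launching-gateway. inboundFileChannel is a QueueChannel, so its consumer (the transformer) needs a poller.

Notice the #0, #1 in the bean names. The int-sftp:inbound-channel-adapter (#0) correctly has a poller; just remove the other one.

For this scenario you don't really need a global (default) poller (although it is currently being used for your transformer).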
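As a rough sketch of the points above, the channel wiring could look like the following. It reuses the bean and channel names from the question; the poller's fixed-rate value is only an illustrative assumption, and the namespace declarations for int: and batch-int: are assumed to be the ones already present in the question's context file.

```xml
<!-- QueueChannel (pollable): whatever consumes from it needs a poller -->
<int:channel id="inboundFileChannel"><int:queue/></int:channel>

<!-- Plain channels are DirectChannels (SubscribableChannel): their consumers must not declare a poller -->
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>

<!-- The transformer consumes from the queue channel, so it carries its own poller
     (fixed-rate="1000" is an arbitrary value chosen for illustration) -->
<int:transformer input-channel="inboundFileChannel"
        output-channel="outboundJobRequestChannel">
    <int:poller fixed-rate="1000"/>
    <bean class="com.chofac.mint.integration.FileMessageToJobRequest">
        <property name="job" ref="responseFileReaderJob"/>
    </bean>
</int:transformer>

<!-- The gateway subscribes to a DirectChannel, so no poller here -->
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
        reply-channel="jobLaunchReplyChannel" job-launcher="jobLauncher"/>
```

With an explicit poller on the transformer, the global default poller is no longer needed for this flow.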

EDIT:

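(In response to the comment on your question.)

That is the default behavior. By default, the local-filter is an AcceptOnceFileListFilter. If you delete the file before the next poll, you can change it to an AcceptAllFileListFilter.

If you want to keep the file on disk but detect that it has changed, use a FileSystemPersistentAcceptOnceFileListFilter. Since you are not deleting the remote files, you should also set the filter to an SftpPersistentAcceptOnceFileListFilter (actually a CompositeFileListFilter wrapping this filter and one of the pattern filters).

Otherwise you will keep fetching the same files and, without preserve-timestamp, the local filter will treat each one as a new file every time.

The persistent filters use a metadata store, and use the file name and lastModified timestamp to determine whether the filter should accept (pass) the file.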
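A minimal sketch of such a filter setup is shown below, assuming a Spring Integration version that ships the persistent filters (3.0 or later). The bean ids metadataStore, remoteFilter, and localFilter, the key prefixes "remote-" and "local-", and the *.txt pattern are illustrative assumptions; the remaining attributes are copied from the question. Note that the filter attribute replaces filename-pattern, since the adapter does not accept both.

```xml
<!-- In-memory store, for illustration only: its contents are lost on restart.
     A persistent ConcurrentMetadataStore implementation would be needed to survive restarts. -->
<bean id="metadataStore" class="org.springframework.integration.metadata.SimpleMetadataStore"/>

<!-- Remote side: pattern filter plus persistent accept-once filter, combined in a composite -->
<bean id="remoteFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
    <constructor-arg>
        <list>
            <bean class="org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter">
                <constructor-arg value="*.txt"/>
            </bean>
            <bean class="org.springframework.integration.sftp.filters.SftpPersistentAcceptOnceFileListFilter">
                <constructor-arg ref="metadataStore"/>
                <constructor-arg value="remote-"/>
            </bean>
        </list>
    </constructor-arg>
</bean>

<!-- Local side: passes a file again only if its name/lastModified pair is new -->
<bean id="localFilter" class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter">
    <constructor-arg ref="metadataStore"/>
    <constructor-arg value="local-"/>
</bean>

<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
        channel="inboundFileChannel"
        session-factory="sftpSessionFactory"
        local-directory="file:${inbound.local.directory}"
        remote-directory="${inbound.remote.directory}"
        auto-create-local-directory="true"
        delete-remote-files="false"
        preserve-timestamp="true"
        filter="remoteFilter"
        local-filter="localFilter">
    <int:poller max-messages-per-poll="-1" fixed-rate="45000"/>
</int-sftp:inbound-channel-adapter>
```

The distinct key prefixes keep the remote and local entries apart when both filters share one metadata store.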

EDIT2:

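The sample just dumps the files into a queue channel and receives them in main; you have subscribed a transformer instead.

It is this code...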

PollableChannel pollableFileChannel = context.getBean("inboundFileChannel", PollableChannel.class);
System.out.println("Received first file message: " + pollableFileChannel.receive());

This is what is causing your current grief; just remove those lines from your main.
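Once nothing calls receive() on the channel, inboundFileChannel no longer has to be pollable. A sketch of the direct-channel variant, under the assumption that the rest of the question's Update 2 configuration stays unchanged, might look like this:

```xml
<!-- DirectChannel: messages are handed to the subscriber on the adapter's poller thread -->
<int:channel id="inboundFileChannel"/>

<!-- The inbound adapter is the only polled component in this flow -->
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
        channel="inboundFileChannel"
        session-factory="sftpSessionFactory"
        local-directory="file:${inbound.local.directory}"
        remote-directory="${inbound.remote.directory}"
        auto-create-local-directory="true"
        delete-remote-files="false"
        filename-pattern="*.*">
    <int:poller max-messages-per-poll="-1" fixed-rate="45000"/>
</int-sftp:inbound-channel-adapter>

<!-- Subscribed to a DirectChannel, so no poller is declared on the transformer -->
<int:transformer input-channel="inboundFileChannel"
        output-channel="outboundJobRequestChannel">
    <bean class="com.chofac.mint.integration.FileMessageToJobRequest">
        <property name="job" ref="responseFileReaderJob"/>
    </bean>
</int:transformer>
```

In this arrangement the main program only needs to create the application context and keep the JVM alive; each polled file flows through the transformer to the job-launching gateway without any manual receive() call.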
