考虑到每次轮询最大消息数和 Cron,轮询如何用于 FTP 入站通道适配器

2024-04-19

我有 UC,我需要从 ftp 位置选取文件并将其放入服务器位置 我正在使用 ftp-inbound-channel-adapter (Spring 集成 - 2.0.4)来实现它。 下面是我的xml中的配置

 <bean id="ftpAASessionFactory" class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
          <property name="host" value="${ftp.session.host}" />
          <property name="port" value="${ftp.session.port}" />
          <property name="username" value="${ftp.session.username}" />
          <property name="password" value="${ftp.session.password}" />
          <property name="clientMode" value="0" />
          <property name="fileType" value="0" />
   </bean>


   <ftp:inbound-channel-adapter id="ftpAAInbound"
          channel="ftpChannel" session-factory="ftpAASessionFactory" charset="UTF-8"
          auto-create-local-directory="false" delete-remote-files="true"
          remote-directory="${ftp.source.location}" local-directory="file://${ftp.target.location}"
          >
          <int:poller max-messages-per-poll="5" cron="0 */2 * ? * *">
          </int:poller>

   </ftp:inbound-channel-adapter>

   <int:channel id="ftpChannel">
          <int:queue />
          <int:interceptors>
                 <int:wire-tap channel="debugLogger" />
          </int:interceptors>
   </int:channel>

   <int:logging-channel-adapter id="debugLogger"
          level="DEBUG" log-full-message="true" />

   <int:logging-channel-adapter id="errorLogger"
          level="ERROR" log-full-message="true" />

我已将 max-messages-per-poll 配置为 5,并且每隔偶数分钟进行一次轮询(使用 cron 表达式)。

我的问题是,如果我们在 ftp 位置有 6 个文件,则所有 6 个文件都会在第一次轮询时传输到服务器位置(根据每次轮询的最大消息数= 5,它应该只从 Ftp 位置选择 5 个文件)并且有效负载仅由 5 个文件组成。

我希望在第一次轮询时只将 5 个文件传输到我的服务器,在第二次轮询时它应该选择最后一个

请提出解决方案 TIA

当 ftp 位置有 6 个文件时 PFB 日志

    [CRA] [01/03/2017 12:38:00] DEBUG [task-scheduler-8] DefaultFtpSessionFactory.createClient(158) | Connected to server [prgrear01.group.root.ad:21]
***[CRA] [01/03/2017 12:38:00] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_808_2015_02_01_07_50_01_102_20.txt***
[CRA] [01/03/2017 12:38:00] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_808_2015_02_01_07_50_01_102_20.txt
[CRA] [01/03/2017 12:38:00] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_808_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:00] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_808_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2015_02_01_07_50_01_102_01.txt
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2015_02_01_07_50_01_102_01.txt
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2015_02_01_07_50_01_102_21.txt
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2015_02_01_07_50_01_102_21.txt
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_810_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_810_2017_02_22_07_50_01_102_02.txt
***[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FileReadingMessageSource.scanInputDirectory(272) | Added to queue: [D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt, D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt, D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt, D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt, D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt, D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8]*** QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:40:00] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] DefaultFtpSessionFactory.createClient(158) | Connected to server [prgrear01.group.root.ad:21]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: null
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(101) | Received no Message during the poll, returning 'fal

.......


该日志仅显示 4 个文件;看起来它被配置为每次轮询 3 条消息,并且清楚地显示在 12:38 以及 1 和 12:40 发送了 3 个文件(没有找到第五个文件)。

Poll resulted in Message: null

EDIT

这是实现所需结果的另一种方法(使用出站网关)。该版本使用 Java DSL。

如果您更喜欢使用 XML 配置,则在中提供了类似的流程(以 XML 形式)ftp-样本 https://github.com/spring-projects/spring-integration-samples/tree/master/basic/ftp- 您必须在之间插入文件限制器ls网关和分路器。

@SpringBootApplication
public class So42528316Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So42528316Application.class, args);
        try {
            context.getBean(So42528316Application.class).runDemo();
        }
        finally {
            context.close();
        }
    }

    @Autowired
    private FileGateway fetchAndProcess;

    private void runDemo() throws Exception {
        Collection<Boolean> rmResults = this.fetchAndProcess.processFilesAt("so42528316");
        while (rmResults != null) {
            System.out.println("Processed " + rmResults.size() + " files");
            Thread.sleep(10_000);
            rmResults = this.fetchAndProcess.processFilesAt("so42528316");
        }
        System.out.println("No more files available");
    }

    @MessagingGateway(defaultRequestChannel = "flow.input", defaultReplyTimeout = "0")
    public interface FileGateway {

        Collection<Boolean> processFilesAt(String pattern);

    }

    @Bean
    public DefaultFtpSessionFactory sessionFactory() {
        DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
        factory.setHost("10.0.0.3");
        factory.setUsername("ftptest");
        factory.setPassword("ftptest");
        factory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
        return factory;
    }

    @Bean
    public IntegrationFlow flow() {
        return f -> f
                .handle(Ftp.outboundGateway(sessionFactory(), "ls", "payload"))
                .handle("so42528316Application", "limitFiles")
                .split()
                .handle(Ftp.outboundGateway(sessionFactory(), "get",
                                "payload.remoteDirectory + '/' + payload.filename")
                        .localDirectory(new File("/tmp", "so42528316")))
                .handle("so42528316Application", "process")
                .handle(Ftp.outboundGateway(sessionFactory(), "rm",
                        "headers['file_remoteDirectory'] + '/' + headers['file_remoteFile']"))
                .aggregate();
    }

    public List<FtpFileInfo> limitFiles(List<FtpFileInfo> files) {
        // Add any logic you want here, e.g. check if file already on local disk.
        if (files.size() == 0) {
            return null;
        }
        else if (files.size() > 2) {
            System.out.println("Reducing fetch list from " + files.size() + " to 2");
            return files.stream().limit(2).collect(Collectors.toList());
        }
        else {
            return files;
        }
    }

    public String process(File file) {
        System.out.println("Processing " + file);
        file.delete();
        return file.getName();
    }

}

Result:

Reducing fetch list from 3 to 2
Processing /tmp/so42528316/bar.txt
Processing /tmp/so42528316/baz.txt
Processed 2 files
Processing /tmp/so42528316/foo.txt
Processed 1 files
No more files available
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

考虑到每次轮询最大消息数和 Cron,轮询如何用于 FTP 入站通道适配器 的相关文章

  • 如何完善这个FTP(shell)功能?

    我有大量使用以下函数的脚本 Copies files over using FTP Configurations set at the beggining of the script param 1 FTP Host 2 FTP User
  • C# 单文件FTP下载

    我试图在 C 控制台应用程序中使用 FTP 下载文件 但即使我知道路径是正确的 我总是收到错误消息 550 file not found 有什么方法可以返回当前路径 一旦连接到服务器 lade datei von FTP server st
  • 如何通过 Python 的请求使用 FTP

    是否可以使用requests与 FTP 站点交互的模块 requests获取 HTTP 页面非常方便 但当我尝试使用 FTP 站点时 我似乎遇到架构错误 我有什么遗漏的吗requests允许我执行 FTP 请求 还是不支持 对于像我一样得到
  • Spring Integration 中的 @Router 与注释(请求/回复)

    您能提供在 Spring Integration 中路由消息的任何示例吗 按有效负载消息 标头或类似以下内容进行过滤
  • 使用 PowerShell 在 IIS FTP 站点上设置权限和设置

    我是 PowerShell 的初学者 我尝试了这个脚本并且运行良好 但我需要将 FTP 授权规则更改为 所有用户 读 写 并将 FTP 用户隔离 更改为 用户名目录 NEEDED FOR IIS CMDLETS Import Module
  • 当没有文件可供下载时,如何避免 SSIS FTP 任务失败?

    我正在使用 SQL Server 2005 并在 SSIS 中创建 ftp 任务 有时会有文件需要通过 ftp 传输 有时则不会 如果没有文件 我不希望任务或包失败 我已将从 ftp 任务到下一个任务的箭头更改为 完成 以便包运行 我已将允
  • PHP + FTP删除文件夹中的文件

    我刚刚编写了一个 PHP 脚本 它应该连接到 FTP 并删除特殊文件夹中的所有文件 它看起来像这样 但我不知道需要什么命令来删除文件夹日志中的所有文件 任何想法
  • 触发“对等方重置连接”

    我想测试当发生 对等方重置连接 错误时我们的应用程序 嵌入式 ftp 服务器 中发生的日志记录 这个帖子 https stackoverflow com questions 1434451 connection reset by peer很
  • 如何让spring为JdbcMetadataStore创建相应的schema?

    我想使用此处描述的 jdbc 元数据存储 https docs spring io spring integration docs 5 2 0 BUILD SNAPSHOT reference html jdbc html jdbc met
  • 集群中的spring集成+cron+quartz?

    我有一个由 cron 表达式触发的 spring 集成流程 如下所示
  • GnuTLS 错误 -110:TLS 连接未正确终止

    我发现我的一个 Windows 服务没有连接到 Unix 服务器上的 FTP 位置 我在我的 PC 上运行了可执行文件 因为开发人员没有记录任何错误 并且我在尝试从 FTPWebRequest 获取响应时遇到超时错误C 中的对象 尝试使用
  • 使用 Spring 与 RabbitMQ 集成

    我正在为我们的一个应用程序开发消息传递界面 该应用程序是一种服务 旨在接受 作业 进行一些处理并返回结果 实际上以文件的形式 这个想法是使用 RabbitMQ 作为消息传递基础设施 并使用 Spring AMQP 来处理协议特定的细节 我不
  • 使用ftp协议连接密码包含“@”符号的服务器

    我正在尝试使用java中的FTP协议从服务器下载文件 通过使用以下 URL 我可以连接到服务器并下载文件 URL url new URL ftp user password host remoteFile type i 但是当我的密码包含
  • SSH 主机密钥指纹与模式 C# WinSCP 不匹配

    我尝试通过 WinSCP 使用 C 连接到 FTPS 服务器 但收到此错误 SSH 主机密钥指纹 与模式不匹配 经过大量研究 我相信这与密钥的长度有关 当使用 服务器和协议信息 下的界面进行连接时 我从 WinSCP 获得的密钥是xx xx
  • 在python中浏览ftp目录

    我正在尝试使用 ftplib 使用 Python 3 从 ftp 服务器下载多个文件夹 我有一个文件夹名称列表 它们都位于文件夹 root 中 问题是我不知道如何浏览它们 当我使用cwd我可以进入更深的目录 但是如何再次起来呢 我正在尝试得
  • Python FTP下载550错误

    我编写了一个 ftp 爬虫来下载特定文件 它会一直工作 直到找到要下载的特定文件 然后抛出此错误 ftplib error perm 550 该文件存在于我的下载文件夹中 但文件大小为 0 kb 我需要转换某些内容才能下载吗 我可以访问 f
  • 如何注册 org.springframework.integration.monitor.IntegrationMBeanExporter

    根据http www ibm com support knowledgecenter en SS7K4U 8 5 5 com ibm websphere nd multiplatform doc ae cspr data access tr
  • 如何使用 spring 集成 dsl 解组 xml

    我正在研究 spring 集成 dsl 要求是从队列中读取 xml 消息 根据消息头值 我需要调用不同的服务 我能够从队列中获取消息 但无法在 dsl 中编写代码来将 xml 消息解组到对象 有人可以帮忙吗 我有我的解组器 但无法用 dsl
  • Spring Integration - IBM MQ - 使用队列中的大消息

    我们使用 spring 集成连接到 IBM MQ V7 5 以从队列中读取消息 我们偶尔会收到大消息需要阅读 SI jms 适配器无法读取大消息 但它适用于较小的消息 下面是我们得到的异常 23 18 35 470 WARN Default
  • 尝试使用 FTP_TLS 从同一目录下载第二个文件时出现“ftplib.error_perm:550 不允许操作”

    运行我的脚本时出现此错误 ftplib error perm 550 Operation not permitted ftpdownloader py import ftplib import threading import loggin

随机推荐