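My situation is similar to the one described in this question. The difference is that I am not using a WebFlux.outboundGateway but an Ftp.outboundGateway, on which I invoke the AbstractRemoteFileOutboundGateway.Command.GET command. The common problem is that I cannot get the RequestHandlerRetryAdvice I defined to be used.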


@RestController
@RequestMapping( value = "/somepath" )
public class DownloadController
{
   private DownloadGateway downloadGateway;

   public DownloadController( DownloadGateway downloadGateway )
   {
      this.downloadGateway = downloadGateway;
   }

   // Triggers a download of the named remote file through the messaging gateway.
   @PostMapping( "/downloads" )
   public void download( @RequestParam( "filename" ) String filename )
   {
      Map<String, Object> headers = new HashMap<>();

      downloadGateway.triggerDownload( filename, headers );
   }
}

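@MessagingGateway
public interface DownloadGateway
{
   // Sends the file name as the payload to the input channel of the downloadFiles flow.
   @Gateway( requestChannel = "downloadFiles.input" )
   void triggerDownload( Object value, Map<String, Object> headers );
}
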
@Configuration
public class FtpDefinition
{
   private FtpProperties ftpProperties;

   public FtpDefinition( FtpProperties ftpProperties )
   {
      this.ftpProperties = ftpProperties;
   }

   @Bean
   public DirectChannel gatewayDownloadsOutputChannel()
   {
      return new DirectChannel();
   }

   // The gateway spec is injected as a bean here.
   @Bean
   public IntegrationFlow downloadFiles( RemoteFileOutboundGatewaySpec<FTPFile, FtpOutboundGatewaySpec> getRemoteFile )
   {
      return f -> f.handle( getRemoteFile, getRetryAdvice() )
                   .channel( "gatewayDownloadsOutputChannel" );
   }

   private Consumer<GenericEndpointSpec<AbstractRemoteFileOutboundGateway<FTPFile>>> getRetryAdvice()
   {
      return e -> e.advice( ( (Supplier<RequestHandlerRetryAdvice>) () -> {
         RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
         advice.setRetryTemplate( getRetryTemplate() );
         return advice;
      } ).get() );
   }

   // Default retry policy with a fixed 5-second pause between attempts.
   private RetryTemplate getRetryTemplate()
   {
      RetryTemplate result = new RetryTemplate();

      FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
      backOffPolicy.setBackOffPeriod( 5000 );

      result.setBackOffPolicy( backOffPolicy );
      return result;
   }

   @Bean
   public RemoteFileOutboundGatewaySpec<FTPFile, FtpOutboundGatewaySpec> getRemoteFile( SessionFactory<FTPFile> sessionFactory )
   {
      return
         Ftp.outboundGateway( sessionFactory,
                              AbstractRemoteFileOutboundGateway.Command.GET,
                              "payload" )
            .fileExistsMode( FileExistsMode.REPLACE )
            .localDirectoryExpression( "'" + ftpProperties.getLocalDir() + "'" )
            .autoCreateLocalDirectory( true );
   }

   @Bean
   public SessionFactory<FTPFile> ftpSessionFactory()
   {
      DefaultFtpSessionFactory sessionFactory = new DefaultFtpSessionFactory();
      sessionFactory.setHost( ftpProperties.getServers().get( 0 ).getHost() );
      sessionFactory.setPort( ftpProperties.getServers().get( 0 ).getPort() );
      sessionFactory.setUsername( ftpProperties.getServers().get( 0 ).getUser() );
      sessionFactory.setPassword( ftpProperties.getServers().get( 0 ).getPassword() );
      return sessionFactory;
   }
}

@SpringBootApplication
public class FtpTestApplication {

    public static void main(String[] args) {
        SpringApplication.run( FtpTestApplication.class, args );
    }
}

@Configuration
@PropertySource( "classpath:ftp.properties" )
@ConfigurationProperties( prefix = "ftp" )
public class FtpProperties
{
   private String localDir;

   private List<Server> servers;

   // getters and setters omitted

   public static class Server
   {
      private String host;

      private int port;

      private String user;

      private String password;

      // getters and setters omitted
   }
}


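Based on Gary Russell's comment, I would expect a failed download to be retried. However, if I interrupt the download on the server side (by issuing "Kick user" in a FileZilla instance), I just get a stack trace immediately, with no retries: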

org.apache.commons.net.ftp.FTPConnectionClosedException: FTP response 421 received.  Server closed connection.

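I also need to upload files, for which I use an Ftp.outboundAdapter. In that case, with the same RetryTemplate, if I interrupt the upload on the server side, Spring Integration makes two more attempts with a delay of 5 seconds each and only then logs java.net.SocketException: Connection reset, all as expected.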
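
For readers unfamiliar with this retry behaviour, the following plain-Java sketch mimics what is observed for the upload: a failing call is repeated after a fixed pause until a maximum number of attempts is reached, and only then does the last exception propagate. FixedBackOffRetry is a hypothetical stand-in written for illustration, not Spring Retry's RetryTemplate; the limit of three attempts in total is chosen to match the "two more attempts" described above, and the pause is shortened so the example finishes quickly.

```java
import java.net.SocketException;
import java.util.concurrent.Callable;

/** Hypothetical stand-in for a retry template: bounded attempts, fixed pause. */
final class FixedBackOffRetry {
    private final int maxAttempts;
    private final long backOffMillis;
    private int attemptsMade;

    FixedBackOffRetry(int maxAttempts, long backOffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.maxAttempts = maxAttempts;
        this.backOffMillis = backOffMillis;
    }

    /** Runs the action, retrying after a fixed pause until it succeeds or attempts run out. */
    <T> T execute(Callable<T> action) throws Exception {
        attemptsMade = 0;
        while (true) {
            attemptsMade++;
            try {
                return action.call();
            } catch (Exception e) {
                if (attemptsMade >= maxAttempts) {
                    throw e; // attempts exhausted: surface the last failure
                }
                Thread.sleep(backOffMillis); // fixed back-off before the next attempt
            }
        }
    }

    /** Number of attempts made by the most recent execute() call. */
    int attemptsMade() {
        return attemptsMade;
    }

    public static void main(String[] args) {
        // 10 ms instead of 5000 ms, purely to keep the demo short.
        FixedBackOffRetry retry = new FixedBackOffRetry(3, 10);
        try {
            retry.execute(() -> {
                throw new SocketException("Connection reset");
            });
        } catch (Exception e) {
            System.out.println(e.getMessage() + " after " + retry.attemptsMade() + " attempts");
        }
    }
}
```

With an action that always fails, the sketch makes three calls in total before the exception reaches the caller, which is the pattern seen with the Ftp.outboundAdapter.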

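I tried some debugging and noticed that, right before the first upload attempt through the Ftp.outboundAdapter, a breakpoint in RequestHandlerRetryAdvice.doInvoke() is hit. When downloading through the Ftp.outboundGateway, however, that breakpoint is never hit.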


Sorry for the delay; we are at SpringOne Platform this week.

The problem is due to the fact that the gateway spec is a bean: the gateway ends up being initialized before the advice is applied.
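
To illustrate why the ordering matters, here is a simplified plain-Java model of a handler that composes its advice chain once, at initialization. AdvisedHandlerSketch and its methods are hypothetical names invented for this sketch and do not reproduce Spring Integration's actual internals; the assumption is only that advice added after initialization is never consulted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.UnaryOperator;

/** Hypothetical handler that fixes its advice chain when it is initialized. */
final class AdvisedHandlerSketch {
    private final List<UnaryOperator<Function<String, String>>> adviceChain = new ArrayList<>();
    private final Function<String, String> target;
    private Function<String, String> effective; // composed once, in initialize()

    AdvisedHandlerSketch(Function<String, String> target) {
        this.target = target;
    }

    /** Registers advice; it only takes effect if initialize() has not run yet. */
    void addAdvice(UnaryOperator<Function<String, String>> advice) {
        adviceChain.add(advice);
    }

    /** Wraps the target with whatever advice is known at this moment. */
    void initialize() {
        if (effective != null) {
            return; // already initialized: later advice is ignored
        }
        Function<String, String> composed = target;
        for (UnaryOperator<Function<String, String>> advice : adviceChain) {
            composed = advice.apply(composed);
        }
        effective = composed;
    }

    String handle(String request) {
        if (effective == null) {
            throw new IllegalStateException("handler not initialized");
        }
        return effective.apply(request);
    }

    public static void main(String[] args) {
        // Marker advice standing in for a retry advice.
        UnaryOperator<Function<String, String>> marker =
                next -> req -> "advised(" + next.apply(req) + ")";

        AdvisedHandlerSketch tooLate = new AdvisedHandlerSketch(req -> "GET " + req);
        tooLate.initialize();      // initialized first, as happens with the bean
        tooLate.addAdvice(marker); // advice arrives too late
        System.out.println(tooLate.handle("a.txt"));

        AdvisedHandlerSketch inTime = new AdvisedHandlerSketch(req -> "GET " + req);
        inTime.addAdvice(marker);  // advice registered before initialization
        inTime.initialize();
        System.out.println(inTime.handle("a.txt"));
    }
}
```

In this model the first handler never passes through the marker advice, which corresponds to the breakpoint in RequestHandlerRetryAdvice.doInvoke() never being hit for the download.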


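Change it so that the spec is created by a plain method call rather than injected as a bean:

@Bean
public IntegrationFlow downloadFiles(SessionFactory<FTPFile> sessionFactory) {
    return f -> f.handle(getRemoteFile(sessionFactory), getRetryAdvice())
            .channel("gatewayDownloadsOutputChannel");
}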


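// No longer a @Bean: just a private helper that builds the spec.
private RemoteFileOutboundGatewaySpec<FTPFile, FtpOutboundGatewaySpec> getRemoteFile(SessionFactory<FTPFile> sessionFactory) {
    return Ftp.outboundGateway(sessionFactory,
            AbstractRemoteFileOutboundGateway.Command.GET,
            "payload")
            .fileExistsMode(FileExistsMode.REPLACE)
            .localDirectoryExpression("'" + ftpProperties.getLocalDir() + "'")
            .autoCreateLocalDirectory(true);
}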



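Or define the gateway spec inline in the flow:

@Bean
public IntegrationFlow downloadFiles(SessionFactory<FTPFile> sessionFactory) {
    return f -> f.handle(Ftp.outboundGateway(sessionFactory,
            AbstractRemoteFileOutboundGateway.Command.GET,
            "payload")
            .fileExistsMode(FileExistsMode.REPLACE)
            .localDirectoryExpression("'" + ftpProperties.getLocalDir() + "'")
            .autoCreateLocalDirectory(true), getRetryAdvice())
        .channel("gatewayDownloadsOutputChannel");
}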

