Spring Batch 集成 Java DSL 和 RunIdIncrementer 不递增

2024-04-01

我有一个 spring boot/integration/batch,它将在 SFTP 上运行和轮询文件。

我希望能够最终使用相同的参数(基本上相同的文件)重新启动作业(可能是因为应用程序已重新启动,或者因为某些原因我们再次收到相同的文件)RunIdIncrementer在作业配置中定义。

不幸的是 run.id=1 不会增加,我得到一个JobInstanceAlreadyCompleteException

作业配置

@Autowired
private JobBuilderFactory jobBuilders;

@Bean
public Job importOffersJob() {

    Job job = jobBuilders.get("importOffersJob")
            .start(importOffersStep)
            .listener(traceJobExecutionListener())  
            .incrementer(new RunIdIncrementer())
            .build();

    return job;
}

一体化

@Bean
public IntegrationFlow ftpInboundFlow() {
    return IntegrationFlows
            .from(Sftp.inboundAdapter(SftpSessionFactory())
                .regexFilter(".*\\.xml.mini$")
                .deleteRemoteFiles(intCfg.getSftpDeleteRemoteFiles())
                .preserveTimestamp(Boolean.TRUE)
                .autoCreateLocalDirectory(Boolean.TRUE)
                .remoteDirectory(intCfg.getSftpRemoteDirectory())
                .localDirectory(new File(intCfg.getSftpLocalDirectory())
            ), 
                e -> e.id("sftpInboundAdapter")
                .poller(Pollers.fixedRate(intCfg.getSftpPollerInMinutes(), TimeUnit.MINUTES).maxMessagesPerPoll(1)))
            .transform(fileToJobLaunchRequestTransformer())         
            .handle(jobLaunchingGw())
            .handle(logger())
            .get();
}

public class FileToJobLaunchRequestTransformer implements GenericTransformer<Message<File>, JobLaunchRequest> {

    private final static Logger log = LoggerFactory.getLogger(FileToJobLaunchRequestTransformer.class);

    @Autowired
    @Qualifier("importOffersJob")
    private Job job;

    @Override
    public JobLaunchRequest transform(Message<File> source) {
        log.info("FileToJobLaunchRequestTransformer, source.getPayload().getAbsolutePath(): {}", source.getPayload()
                .getAbsolutePath());

        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
        jobParametersBuilder.addString("pathToFile", "file:" + source.getPayload().getAbsolutePath());
        //jobParametersBuilder.addString("UUID", UUID.randomUUID().toString());

        JobParameters jobParams = job.getJobParametersIncrementer().getNext(jobParametersBuilder.toJobParameters());

        return new JobLaunchRequest(job, jobParams);
        }
     }

如果我取消注释jobParametersBuilder.addString("UUID", UUID.randomUUID().toString());它正在工作,但我认为重点是能够重用我的作业配置中定义的增量器,不是吗?

(当我在没有集成的情况下运行与简单的 Spring Boot 相同的批次时,增量器正在工作)

非常感谢


RunIdIncrementer.getNext():

public JobParameters getNext(JobParameters parameters) {

    JobParameters params = (parameters == null) ? new JobParameters() : parameters;

    long id = params.getLong(key, 0L) + 1;
    return new JobParametersBuilder(params).addLong(key, id).toJobParameters();
}

您每次都会创建一个新的作业参数,因此它将始终返回 1,因为run.id不存在。

如果您移动jobParametersBuilder到变压器中的磁场,旁边job,它会起作用,但仅适用于您的应用程序的此实例化。下次启动应用程序时,它将再次从 1 开始。

为了在重新启动后继续存在,您需要保存run.id某处的值,或者您需要从存储库中获取最后的作业参数。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spring Batch 集成 Java DSL 和 RunIdIncrementer 不递增 的相关文章

随机推荐