我有一个 spring boot/integration/batch,它将在 SFTP 上运行和轮询文件。
我希望能够最终使用相同的参数(基本上相同的文件)重新启动作业(可能是因为应用程序已重新启动,或者因为某些原因我们再次收到相同的文件)RunIdIncrementer
在作业配置中定义。
不幸的是 run.id=1 不会增加,我得到一个JobInstanceAlreadyCompleteException
作业配置
@Autowired
private JobBuilderFactory jobBuilders;
@Bean
public Job importOffersJob() {
Job job = jobBuilders.get("importOffersJob")
.start(importOffersStep)
.listener(traceJobExecutionListener())
.incrementer(new RunIdIncrementer())
.build();
return job;
}
一体化
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(Sftp.inboundAdapter(SftpSessionFactory())
.regexFilter(".*\\.xml.mini$")
.deleteRemoteFiles(intCfg.getSftpDeleteRemoteFiles())
.preserveTimestamp(Boolean.TRUE)
.autoCreateLocalDirectory(Boolean.TRUE)
.remoteDirectory(intCfg.getSftpRemoteDirectory())
.localDirectory(new File(intCfg.getSftpLocalDirectory())
),
e -> e.id("sftpInboundAdapter")
.poller(Pollers.fixedRate(intCfg.getSftpPollerInMinutes(), TimeUnit.MINUTES).maxMessagesPerPoll(1)))
.transform(fileToJobLaunchRequestTransformer())
.handle(jobLaunchingGw())
.handle(logger())
.get();
}
public class FileToJobLaunchRequestTransformer implements GenericTransformer<Message<File>, JobLaunchRequest> {
private final static Logger log = LoggerFactory.getLogger(FileToJobLaunchRequestTransformer.class);
@Autowired
@Qualifier("importOffersJob")
private Job job;
@Override
public JobLaunchRequest transform(Message<File> source) {
log.info("FileToJobLaunchRequestTransformer, source.getPayload().getAbsolutePath(): {}", source.getPayload()
.getAbsolutePath());
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
jobParametersBuilder.addString("pathToFile", "file:" + source.getPayload().getAbsolutePath());
//jobParametersBuilder.addString("UUID", UUID.randomUUID().toString());
JobParameters jobParams = job.getJobParametersIncrementer().getNext(jobParametersBuilder.toJobParameters());
return new JobLaunchRequest(job, jobParams);
}
}
如果我取消注释jobParametersBuilder.addString("UUID", UUID.randomUUID().toString());
它正在工作,但我认为重点是能够重用我的作业配置中定义的增量器,不是吗?
(当我在没有集成的情况下运行与简单的 Spring Boot 相同的批次时,增量器正在工作)
非常感谢