需要一种方法来防止不需要的作业参数传播到 Spring Boot 批处理作业的下一次执行

2024-04-29

我正在使用 Spring Boot 2.1.2 和 Spring Batch 4.1.1 运行批处理应用程序。该应用程序使用 MySQL 数据库作为 Spring Batch 元数据数据源。

首先,我使用以下命令运行作业:

java -jar target/batchdemo-0.0.1-SNAPSHOT.jar -Dspring.batch.job.names=echo com.paypal.batch.batchdemo.BatchdemoApplication myparam1=value1 myparam2=value2

请注意,我传递了两个参数:

myparam1=值1 myparam2=值2

由于作业使用 RunIdIncrementer,因此应用程序使用的实际参数记录为:

作业:[SimpleJob: [name=echo]] 使用以下参数完成:[{myparam2=value2, run.id=1, myparam1=value1}]

接下来我再次运行该作业,这次删除 myparam2:

java -jar target/batchdemo-0.0.1-SNAPSHOT.jar -Dspring.batch.job.names=echo com.paypal.batch.batchdemo.BatchdemoApplication myparam1=value1

这次作业再次运行,仍包含 param2:

作业:[SimpleJob: [name=echo]] 使用以下参数完成:[{myparam2=value2, run.id=2, myparam1=value1}]

这会导致调用业务逻辑,就像我再次将 myparam2 传递给应用程序一样。

有没有办法删除作业参数并且不将其传递到下一个实例?

应用程序代码:

package com.paypal.batch.batchdemo;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
@EnableBatchProcessing
public class BatchdemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(BatchdemoApplication.class, args);
    }

    @Autowired
    JobBuilderFactory jobBuilder;

    @Autowired
    StepBuilderFactory stepBuilder;

    @Autowired
    ParamEchoTasklet paramEchoTasklet;

    @Bean
    public RunIdIncrementer incrementer() {
        return new RunIdIncrementer();
    }

    @Bean
    public Job job() {
        return jobBuilder.get("echo").incrementer(incrementer()).start(echoParamsStep()).build();
    }

    @Bean
    public Step echoParamsStep() {
        return stepBuilder.get("echoParams").tasklet(paramEchoTasklet).build();
    }
}

package com.paypal.batch.batchdemo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.stereotype.Component;

@Component
public class ParamEchoTasklet implements Tasklet {

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        LOGGER.info("ParamEchoTasklet BEGIN");
        chunkContext.getStepContext().getJobParameters().entrySet().stream().forEachOrdered((entry) -> {
            String key = entry.getKey();
            Object value = entry.getValue();
            LOGGER.info("Param {} = {}", key, value);
        });
        LOGGER.info("ParamEchoTasklet END");
        return RepeatStatus.FINISHED;
    }

    private Logger LOGGER = LoggerFactory.getLogger(ParamEchoTasklet.class);
}

我调试了 Spring Batch 和 Spring Boot 代码,这就是发生的情况。JobParametersBuilder 第 273 行 https://github.com/spring-projects/spring-batch/blob/4.1.1.RELEASE/spring-batch-core/src/main/java/org/springframework/batch/core/JobParametersBuilder.java#L273将来自最近的先前作业实例的参数以及由 JobParametersIncrementer 添加的任何参数添加到 nextParameters 映射:

List<JobExecution> previousExecutions = this.jobExplorer.getJobExecutions(lastInstances.get(0));
if (previousExecutions.isEmpty()) {
    // Normally this will not happen - an instance exists with no executions
    nextParameters = incrementer.getNext(new JobParameters());
}
else {
    JobExecution previousExecution = previousExecutions.get(0);
    nextParameters = incrementer.getNext(previousExecution.getJobParameters());
}

然后由于我使用的是 Spring Boot,JobLauncherCommandLineRunner 第 213 行 https://github.com/spring-projects/spring-boot/blob/master/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/JobLauncherCommandLineRunner.java#L213将先前的参数与为新执行传递的新参数合并,这会导致旧参数被传递给新执行:

return merge(nextParameters, jobParameters);

除非我丢失了某些东西,否则在没有参数的情况下似乎不可能再次运行该作业。这可能是春季批次的错误吗?


正常行为为RunIdIncrementer https://docs.spring.io/spring-batch/trunk/apidocs/org/springframework/batch/core/launch/support/RunIdIncrementer.html似乎增加了运行 IDJobExecution并传递剩余的优先级JobParameters https://docs.spring.io/spring-batch/trunk/apidocs/org/springframework/batch/core/JobParameters.html。我不会称这是一个错误。

请记住,背后的想法RunIdIncrementer只是更改一个标识参数以允许作业再次运行,即使先前使用相同(其他)参数的运行已成功完成并且尚未配置重新启动。

您始终可以通过实现来创建自定义增量器JobParametersIncrementer https://docs.spring.io/spring-batch/trunk/apidocs/org/springframework/batch/core/JobParametersIncrementer.html.

另一种选择是使用JobParametersBuilder https://docs.spring.io/spring-batch/4.0.x/api/org/springframework/batch/core/JobParametersBuilder.html建立一个JobParameters对象,然后使用JobLauncher https://docs.spring.io/spring-batch/trunk/apidocs/org/springframework/batch/core/launch/JobLauncher.html使用这些参数运行您的作业。如果我运行的作业具有相同的时间,我经常使用当前系统时间(以毫秒为单位)来创建唯一性JobParameters。显然,您必须弄清楚从命令行(或其他任何地方)提取特定参数并迭代它们以填充的逻辑JobParameters object.

Example:

public JobExecution executeJob(Job job) {
    JobExecution jobExecution = null;
    try {
        JobParameters jobParameters =
            new JobParametersBuilder()
                .addLong( "time.millis", System.currentTimeMillis(), true)
                .addString( "param1", "value1", true)
                .toJobParameters();
        jobExecution = jobLauncher.run( job, jobParameters );
    } catch ( JobInstanceAlreadyCompleteException | JobRestartException | JobParametersInvalidException | JobExecutionAlreadyRunningException e ) {
        e.printStackTrace();
    }
    return jobExecution;
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

需要一种方法来防止不需要的作业参数传播到 Spring Boot 批处理作业的下一次执行 的相关文章

随机推荐