I have a spring-batch job that is started with the JobOperator#startNextInstance(String) method:

@Autowired
private JobOperator jobOperator;

private void startMyJob() throws Exception {
    jobOperator.startNextInstance("myJob");
}

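With this method, spring-batch automatically creates the job parameters from a bean that implements the JobParametersIncrementer interface. My implementation adds some globally available state information, such as the current user and the current time.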

Now I want to pass an additional job parameter to the job, one that is only available locally in the startMyJob() method. I tried using the JobOperator#start(String, String) method:

private void startMyJob() throws Exception {
    jobOperator.start("myJob", "localJobParam=someLocalValue");
}

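However, the JobParametersIncrementer is no longer called, so the global parameter values are missing. I could obviously call the incrementer myself, merge all parameters into one parameter string, and pass that to the JobOperator#start(String, String) method: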

@Autowired
private JobParametersIncrementer jobParametersIncrementer;

private void startMyJob() throws Exception {
    JobParameters jobParameters = jobParametersIncrementer.getNext(null);
    // convert jobParameters to comma separated key=value pairs
    // add additional key=value pair with locally available data
    jobOperator.start("myJob", commaSeparatedKeyValuePairString);
}

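Because the JobParameters class does not provide a direct way to obtain a comma-separated string of key=value pairs, this approach results in fairly long and tedious code.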
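For illustration, the conversion step could be kept short with a small helper that joins a map of parameters into the key=value,key=value form. This is only a sketch using the JDK: the class and method names are hypothetical, the map stands in for the values produced by the incrementer, and it assumes that keys and values contain no ',' or '=' characters (no escaping is done).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Spring Batch.
class KeyValueParams {

    // Joins the entries as "key=value" pairs separated by commas.
    // Assumption: keys and values contain no ',' or '=' characters.
    static String toParameterString(Map<String, String> params) {
        return params.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // Stand-in for the values produced by the incrementer.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("user", "alice");
        params.put("run.id", "7");
        // Locally available value added on top.
        params.put("localJobParam", "someLocalValue");
        System.out.println(toParameterString(params));
    }
}
```

In a real job the map would have to be filled from the JobParameters object first, which is the tedious part described above.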

Is there a better way to start a job where some job parameters are passed directly and others come from the JobParametersIncrementer bean?

2 Answers

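I couldn't find a solution in the Spring Framework, so I implemented a custom JobOperator that delegates all calls to SimpleJobOperator and simply declares one new method, startNextInstance(String jobName, String parameters), which automatically applies the job's JobParametersIncrementer if the job has one.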

public class JobParametersJobOperator implements JobOperator, InitializingBean {

    private final Log logger = LogFactory.getLog(this.getClass());
    private SimpleJobOperator delegate;


    private ListableJobLocator jobRegistry;
    private JobExplorer jobExplorer;
    private JobLauncher jobLauncher;
    private JobRepository jobRepository;
    private JobParametersConverter jobParametersConverter = new DefaultJobParametersConverter();

    public Long startNextInstance(String jobName, String parameters) throws NoSuchJobException, JobParametersNotFoundException, JobParametersInvalidException {
        this.logger.info("Locating parameters for next instance of job with name=" + jobName);
        Job job = this.jobRegistry.getJob(jobName);
        JobParametersIncrementer incrementer = job.getJobParametersIncrementer();
        // Parse the caller's key=value pairs into JobParameters.
        JobParameters jobParameters = this.jobParametersConverter.getJobParameters(PropertiesConverter.stringToProperties(parameters));
        if (incrementer == null) {
            throw new JobParametersNotFoundException("No job parameters incrementer found for job=" + jobName);
        }
        // Hand the parsed parameters to the incrementer, which returns the parameters for the next instance.
        jobParameters = incrementer.getNext(jobParameters);
        this.logger.info(String.format("Attempting to launch job with name=%s and parameters=%s", jobName, parameters));
        try {
            return this.jobLauncher.run(job, jobParameters).getId();
        } catch (JobExecutionAlreadyRunningException e) {
            throw new UnexpectedJobExecutionException(String.format("Illegal state (only happens on a race condition): %s with name=%s and parameters=%s", "job already running", jobName, parameters), e);
        } catch (JobRestartException e) {
            throw new UnexpectedJobExecutionException(String.format("Illegal state (only happens on a race condition): %s with name=%s and parameters=%s", "job not restartable", jobName, parameters), e);
        } catch (JobInstanceAlreadyCompleteException e) {
            throw new UnexpectedJobExecutionException(String.format("Illegal state (only happens on a race condition): %s with name=%s and parameters=%s", "job instance already complete", jobName, parameters), e);
        }
    }

    @Override
    public List<Long> getExecutions(long l) throws NoSuchJobInstanceException {
        return delegate.getExecutions(l);
    }

    @Override
    public List<Long> getJobInstances(String s, int i, int i1) throws NoSuchJobException {
        return delegate.getJobInstances(s, i, i1);
    }

    @Override
    public Set<Long> getRunningExecutions(String s) throws NoSuchJobException {
        return delegate.getRunningExecutions(s);
    }

    @Override
    public String getParameters(long l) throws NoSuchJobExecutionException {
        return delegate.getParameters(l);
    }

    @Override
    public Long start(String s, String s1) throws NoSuchJobException, JobInstanceAlreadyExistsException, JobParametersInvalidException {
        return delegate.start(s, s1);
    }

    @Override
    public Long restart(long l) throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException, NoSuchJobException, JobRestartException, JobParametersInvalidException {
        return delegate.restart(l);
    }

    @Override
    public Long startNextInstance(String s) throws NoSuchJobException, JobParametersNotFoundException, JobRestartException, JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException, UnexpectedJobExecutionException, JobParametersInvalidException {
        return delegate.startNextInstance(s);
    }

    @Override
    public boolean stop(long l) throws NoSuchJobExecutionException, JobExecutionNotRunningException {
        return delegate.stop(l);
    }

    @Override
    public String getSummary(long l) throws NoSuchJobExecutionException {
        return delegate.getSummary(l);
    }

    @Override
    public Map<Long, String> getStepExecutionSummaries(long l) throws NoSuchJobExecutionException {
        return delegate.getStepExecutionSummaries(l);
    }

    @Override
    public Set<String> getJobNames() {
        return delegate.getJobNames();
    }

    @Override
    public JobExecution abandon(long l) throws NoSuchJobExecutionException, JobExecutionAlreadyRunningException {
        return delegate.abandon(l);
    }

    public void setJobRegistry(ListableJobLocator jobRegistry) {
        this.jobRegistry = jobRegistry;
    }

    public void setJobExplorer(JobExplorer jobExplorer) {
        this.jobExplorer = jobExplorer;
    }

    public void setJobLauncher(JobLauncher jobLauncher) {
        this.jobLauncher = jobLauncher;
    }

    public void setJobRepository(JobRepository jobRepository) {
        this.jobRepository = jobRepository;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        delegate = new SimpleJobOperator();
        delegate.setJobRegistry(jobRegistry);
        delegate.setJobExplorer(jobExplorer);
        delegate.setJobLauncher(jobLauncher);
        delegate.setJobRepository(jobRepository);
    }
}
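
One point worth keeping in mind: the new method passes the parsed parameters to the incrementer, so the local values only survive if the incrementer copies its input into the result. The following JDK-only sketch models that parse-then-increment order. All names in it are hypothetical, plain maps stand in for JobParameters, and the run.id counter is just an assumed example of what an incrementer might add.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// JDK-only stand-in for the parse-then-increment flow; all names are hypothetical.
class IncrementerMergeSketch {

    // Parses "k1=v1,k2=v2" into an ordered map (no escaping, for brevity).
    static Map<String, String> parse(String parameters) {
        Map<String, String> result = new LinkedHashMap<>();
        if (parameters == null || parameters.trim().isEmpty()) {
            return result;
        }
        for (String pair : parameters.split(",")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                throw new IllegalArgumentException("Expected key=value but got: " + pair);
            }
            result.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
        }
        return result;
    }

    // Stand-in incrementer: copies the incoming parameters, then adds or bumps "run.id".
    static final UnaryOperator<Map<String, String>> RUN_ID_INCREMENTER = incoming -> {
        Map<String, String> next = new LinkedHashMap<>(incoming);
        long previous = Long.parseLong(next.getOrDefault("run.id", "0"));
        next.put("run.id", Long.toString(previous + 1));
        return next;
    };

    // Same order as in the operator: parse the caller's string, then apply the incrementer.
    static Map<String, String> nextParameters(String parameters, UnaryOperator<Map<String, String>> incrementer) {
        return incrementer.apply(parse(parameters));
    }

    public static void main(String[] args) {
        System.out.println(nextParameters("localJobParam=someLocalValue", RUN_ID_INCREMENTER));
    }
}
```

An incrementer that ignores its argument and builds a fresh result would silently drop localJobParam, so the incrementer implementation has to cooperate for this approach to work.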

Here is the configuration file:

<bean id="jobOperator" class="com.x.y.z.JobParametersJobOperator">
    <property name="jobExplorer">
        <bean class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">
            <property name="dataSource" ref="dataSource" />
        </bean>
    </property>
    <property name="jobRepository" ref="jobRepository" />
    <property name="jobRegistry" ref="jobRegistry" />
    <property name="jobLauncher" ref="jobLauncher" />
</bean>

It can be called like this:

public class Launcher {

    private static final Logger LOG = LoggerFactory.getLogger(Launcher.class);

    public static void main(String[] args) throws IOException, JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, NoSuchJobException, JobParametersNotFoundException, JobInstanceAlreadyExistsException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("application-context.xml");
        JobParametersJobOperator operator = context.getBean(JobParametersJobOperator.class);
        JobParameters jobParameters = new JobParametersBuilder().addString("schedule.date", args[0]).toJobParameters();
        for (String job: operator.getJobNames()) {
            operator.startNextInstance(job, PropertiesConverter.propertiesToString(jobParameters.toProperties()));
        }
    }
}

Maybe this is useful to someone.

answered 2016-09-06T08:05:18.837

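Why don't you use the JobLauncher interface instead of JobOperator?
JobLauncher.run() takes a JobParameters object rather than comma-separated values.
Also, JobOperator is a low-level interface, as stated in its javadoc: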

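Low level interface for inspecting and controlling jobs with access only to primitive and collection types. Suitable for a command-line client (e.g. that launches a new process for each operation), or a remote launcher like a JMX console.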

Addendum:
Use a JobParametersBuilder to build the parameters and then call JobParametersBuilder.toJobParameters(); that way you get rid of all the tedious code.

answered 2013-10-24T06:43:37.450