2

我熟悉如何在 Spring Batch 作业的步骤之间传递数据。但是,当您的工作由许多较小的工作组成时会发生什么?在下面的示例中,我想在第一个作业 siNotificationJob 结束时在 JobExecutionContext 中设置一些数据。然后,可以在下一个作业 ciNotificationJob 中从 StepExecutionContext 的 JobExecutionContext 读取该数据。我需要以某种方式推广这些数据吗?我似乎无法在用于配置作业参数的步骤“ciNotificationJob”中定义的作业参数提取器中看到结果。

想法?

安德鲁

    <job id="notificationJob" xmlns="http://www.springframework.org/schema/batch">

    <batch:step id="pn_step_0" next="pn-step-1">
        <batch:job ref="siNotificationJob" job-launcher="jobLauncher" 
            job-parameters-extractor="jobParamsExtractor"/>
    </batch:step>       
    <batch:step id="pn-step-1" next="pn-step-2">
        <batch:job ref="ciNotificationJob" job-launcher="jobLauncher" 
            job-parameters-extractor="jobParamsExtractor"/>
    </batch:step>           
</job>
4

2 回答 2

2

我能够解决这个问题。我将通过示例向您展示我是如何解决它的。这很复杂,但我认为最终结果相当容易理解。

我有一项名为“notificationJob”的整体工作。它有三个步骤,调用 3 个不同的作业(不是步骤)。这些作业中的每一个都可以独立运行,也可以从顶级“notificationJob”中调用。此外,每个子作业都有许多步骤。我不会在这里展示所有这些步骤,而只是想强调这些都是完整的工作本身,还有更多的步骤。

    <job id="notificationJob" xmlns="http://www.springframework.org/schema/batch">
    <batch:listeners>
        <batch:listener ref="pn_job-parent-listener" />
    </batch:listeners>
    <batch:step id="pn_step-0" next="pn-step-1">
        <batch:job ref="siNotificationJob" job-launcher="jobLauncher" 
            job-parameters-extractor="futureSiParamsExtractor"/>
    </batch:step>
    <batch:step id="pn-step-1" next="pn-step-2">
        <batch:job ref="ciNotificationJob" job-launcher="jobLauncher" 
            job-parameters-extractor="futureCiParamsExtractor"/>
    </batch:step>
    <batch:step id="pn-step-2">
        <batch:job ref="combineResultsJob" job-launcher="jobLauncher" 
            job-parameters-extractor="jobParamsExtractor"/>
    </batch:step>           
</job>

关键是能够从一个工作中提取结果并在下一个工作中读取它们。现在,您可以通过多种方式做到这一点。一种方法是将一个作业的结果输出到数据库或文本文件中,然后从该文件/表中读取下一个作业。因为我没有处理那么多数据,所以我在内存中传递信息。因此,您会注意到作业参数提取器。您可以依赖参数提取器的内置实现,也可以实现自己的。我实际上两者都用。他们所做的只是从 StepExecution 中提取值,然后我们需要将它们提升/移动到下一个子作业。

    <bean id="jobParamsExtractor" class="org.springframework.batch.core.step.job.DefaultJobParametersExtractor">
    <property name="keys">
        <list>
            <value>OUTPUT</value>
        </list>
    </property>
</bean>

<bean id="futureSiParamsExtractor" class="jobs.SlideDatesParamExtractor">
    <property name="mode" value="FORWARD" />
    <property name="addedParams">
        <map><entry>
                <key><value>outputName</value></key>
                <value>FUTURE_SI_JOB_RESULTS</value>
            </entry></map>
    </property>
</bean> 

<bean id="futureCiParamsExtractor" class="jobs.SlideDatesParamExtractor">
    <property name="mode" value="FORWARD" />
    <property name="addedParams">
        <map><entry>
                <key><value>outputName</value></key>
                <value>FUTURE_CI_JOB_RESULTS</value>
            </entry></map>
    </property>
</bean>

最后,您会注意到有一个父作业侦听器。这是将状态从一项工作转移并使其可用于下一项工作的魔法。这是我对执行此操作的类的实现。

    <bean id="pn_job-state-listener" class="jobs.JobStateListener">
    <property name="parentJobListener" ref="pn_job-parent-listener" />
</bean> 

<bean id="pn_job-parent-listener" class="cjobs.ParentJobListener">
</bean>

package jobs.permnotification;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

public class ParentJobListener implements JobExecutionListener
{

    private JobExecution parentExecution;

    @Override
    public void beforeJob(JobExecution jobExecution)
    {
        this.parentExecution = jobExecution;
    }

    @Override
    public void afterJob(JobExecution jobExecution)
    {
        // TODO Auto-generated method stub

    }

    public void setParentExecution(JobExecution parentExecution)
    {
        this.parentExecution = parentExecution;
    }

    public JobExecution getParentExecution()
    {
    return parentExecution;
    }

}


package jobs.permnotification;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

public class JobStateListener implements JobExecutionListener
{
    private ParentJobListener parentJobListener;


    @Override
    public void beforeJob(JobExecution jobExecution)
    {
        if(parentJobListener == null || parentJobListener.getParentExecution() == null) return;
        passStateFromParentToJob(StepKey.FUTURE_SI_JOB_RESULTS.toString(), jobExecution);
        passStateFromParentToJob(StepKey.FUTURE_CI_JOB_RESULTS.toString(), jobExecution);
        passStateFromParentToJob(StepKey.OUTPUT.toString(), jobExecution);
    }

    @Override
    public void afterJob(JobExecution jobExecution)
    {
        if(parentJobListener == null || parentJobListener.getParentExecution() == null) return;
        //take state from child step and move it into the parent execution context
        passStateFromJobToParent(StepKey.FUTURE_SI_JOB_RESULTS.toString(), jobExecution);
        passStateFromJobToParent(StepKey.FUTURE_CI_JOB_RESULTS.toString(), jobExecution);
        passStateFromJobToParent(StepKey.OUTPUT.toString(), jobExecution);
    }

    private void passStateFromJobToParent(String key, JobExecution jobExecution)
    {
        Object obj = jobExecution.getExecutionContext().get(key);
        if(obj != null)
            parentJobListener.getParentExecution().getExecutionContext().put(key, obj);
    }

    private void passStateFromParentToJob(String key, JobExecution jobExecution)
    {
        Object obj = parentJobListener.getParentExecution().getExecutionContext().get(key);
        if(obj != null)
            jobExecution.getExecutionContext().put(key, obj);
    }

    public void setParentJobListener(ParentJobListener parentJobListener)
    {
        this.parentJobListener = parentJobListener;
    }

    public ParentJobListener getParentJobListener()
    {
        return parentJobListener;
    }

}
于 2014-05-09T19:00:25.390 回答
0

这有点像黑客......建议您改用弹簧集成......但看看这是否适用于您的情况。

如果您设置了 spring 批处理元数据表,如果您查询表以获取最新的作业运行,您可能可以获得在每个作业中生成的数据。您在作业执行上下文中的所有数据都被存储并且可以被查询。

春季批处理元表

于 2012-10-15T19:53:48.283 回答