1

我正在使用 jsr-352 规范的 JBeret 实现。

这是我的工作配置,简而言之:

<job id="expired-customer-cancellation" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
     version="1.0" jsl-name="job-parent" parent="job-parent">
    <step id="step1" next="step2">
        <chunk item-count="#{jobParameters['chunksize']}?:3">
            <reader ref="eccReader">
            </reader>
            <writer ref="eccWriter" />
        </chunk>
        <partition>
            <mapper ref="eccMapper">
                <properties>
                    <property name="threads" value="#{jobParameters['threads']}?:3"/>
                    <property name="records" value="#{jobParameters['records']}?:30"/>
                </properties>
            </mapper>
        </partition>
    </step>
    <step id="step2">
        <batchlet ref="eccMailBatchlet" />
    </step>
</job>

Itemwriter 类做这样的事情:

@Named
public class EccWriter extends AbstractItemWriter {

    @Inject
    Logger logger;

    @Inject
    JobContext jobContext;

    @Override
    public void writeItems(List<Object> list) throws Exception {
        @SuppressWarnings("unchecked")
        ArrayList<String> processed = Optional.ofNullable(jobContext.getTransientUserData()).map(ArrayList.class::cast).orElse(new ArrayList<String>());
        list.stream().map(UserLogin.class::cast).forEach(input -> {

            if (someConditions) {

                processed.add(input.getUserId());

            }
        });

        jobContext.setTransientUserData(processed); // update job transient data with processed list
    }
}

现在我希望在step2上调用 jobContext.getTransientUserData() 时实现更新的列表,而不是我得到的只是一个值。

此外,每个分区都有自己的 jobContext.transientUserData,所以它在分区开始时总是以空值开始。

我认为 jobContext 本身可能会因其名称而误导常见错误。

在整个工作中带来一些数据的自然方式是什么?

4

1 回答 1

1

这是当前 API 中的一个差距,我同意“线程本地”行为可能会令人惊讶。

您可以使用的一种技术是改为使用步骤持久用户数据。

例如从第 1 步开始:

StepExecution.setPersistentUserData(processed);

然后从第 2 步开始:

@Inject 
JobContext ctx;

List<StepExecution> stepExecs = BatchRuntime.getJobOperator().getStepExecutions(ctx.getInstanceId());

// ... not shown here, but sort through stepExecs and find the one from step1.
StepExecution step2Exec = ... ;  

Serializable userData = step2Exec.getPersistentUserData()

之前已经注意到这是一个需要改进的领域,并且应该考虑在 Jakarta Batch(规范的新家)的未来增强中。

于 2020-02-19T11:55:00.820 回答