4

因此,我在写入数据库的步骤中有 2 个分区。我想记录每个分区写入的行数,得到总和,打印到日志;

我正在考虑static在 Writer 中使用一个变量,并使用 Step Context/Job Context 将其放入afterStep()Step Listener。但是,当我尝试它时,我得到了null. 我能够在close()阅读器中获取这些值。

这是正确的方法吗?或者我应该使用 Partition Collector/Reducer/Analyzer?

我在 Websphere Liberty 中使用 java 批处理。我正在 Eclipse 中开发。

4

2 回答 2

5

我正在考虑在 Writer 中使用静态变量,并使用 Step Context/Job Context 将其放入 Step Listener 的 afterStep() 中。但是,当我尝试它时,我得到了空值。

此时ItemWriter可能已经被销毁,但我不确定。

这是正确的方法吗?

是的,它应该足够好。但是,您需要确保所有分区共享总行数,因为批处理运行时为每个分区维护一个StepContext克隆。你应该使用JobContext.

我认为使用PartitionCollectorPartitionAnalyzer也是一个不错的选择。接口PartitionCollector有一个方法collectPartitionData()来收集来自其分区的数据。收集后,批处理运行时会将这些数据传递给PartitionAnalyzer以分析数据。注意有

  • 每步 N 个 PartitionCollector(每个分区 1 个)
  • 每步 N StepContext(每个分区 1 个)
  • 每步 1 个 PartitionAnalyzer

写入的记录可以通过StepContexttransientUserData传递。由于StepContext是为自己的 step-partition 保留的,因此临时用户数据不会被其他分区覆盖。


这是实现:

MyItemWriter

@Inject
private StepContext stepContext;

@Override
public void writeItems(List<Object> items) throws Exception {
    // ...
    Object userData = stepContext.getTransientUserData();
    stepContext.setTransientUserData(partRowCount);
}

我的分区收集器

@Inject
private StepContext stepContext;

@Override
public Serializable collectPartitionData() throws Exception {

    // get transient user data
    Object userData = stepContext.getTransientUserData();
    int partRowCount = userData != null ? (int) userData : 0;
    return partRowCount;
}

我的分区分析器

private int rowCount = 0;

@Override
public void analyzeCollectorData(Serializable fromCollector) throws Exception {
    rowCount += (int) fromCollector;
    System.out.printf("%d rows processed (all partitions).%n", rowCount);
}

参考:JSR352 v1.0 Final Release.pdf

于 2016-06-19T14:51:04.733 回答
3

让我对已接受的答案提供一些替代方案并添加一些评论。

PartitionAnalyzer 变体 - 使用 analyzeStatus() 方法

另一种技术是使用analyzeStatus仅在每个整个分区结束时调用,并传递分区级退出状态。

public void analyzeStatus(BatchStatus batchStatus, String exitStatus) 

相比之下,上面的答案 usinganalyzeCollectorData在每个分区的每个块的末尾被调用。

例如

public class MyItemWriteListener extends AbstractItemWriteListener {

@Inject
StepContext stepCtx;

@Override
public void afterWrite(List<Object> items) throws Exception {
    // update 'newCount' based on items.size()
    stepCtx.setExitStatus(Integer.toString(newCount));
}

显然,这仅在您没有将退出状态用于其他目的时才有效。您可以设置任何工件的退出状态(尽管这种自由可能是另外一件需要跟踪的事情)。

评论

该 API 旨在促进跨 JVM 分派单个分区的实现,(例如,在 Liberty 中,您可以在此处看到这一点。)但是使用静态将您绑定到单个 JVM,因此不推荐使用这种方法。

另请注意,JobContextStepContext都是以我们在批处理中看到的类似“线程本地”的方式实现的。

于 2016-06-20T15:49:28.167 回答