我正在考虑在 Writer 中使用静态变量,并使用 Step Context/Job Context 将其放入 Step Listener 的 afterStep() 中。但是,当我尝试它时,我得到了空值。
此时ItemWriter可能已经被销毁,但我不确定。
这是正确的方法吗?
是的,它应该足够好。但是,您需要确保所有分区共享总行数,因为批处理运行时为每个分区维护一个StepContext克隆。你应该使用JobContext
.
我认为使用PartitionCollector和PartitionAnalyzer也是一个不错的选择。接口PartitionCollector有一个方法collectPartitionData()
来收集来自其分区的数据。收集后,批处理运行时会将这些数据传递给PartitionAnalyzer以分析数据。注意有
- 每步 N 个 PartitionCollector(每个分区 1 个)
- 每步 N StepContext(每个分区 1 个)
- 每步 1 个 PartitionAnalyzer
写入的记录可以通过StepContexttransientUserData
传递。由于StepContext是为自己的 step-partition 保留的,因此临时用户数据不会被其他分区覆盖。
这是实现:
MyItemWriter:
@Inject
private StepContext stepContext;
@Override
public void writeItems(List<Object> items) throws Exception {
// ...
Object userData = stepContext.getTransientUserData();
stepContext.setTransientUserData(partRowCount);
}
我的分区收集器
@Inject
private StepContext stepContext;
@Override
public Serializable collectPartitionData() throws Exception {
// get transient user data
Object userData = stepContext.getTransientUserData();
int partRowCount = userData != null ? (int) userData : 0;
return partRowCount;
}
我的分区分析器
private int rowCount = 0;
@Override
public void analyzeCollectorData(Serializable fromCollector) throws Exception {
rowCount += (int) fromCollector;
System.out.printf("%d rows processed (all partitions).%n", rowCount);
}
参考:JSR352 v1.0 Final Release.pdf