10

我想知道如何确定ItemWriterSpring Batch 当前是处于块处理模式还是后备单项处理模式。首先,我没有找到如何实现这种回退机制的信息。

即使我还没有找到解决实际问题的方法,我也想与您分享我对回退机制的了解。

如果我错过任何内容,请随时添加带有其他信息的答案;-)

4

2 回答 2

16

跳过机制的实现可以在FaultTolerantChunkProcessorRetryTemplate中找到。

假设您配置了可跳过的异常但没有可重试的异常。当前块中有一个失败的项目导致异常。

现在,首先要写入整个块。在处理器的write()方法中,您可以看到,RetryTemplate调用了 a。它还获得对 aRetryCallback和 a 的两个引用RecoveryCallback

切换到RetryTemplate. 找到以下方法:

protected <T> T doExecute(RetryCallback<T> retryCallback, RecoveryCallback<T> recoveryCallback, RetryState state)

在那里你可以看到RetryTemplate只要它没有用尽(即在我们的配置中恰好一次),它就会重试。这种重试将由可重试异常引起。不可重试的异常会立即中止这里的重试机制。

在重试用尽或中止后,RecoveryCallback将调用:

e = handleRetryExhausted(recoveryCallback, context, state);

这就是单项处理模式现在开始的地方!

RecoveryCallback(在处理器的write()方法中定义!)将锁定输入块(inputs.setBusy(true))并运行它的scan()方法。在那里你可以看到,一个项目是从块中取出的:

List<O> items = Collections.singletonList(outputIterator.next());

如果该单个项目可以被ItemWriter正确处理,则该块将完成并且ChunkOrientedTasklet将运行另一个块(用于下一个单个项目)。这将导致对 的常规调用RetryCallback,但由于该块已被 锁定,因此将立即调用RecoveryTemplate该方法:scan()

if (!inputs.isBusy()) {
    // ...
}
else {
    scan(contribution, inputs, outputs, chunkMonitor);
}

因此将处理另一个单个项目并重复此过程,直到原始块被逐项处理:

if (outputs.isEmpty()) {
    inputs.setBusy(false);

而已。我希望你觉得这很有帮助。而且我更希望您可以通过搜索引擎轻松找到它,并且不要浪费太多时间,自己找到它。;-)

于 2013-05-15T14:12:40.757 回答
1

解决我原来的问题的一种可能方法(ItemWriter 想知道,它是在块模式还是单项模式)可能是以下替代方案之一:


  • 只有当传递的块大小为 1 时,才需要进行任何进一步的检查
  • 当传递的块是 ajava.util.Collections.SingletonList时,我们会很确定,因为FaultTolerantChunkProcessor它执行以下操作:

    列表项 = Collections.singletonList(outputIterator.next());

    不幸的是,这个类是私有的,所以我们不能用instanceOf.

  • 相反,如果块是一个ArrayList我们也可以确定,因为 Spring Batch 的Chunk类使用它:

    私有列表项 = new ArrayList();

  • 一个模糊的地方是从执行上下文中读取的缓冲项。但我希望那些也是 ArrayLists。

反正我还是觉得这个方法太模糊了。我宁愿让框架提供这些信息。


另一种方法是将 my 挂接ItemWriter到框架执行中。也许ItemWriteListener.onWriteError()是合适的。

更新:onWriteError()如果您处于单项模式并在ItemWriter. 我认为这是一个提交的错误:https ://jira.springsource.org/browse/BATCH-2027

所以这个替代方案退出了。


这是一个无需任何框架即可直接在编写器中执行相同操作的片段

    private int writeErrorCount = 0;

@Override
public void write(final List<? extends Long> items) throws Exception {
    try {
        writeWhatever(items);
    } catch (final Exception e) {
        if (this.writeErrorCount == 0) {
            this.writeErrorCount = items.size();
        } else {
            this.writeErrorCount--;
        }

        throw e;
    }
    this.writeErrorCount--;
}

public boolean isWriterInSingleItemMode() {
    return writeErrorCount != 0;
}

注意:人们应该在这里检查可跳过的异常,而不是Exception一般情况下。

于 2013-05-15T14:39:02.430 回答