0

我可以在春季批处理中将 FlatfileItemReader 与 Taskexecutor 一起使用吗?

我已经用 ThreadPoolTask​​Executor 实现了 FlatFileItemReader。当我在 ItemProcessor 中打印记录时,我没有得到一致的结果,即并非所有记录都被打印,有时其中一条记录被打印多次。它让我知道 FlatFileItemReader 不是线程安全的,而且它在 spring 文档中也这么说,但我看到一些博客说可以将 FlatFileItemReader 与 Task Executor 一起使用。

所以我的问题是:无论如何都可以将 FlatfileItemReader 与 Task Executor 一起使用吗?

    @Bean
    @StepScope
    public FlatFileItemReader<DataLifeCycleEvent> csvFileReader(
            @Value("#{stepExecution}") StepExecution stepExecution) {

        Resource inputResource;
        FlatFileItemReader<DataLifeCycleEvent> itemReader = new FlatFileItemReader<>();

        itemReader.setLineMapper(new OnboardingLineMapper(stepExecution));
        itemReader.setLinesToSkip(1);
        itemReader.setSaveState(false);
        itemReader.setSkippedLinesCallback(new OnboardingHeaderMapper(stepExecution));
        String inputResourceString = stepExecution.getJobParameters().getString("inputResource");
        inputResource = new FileSystemResource(inputFileLocation + ApplicationConstant.SLASH + inputResourceString);
        itemReader.setResource(inputResource);
        stepExecution.getJobExecution().getExecutionContext().putInt(ApplicationConstant.ERROR_COUNT, 0);
        return itemReader;
    }
4

2 回答 2

1

FlatFileItemReaderextendsAbstractItemCountingItemStreamItemReader不是线程安全的。所以如果你在多线程步骤中使用它,你需要同步它。

你可以把它包在一个SynchronizedItemStreamReader. 这是一个简单的例子:

@Bean
public SynchronizedItemStreamReader<DataLifeCycleEvent> itemReader() {
    FlatFileItemReader<DataLifeCycleEvent> itemReader = ... // your item reader

    SynchronizedItemStreamReader<DataLifeCycleEvent> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
    synchronizedItemStreamReader.setDelegate(itemReader);
    return synchronizedItemStreamReader;
}
于 2020-05-26T16:08:54.257 回答
0

这个方法给出了这个异常:-java.lang.ClassCastException: com.sun.proxy.$Proxy344 cannot be cast to org.springframework.batch.item.support.SynchronizedItemStreamReader

@Bean
public SynchronizedItemStreamReader<DataLifeCycleEvent> itemReader() {
    FlatFileItemReader<DataLifeCycleEvent> itemReader = ... // your item reader

    SynchronizedItemStreamReader<DataLifeCycleEvent> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
    synchronizedItemStreamReader.setDelegate(itemReader);
    return synchronizedItemStreamReader;
}
于 2021-08-17T15:44:24.450 回答