
I have defined the following FlatFileItemWriter in a multi-threaded step.

public FlatFileItemWriter<School> writer() throws Exception {
    FlatFileItemWriter<School> flatFileWriter = new FlatFileItemWriter<School>();
    flatFileWriter.setResource(new FileSystemResource("C:\\u01\\SchoolDetails.txt"));
    flatFileWriter.setName("School-File-Writer");
    flatFileWriter.setAppendAllowed(true);
    flatFileWriter.setLineSeparator("\n");
    flatFileWriter.setHeaderCallback(writer -> writer.write(columnHeaders()));
    flatFileWriter.setLineAggregator(new DelimitedLineAggregator<School>() {
        {
            setDelimiter("^");
            setFieldExtractor((FieldExtractor<School>) schoolFieldExtractor());
        }
    });
    return flatFileWriter;
}
    
private BeanWrapperFieldExtractor<School> schoolFieldExtractor() {
    return new BeanWrapperFieldExtractor<School>() {
        {
            String[] columnValuesMapper = new String[] { 
                    "schoolName", "schoolAddress"
            };
            setNames(columnValuesMapper);
        }
    };
}

The ItemWriter generates the file on most days, but sometimes it throws the following error:

2022-02-14 22:07:46.652 [SimpleAsyncTaskExecutor-25] INFO  SpringBatchConfiguration:703 - Item Reader
2022-02-14 22:07:46.653 [SimpleAsyncTaskExecutor-25] INFO  PagingItemReader:80 - reading records 1 to 10
2022-02-14 22:07:46.657 [SimpleAsyncTaskExecutor-28] INFO  PagingItemReader:80 - reading records 11 to 20
2022-02-14 22:07:46.661 [SimpleAsyncTaskExecutor-27] INFO  PagingItemReader:80 - reading records 21 to 30
2022-02-14 22:07:46.665 [SimpleAsyncTaskExecutor-26] INFO  PagingItemReader:80 - reading records 31 to 40
2022-02-14 22:07:46.998 [SimpleAsyncTaskExecutor-25] INFO  o.s.batch.core.step.AbstractStep:272     - Step: [childStep:partition1] executed in 350ms
2022-02-14 22:07:47.005 [SimpleAsyncTaskExecutor-28] INFO  o.s.batch.core.step.AbstractStep:272     - Step: [childStep:partition3] executed in 357ms
2022-02-14 22:07:47.033 [SimpleAsyncTaskExecutor-27] ERROR o.s.batch.core.step.AbstractStep:237     - Encountered an error executing step childStep in School-Job-Process
org.springframework.batch.item.ItemStreamException: Output file was not created: [/u01/TotalRecordsFound-20220214.txt]
        at org.springframework.batch.item.util.FileUtils.setUpOutputFile(FileUtils.java:76)
        at org.springframework.batch.item.support.AbstractFileItemWriter$OutputState.initializeBufferedWriter(AbstractFileItemWriter.java:553)
        at org.springframework.batch.item.support.AbstractFileItemWriter$OutputState.access$000(AbstractFileItemWriter.java:385)
        at org.springframework.batch.item.support.AbstractFileItemWriter.doOpen(AbstractFileItemWriter.java:319)
        at org.springframework.batch.item.support.AbstractFileItemWriter.open(AbstractFileItemWriter.java:309)
        at org.springframework.batch.item.support.AbstractFileItemWriter$$FastClassBySpringCGLIB$$f2d35c3.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
        at org.springframework.batch.item.file.FlatFileItemWriter$$EnhancerBySpringCGLIB$$294bdfee.open(<generated>)
        at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:103)
        at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:311)
        at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:205)
        at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:138)
        at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:135)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:834)
    

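The error is intermittent. It occurs when two or more threads collide while creating the file and writing data to it. I can avoid it by making my FlatFileItemWriter the delegate of a SynchronizedItemStreamWriter. However, the Spring documentation suggests otherwise: it indicates that a FlatFileItemWriter used in a multi-threaded step does not need synchronized writes.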

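So I am not sure how to avoid these errors. According to the logs, the first two partitions completed successfully, which means the file was created (if it did not already exist) and data was written to it. How, then, can the third partition report that the file was not created when the first two partitions had already created it?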
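
To make the question concrete, here is a minimal sketch of one way such a failure could arise even though the file ends up existing. It assumes the writer's open step first checks whether the file exists and then creates it as two separate operations. That is an assumption suggested by the FileUtils.setUpOutputFile frame in the stack trace, not something confirmed here. The class and method names (CreateRaceDemo, needsCreation, create, simulateInterleaving) are hypothetical and use only the JDK; the two "partitions" are interleaved by hand on one thread so that the outcome is deterministic.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class CreateRaceDemo {

    // Hypothetical stand-in for the "is the file missing?" check made on open.
    static boolean needsCreation(File file) {
        return !file.exists();
    }

    // Hypothetical stand-in for the creation step: File.createNewFile()
    // returns false when the file already exists, which is reported as an error.
    static void create(File file) throws IOException {
        if (!file.createNewFile()) {
            throw new IllegalStateException(
                    "Output file was not created: [" + file.getAbsolutePath() + "]");
        }
    }

    // Replays one unlucky ordering: both partitions run the check before
    // either of them has created the file.
    static String simulateInterleaving(File file) throws IOException {
        boolean partitionANeeds = needsCreation(file); // file is missing
        boolean partitionBNeeds = needsCreation(file); // still missing
        if (partitionANeeds) {
            create(file); // partition A creates the file successfully
        }
        try {
            if (partitionBNeeds) {
                create(file); // partition B acts on a stale check and fails
            }
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        File dir = Files.createTempDirectory("race-demo").toFile();
        File out = new File(dir, "SchoolDetails.txt");
        System.out.println(simulateInterleaving(out));
        System.out.println("exists afterwards: " + out.exists());
    }
}
```

If the real code behaves like this sketch, the ordering would depend on thread timing, which would fit the failure being intermittent and the file being present after the run.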

Any help would be appreciated. Thanks in advance.
