我在多线程步骤中定义了以下 FlatFileItemWriter。
public FlatFileItemWriter<School> writer() throws Exception {
FlatFileItemWriter<School> flatFileWriter = new FlatFileItemWriter<School>();
flatFileWriter.setResource(new FileSystemResource("C:\\u01\\SchoolDetails.txt"));
flatFileWriter.setName("School-File-Writer");
flatFileWriter.setAppendAllowed(true);
flatFileWriter.setLineSeparator("\n");
flatFileWriter.setHeaderCallback(writer -> writer.write(columnHeaders()));
flatFileWriter.setLineAggregator(new DelimitedLineAggregator<School>() {
{
setDelimiter("^");
setFieldExtractor((FieldExtractor<School>) schoolFieldExtractor());
}
});
return flatFileWriter;
}
private BeanWrapperFieldExtractor<School> schoolFieldExtractor() {
return new BeanWrapperFieldExtractor<School>() {
{
String[] columnValuesMapper = new String[] {
"schoolName", "schoolAddress"
};
setNames(columnValuesMapper);
}
};
}
ItemWriter 在大多数日子里生成文件。但有时它会引发以下错误:
2022-02-14 22:07:46.652 [SimpleAsyncTaskExecutor-25] INFO SpringBatchConfiguration:703 - Item Reader
2022-02-14 22:07:46.653 [SimpleAsyncTaskExecutor-25] INFO PagingItemReader:80 - reading records 1 to 10
2022-02-14 22:07:46.657 [SimpleAsyncTaskExecutor-28] INFO PagingItemReader:80 - reading records 11 to 20
2022-02-14 22:07:46.661 [SimpleAsyncTaskExecutor-27] INFO PagingItemReader:80 - reading records 21 to 30
2022-02-14 22:07:46.665 [SimpleAsyncTaskExecutor-26] INFO PagingItemReader:80 - reading records 31 to 40
2022-02-14 22:07:46.998 [SimpleAsyncTaskExecutor-25] INFO o.s.batch.core.step.AbstractStep:272 - Step: [childStep:partition1] executed in 350ms
2022-02-14 22:07:47.005 [SimpleAsyncTaskExecutor-28] INFO o.s.batch.core.step.AbstractStep:272 - Step: [childStep:partition3] executed in 357ms
2022-02-14 22:07:47.033 [SimpleAsyncTaskExecutor-27] ERROR o.s.batch.core.step.AbstractStep:237 - Encountered an error executing step childStep in School-Job-Process
org.springframework.batch.item.ItemStreamException: Output file was not created: [/u01/TotalRecordsFound-20220214.txt]
at org.springframework.batch.item.util.FileUtils.setUpOutputFile(FileUtils.java:76)
at org.springframework.batch.item.support.AbstractFileItemWriter$OutputState.initializeBufferedWriter(AbstractFileItemWriter.java:553)
at org.springframework.batch.item.support.AbstractFileItemWriter$OutputState.access$000(AbstractFileItemWriter.java:385)
at org.springframework.batch.item.support.AbstractFileItemWriter.doOpen(AbstractFileItemWriter.java:319)
at org.springframework.batch.item.support.AbstractFileItemWriter.open(AbstractFileItemWriter.java:309)
at org.springframework.batch.item.support.AbstractFileItemWriter$$FastClassBySpringCGLIB$$f2d35c3.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
at org.springframework.batch.item.file.FlatFileItemWriter$$EnhancerBySpringCGLIB$$294bdfee.open(<generated>)
at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:103)
at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:311)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:205)
at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:138)
at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:135)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:834)
错误间歇性发生。当两个或更多线程发生冲突以创建数据并将数据写入文件时,会发生错误。我可以通过将我的 FlatFileItemWriter 委托给 SynchronizedItemStreamWriter 来避免它。但是 Spring 文档却提出了不同的建议。文档建议在多线程步骤中使用 FlatFileItemWriter 不需要同步写入。
因此,我不确定如何避免这些错误,并且根据日志,前两个分区成功完成运行,这意味着文件已成功创建并将数据写入其中(如果存在)。那么,当前两个分区已经创建文件时,第三个分区如何告诉该文件未创建。
任何帮助,将不胜感激。提前致谢。