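I have a Spring Batch job that reads a CSV file, processes it, and writes the result to another CSV file. I want to write the results to two different flat files depending on the outcome of processing: records that are processed successfully should go to one file, and failed records to another.
I have seen several examples that use "CompositeItemWriter", but no exact example with multiple "FlatFileItemWriter" instances.
Could anyone share an example for my use case?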
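CompositeItemWriter is meant for cases where every valid item should be written to two or more outputs. In your case, a SkipListener is a better fit: it can be used to write the invalid items to a different file. Here is a simple example: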
import java.util.Arrays;
import java.util.Collections;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.SkipListenerSupport;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.item.file.transform.PassThroughLineAggregator;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;

@Configuration
@EnableBatchProcessing
public class SO65996526 {

    @Bean
    public ItemReader<Integer> itemReader() {
        return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
    }

    // Rejects items 3 and 7 so that they are skipped by the fault-tolerant step
    @Bean
    public ItemProcessor<Integer, Integer> itemProcessor() {
        return item -> {
            if (item.equals(3)) {
                throw new IllegalArgumentException("no 3 here!");
            }
            if (item.equals(7)) {
                throw new IllegalArgumentException("no 7 here!");
            }
            return item;
        };
    }

    // Main writer: receives the items that were processed successfully
    @Bean
    public ItemWriter<Integer> itemWriter() {
        return new FlatFileItemWriterBuilder<Integer>()
                .name("itemWriter")
                .resource(new FileSystemResource("output.txt"))
                .lineAggregator(new PassThroughLineAggregator<>())
                .build();
    }

    // Second writer: receives the items that were skipped during processing
    @Bean
    public FlatFileItemWriter<Integer> skippedItemWriter() {
        return new FlatFileItemWriterBuilder<Integer>()
                .name("skippedItemWriter")
                .resource(new FileSystemResource("skipped.txt"))
                .lineAggregator(new PassThroughLineAggregator<>())
                .build();
    }

    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .<Integer, Integer>chunk(5)
                        .reader(itemReader())
                        .processor(itemProcessor())
                        .writer(itemWriter())
                        .faultTolerant()
                        .skip(IllegalArgumentException.class)
                        .skipLimit(3)
                        .listener(new SkippedItemsListener(skippedItemWriter()))
                        .stream(skippedItemWriter()) // ensure open/close are called properly
                        .build())
                .build();
    }

    static class SkippedItemsListener extends SkipListenerSupport<Integer, Integer> {

        private final FlatFileItemWriter<Integer> skippedItemsWriter;

        public SkippedItemsListener(FlatFileItemWriter<Integer> skippedItemsWriter) {
            this.skippedItemsWriter = skippedItemsWriter;
        }

        // Called for each item whose processing threw a skippable exception
        @Override
        public void onSkipInProcess(Integer item, Throwable t) {
            try {
                skippedItemsWriter.write(Collections.singletonList(item));
            } catch (Exception e) {
                // Pass the exception to the logger so the cause is not lost
                Logger.getLogger(this.getClass().getName())
                        .log(Level.SEVERE, "Unable to write skipped item " + item, e);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(SO65996526.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }
}
This example skips items 3 and 7 and generates two files: output.txt, which contains the valid items, and skipped.txt, which contains the invalid ones.
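To make the expected file contents concrete, here is a plain-Java sketch of the same routing rule, written without Spring Batch so it can be run on its own. The names SkipRoutingSketch, Result, route, and rejectThreeAndSeven are hypothetical and exist only for this illustration. It is a simplification: real Spring Batch steps work in chunks and transactions, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

public class SkipRoutingSketch {

    /** Holds the two outputs, standing in for output.txt and skipped.txt. */
    static final class Result {
        final List<Integer> written = new ArrayList<>();
        final List<Integer> skipped = new ArrayList<>();
    }

    /**
     * Hypothetical helper: applies the processor to each item and collects
     * successes in "written" and IllegalArgumentException failures in "skipped".
     * This only mimics the effect of skip(IllegalArgumentException.class)
     * combined with a skip listener; it is not how Spring Batch is implemented.
     */
    static Result route(List<Integer> items, UnaryOperator<Integer> processor) {
        Result result = new Result();
        for (Integer item : items) {
            try {
                result.written.add(processor.apply(item));
            } catch (IllegalArgumentException e) {
                result.skipped.add(item);
            }
        }
        return result;
    }

    /** Same rule as the itemProcessor in the answer: 3 and 7 are rejected. */
    static Integer rejectThreeAndSeven(Integer item) {
        if (item.equals(3) || item.equals(7)) {
            throw new IllegalArgumentException("no " + item + " here!");
        }
        return item;
    }

    public static void main(String[] args) {
        Result r = route(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
                SkipRoutingSketch::rejectThreeAndSeven);
        System.out.println("output.txt  -> " + r.written); // [1, 2, 4, 5, 6, 8, 9, 10]
        System.out.println("skipped.txt -> " + r.skipped); // [3, 7]
    }
}
```

In the actual job, the two lists correspond to what itemWriter and skippedItemWriter receive, one item per line in each file.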