2

我正在读取一个多格式文件,使用FlatFileItemReader并将每一行映射到其相应的 bean 类型ItemProcessor并执行数据丰富。但是当我尝试使用 将这些记录写入文件FlatFileItemWriter时,我无法BeanWrapperFieldExtractor为不同的记录类型单独分配。我该如何解决这个问题?

输入文件格式

1#9999999#00001#2#RecordType1
2#00002#June#Statement#2020#9#RecordType2
3#7777777#RecordType3

预期的输出文件格式

1#9999999#00001#2#RecordType1#mobilenumber1
2#00002#June#Statement#2020#9#RecordType2#mobilenumber2
3#7777777#RecordType3#mobilenumber3

项目处理器

public class RecordTypeItemProcessor implements ItemProcessor<RecordType, RecordType> {

    @Override
    public RecordType process(RecordType recordType) throws Exception {
        if (recordType instanceof RecordType1) {
            RecordType1 recordType1 = (RecordType1) recordType;
            //enrichment logic
            return recordType1;
        } else if (recordType instanceof RecordType2) {
            RecordType2 recordType2 = (RecordType2) recordType;
            //enrichment logic
            return recordType2;
        }
        else
            return null;
    }
}```

4

2 回答 2

0

你可以使用 aClassifierCompositeItemProcessor和 a ClassifierCompositeItemWriter。这个想法是根据项目的类型使用 a 对项目进行分类Classifier并调用相应的项目处理器/编写器。instance of这种方法比在处理器/写入器中使用多个检查更清洁 IMO 。

有一些内置的Classifier实现(我认为对你来说SubclassClassifier是一个不错的选择),但如果需要,你可以创建自己的。

您可以在ClassifierCompositeItemProcessorTestsClassifierCompositeItemWriterTests中找到示例。

于 2020-12-11T08:21:44.203 回答
0

Mahmoud 建议的解决方案终于奏效了 :) 但是,有一些警告。ClassifierCompositeItemWriter旨在写入不同的文件。即,如果有 3 种不同的记录类型,将有 3 种不同的输出文件。但在我的用例中,我希望以相同的顺序在单个文件中输出。所以,我在每个 Writer bean 中提到了相同的文件名并添加了 `writer.setAppendAllowed(true); 但在此之后,不同的记录类型被合并在一起,排序顺序发生了变化。因此,我将块大小从 50 减少到 3。3 是记录类型的总数。但是,这将对性能产生影响。通过这样的一些调整,我终于得到了想要的输出。这是我的实现(仅供参考,需要更多清理)

Configuration.java (StepBuilderFactory)

...chunk(3).reader(reader).processor(processor()).writer(writer).stream(recordType1FlatFileItemWriter()).stream(recordType2FlatFileItemWriter()).build();

处理器步骤

@Bean
@StepScope
public ItemProcessor processor() {
            ClassifierCompositeItemProcessor<? extends RecordType, ? extends RecordType> processor = new ClassifierCompositeItemProcessor<>();
            ItemProcessor<RecordType1, RecordType1> recordType1Processor = new ItemProcessor<RecordType1, RecordType1>() {
                @Nullable
                @Override
                public RecordType1 process(RecordType1 recordType1) throws Exception {
                    //Processing logic
                    return recordType1;
                }
            };

            ItemProcessor<RecordType2, RecordType2> recordType2Processor = new ItemProcessor<RecordType2, RecordType2>() {
                @Nullable
                @Override
                public RecordType2 process(RecordType2 recordType2) throws Exception {
                    //Processing logic
                    return recordType2;
                }
            };

        SubclassClassifier classifier = new SubclassClassifier();
        Map typeMap = new HashMap();
        typeMap.put(RecordType1.class, recordType1Processor);
        typeMap.put(RecordType2.class, recordType2Processor);
        classifier.setTypeMap(typeMap);
        processor.setClassifier(classifier);
        return processor;
    }

作家步骤

    @Bean
    @StepScope
    public ClassifierCompositeItemWriter writer(@Value("#{stepExecutionContext[fileName]}") Resource file) throws Exception {
        ClassifierCompositeItemWriter<String> writer = new ClassifierCompositeItemWriter<>();
        SubclassClassifier classifier = new SubclassClassifier<>();
        Map typeMap = new HashMap<>();
        typeMap.put(RecordType1.class, recordType1FlatFileItemWriter());
        typeMap.put(RecordType2.class, recordType2FlatFileItemWriter());
        typeMap.put(RecordType3.class, recordType3FlatFileItemWriter());
        classifier.setTypeMap(typeMap);
        writer.setClassifier(classifier);
        return writer;
    }

作家

    @Bean
    public FlatFileItemWriter<RecordType1> recordType1FlatFileItemWriter() throws Exception{
        FlatFileItemWriter<RecordType1> writer = new FlatFileItemWriter<>();
        writer.setResource( new FileSystemResource(outputFolder + "test.txt"));
        writer.setAppendAllowed(true);
        writer.setLineAggregator(new DelimitedLineAggregator<RecordType1>() {{
            setDelimiter("#");
            setFieldExtractor(new BeanWrapperFieldExtractor<RecordType1>() {
                {
                    setNames(new String[] { "RecordType", "ID1", "ID2", "ID3");
                }
            });
        }});
        return  writer;
    }

    @Bean
    public FlatFileItemWriter<RecordType2> recordType2FlatFileItemWriter() throws Exception{
        FlatFileItemWriter<RecordType2> writer = new FlatFileItemWriter<>();
        writer.setResource( new FileSystemResource(outputFolder + "test.txt"));
        writer.setAppendAllowed(true);
        writer.setLineAggregator(new DelimitedLineAggregator<RecordType2>() {{
            setDelimiter("#");
            setFieldExtractor(new BeanWrapperFieldExtractor<RecordType2>() {
                {
                    setNames(new String[] { "RecordType", "ID9", "ID8",);
                }
            });
        }});
        return  writer;
    }
于 2020-12-14T03:14:13.823 回答