1

我正在使用 Spring Batch 编写多个报告。要求是我将获得带有 BranchId 和名称的记录。我需要为每个 branchId 创建一个文件,并将相应的数据连同一些页眉和页脚写入该文件。

例子:

Student A = new Student("A",1);
        Student B = new Student("B",2);
        Student C = new Student("C",1);
        Student D = new Student("D",4);

在这种情况下,它应该创建总共 3 个文件

 file1-->1.txt(with A,C)
    file2-->2.txt(with B)
    file3-->4.txt(with D)

.

我正在使用 ClassifierCompositeItemWriter 根据数据(在本例中为 id)创建/重用 FlatFileItemWriter 并能够成功创建文件。对于页眉和页脚 - 在编写器级别使用回调。生成的文件只有 HEADER 和 DATA。但不知何故 FOOTER 根本没有被执行。

看起来问题可能是文件在写入页脚之前就被关闭了,或者与使用 STEP SCOPE 有关。

有人可以帮我让 FOOTER 回调被调用吗?

这是代码。

   

     import java.util.Arrays;
        import java.util.HashMap;
        import java.util.Map;
        
        import org.springframework.batch.core.Job;
        import org.springframework.batch.core.JobParameters;
        import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
        import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
        import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
        import org.springframework.batch.core.configuration.annotation.StepScope;
        import org.springframework.batch.core.launch.JobLauncher;
        import org.springframework.batch.item.ExecutionContext;
        import org.springframework.batch.item.ItemReader;
        import org.springframework.batch.item.ItemWriter;
        import org.springframework.batch.item.file.FlatFileItemWriter;
        import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
        import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
        import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
        import org.springframework.batch.item.support.ListItemReader;
        import org.springframework.classify.Classifier;
        import org.springframework.context.ApplicationContext;
        import org.springframework.context.annotation.AnnotationConfigApplicationContext;
        import org.springframework.context.annotation.Bean;
        import org.springframework.context.annotation.Configuration;
        import org.springframework.core.io.FileSystemResource;
        
        /**
         * Spring Batch configuration that writes each {@code Student} to a per-branch flat file
         * named {@code Branch_Info_<branchId>.txt}, with a header and a footer callback configured
         * on every file writer.
         *
         * NOTE(review): the per-branch writers are created and opened by hand inside the classifier
         * and nothing in this class closes them or registers them as streams on the step. The footer
         * callback is presumably only invoked when a writer is closed, which would explain why the
         * generated files contain the header and data but no footer - confirm against the
         * FlatFileItemWriter documentation.
         */
        @Configuration
        @EnableBatchProcessing
        public class MyJob3 {
        
            /**
             * Boots an annotation-based application context from this class, looks up the
             * {@code JobLauncher} and the {@code Job}, and runs the job with empty parameters.
             */
            public static void main(String[] args) throws Exception {
                ApplicationContext context = new AnnotationConfigApplicationContext(MyJob3.class);
                JobLauncher jobLauncher = context.getBean(JobLauncher.class);
                Job job = context.getBean(Job.class);
                jobLauncher.run(job, new JobParameters());
            }
        
            /**
             * Defines the job "job" with a single chunk-oriented step "step" (chunk size 5) that
             * reads from {@link #itemReader()} and writes through the classifier-based composite writer.
             */
            @Bean
            public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
                return jobs.get("job").start(steps.get("step").<Student, Student>chunk(5)
                        .reader(itemReader())
                        .writer(getStudentItemWriter(itemWriterClassifier()))
                        .build())
                        .build();
            }
        
            /**
             * Supplies a fixed in-memory list of five students spread over branch ids 1, 2 and 4.
             */
            @Bean
            @StepScope
            public ItemReader<Student> itemReader() {
                Student A = new Student("A", 1);
                Student B = new Student("B", 2);
                Student C = new Student("C", 1);
                Student D = new Student("D", 4);
                Student E = new Student("E", 4); 
        
                return new ListItemReader<>(Arrays.asList(A,B,C,D,E));
            }
        
            // Cache of already-created writers keyed by branch id, so that each branch reuses one writer.
            Map<Integer, FlatFileItemWriter<Student>> map = new HashMap<>();
        
            /**
             * Builds the composite writer that routes each student to the writer chosen by the classifier.
             *
             * @param classifier maps a student to the item writer that should receive it
             * @return the composite writer configured with that classifier
             */
            @Bean
            @StepScope
            public ClassifierCompositeItemWriter<Student> getStudentItemWriter(Classifier<Student, ItemWriter<? super Student>> classifier) {
                ClassifierCompositeItemWriter<Student> compositeItemWriter = new ClassifierCompositeItemWriter<>();
                compositeItemWriter.setClassifier(classifier);
                return compositeItemWriter;
            }
        
            /**
             * Returns a classifier that lazily creates one {@code FlatFileItemWriter} per branch id
             * and caches it in {@link #map}; later students of the same branch get the cached writer.
             */
            @Bean
            @StepScope
            public Classifier<Student, ItemWriter<? super Student>> itemWriterClassifier() {
                return student -> {
        
                    System.out.println("Branch Id ::" + student.getBranchId() + " and Student Name" + student.getName());
                    // Reuse the writer that was already created for this branch, if any.
                    if (map.containsKey(student.getBranchId())) {
                        FlatFileItemWriter<Student> result = map.get(student.getBranchId());
                        System.out.println("Exising Writer object ::" + result);
                        return result;
                    }
                    
                    // First student of this branch: build a delimited writer emitting branchId and name.
                    String fileName ="Branch_Info_" + student.getBranchId() + ".txt";
                    BeanWrapperFieldExtractor<Student> fieldExtractor = new BeanWrapperFieldExtractor<>();
                    fieldExtractor.setNames(new String[] { "branchId", "name" });
                    DelimitedLineAggregator<Student> lineAggregator = new DelimitedLineAggregator<>();
                    lineAggregator.setFieldExtractor(fieldExtractor);
        
                    FlatFileItemWriter<Student> flatFileItemWriter = new FlatFileItemWriter<>();
                    flatFileItemWriter.setResource(new FileSystemResource(fileName));
                    flatFileItemWriter.setAppendAllowed(true);
                    flatFileItemWriter.setLineAggregator(lineAggregator);
                    // The messages below are printed while the callbacks are being registered,
                    // not at the moment the header/footer text is actually written to the file.
                    System.out.println("Writing header...");
                    flatFileItemWriter.setHeaderCallback(writer -> writer.write("Header"));
                    System.out.println("Writing Footer...");
                    flatFileItemWriter.setFooterCallback(writer -> writer.write("Footer"));
                    System.out.println("Writing done...");
                    // The writer is opened manually with a fresh execution context.
                    // NOTE(review): no matching close() call exists anywhere in this class.
                    flatFileItemWriter.open(new ExecutionContext());
        
                    map.put(student.getBranchId(), flatFileItemWriter);
        
                    System.out.println("New Writer object ::" + flatFileItemWriter);
                    return flatFileItemWriter;
                };
            }
        }

4

1 回答 1

1

就我而言,我没有固定数量的编写器(在您的情况下为 foo 和 boo),它们将是动态的,需要在 RUN 时创建。有关如何执行此操作并将它们注册为步骤的任何建议?

在这种情况下,您需要:

  1. 根据您的输入数据,使用例如 `select distinct(id) from table` 这样的查询或类似机制,预先计算可能的不同值(在您的情况下为 1、2 和 4)
  2. 动态创建 ItemWriter bean,并将它们在您的步骤中注册为流。

以下是基于您的用例的示例:给定不同组的学生列表,想法是根据他们的组将它们写入不同的文件中。这是一个预先计算不同组并在应用程序上下文中动态创建/注册项目编写器的 tasklet:

import java.io.IOException;
import java.io.Writer;
import java.util.List;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.file.FlatFileFooterCallback;
import org.springframework.batch.item.file.FlatFileHeaderCallback;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.transform.PassThroughLineAggregator;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.JdbcTemplate;

/**
 * Tasklet that queries the distinct student group ids and registers one
 * {@link FlatFileItemWriter} bean definition per group in the application context.
 * Each definition is registered under the bean name {@code group<id>Writer} and targets
 * the file {@code <id>.txt}, with a header and a footer callback configured.
 */
class DynamicWritersConfigurationTasklet implements Tasklet {

    private JdbcTemplate jdbcTemplate;
    private ConfigurableApplicationContext applicationContext;

    /**
     * @param jdbcTemplate       used to query the distinct group ids
     * @param applicationContext context whose bean factory receives the writer definitions
     */
    public DynamicWritersConfigurationTasklet(JdbcTemplate jdbcTemplate, ConfigurableApplicationContext applicationContext) {
        this.jdbcTemplate = jdbcTemplate;
        this.applicationContext = applicationContext;
    }

    /**
     * Registers a writer bean definition for every distinct group id found in the student table.
     *
     * @return {@link RepeatStatus#FINISHED} once all definitions have been registered
     */
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
        // The context's bean factory is cast to a registry so definitions can be added to it.
        BeanDefinitionRegistry definitionRegistry = (BeanDefinitionRegistry) applicationContext.getBeanFactory();

        List<Integer> groupIds = jdbcTemplate.queryForList("select distinct(groupId) from student", Integer.class);
        for (Integer groupId : groupIds) {
            String writerBeanName = "group" + groupId + "Writer";
            definitionRegistry.registerBeanDefinition(writerBeanName, writerDefinition(writerBeanName, groupId));
        }
        return RepeatStatus.FINISHED;
    }

    /** Builds the bean definition of the flat file writer dedicated to a single group. */
    private GenericBeanDefinition writerDefinition(String writerBeanName, Integer groupId) {
        MutablePropertyValues writerProperties = new MutablePropertyValues();
        writerProperties.addPropertyValue("name", writerBeanName);
        writerProperties.addPropertyValue("lineAggregator", new PassThroughLineAggregator<>());
        writerProperties.addPropertyValue("resource", new FileSystemResource(groupId + ".txt"));
        writerProperties.addPropertyValue("headerCallback", (FlatFileHeaderCallback) out -> out.write("header"));
        writerProperties.addPropertyValue("footerCallback", (FlatFileFooterCallback) out -> out.write("footer"));

        GenericBeanDefinition definition = new GenericBeanDefinition();
        definition.setBeanClassName(FlatFileItemWriter.class.getName());
        definition.setPropertyValues(writerProperties);
        return definition;
    }

}

一旦到位,第二步会在运行时从应用程序上下文加载这些项目编写器,并将它们注册为 ClassifierCompositeItemWriter 的委托:

/**
 * Builds the composite writer that routes every student to the flat file writer of its group.
 * The delegates are looked up in the application context by the bean name
 * {@code group<groupId>Writer}.
 *
 * @param applicationContext context holding the dynamically registered writer beans
 * @return a composite writer classifying students by group id
 * @throws IllegalStateException (at write time) when no writer bean exists for a student's group
 */
@Bean
@StepScope
public ClassifierCompositeItemWriter<Student> itemWriter(ConfigurableApplicationContext applicationContext) {
    // Dynamically get writers from the application context. The lookup is keyed by a class
    // literal, so the value type of the returned map is necessarily raw.
    Map<String, FlatFileItemWriter> writersByBeanName = applicationContext.getBeansOfType(FlatFileItemWriter.class);

    // A typed builder (instead of a raw one) lets the compiler check the classifier signature.
    return new ClassifierCompositeItemWriterBuilder<Student>()
            .classifier(student -> {
                String beanName = "group" + student.getGroupId() + "Writer";
                @SuppressWarnings("unchecked") // these writer beans are only ever used for Student items
                FlatFileItemWriter<Student> delegate = writersByBeanName.get(beanName);
                if (delegate == null) {
                    // Fail with a descriptive message rather than handing a null delegate to the composite.
                    throw new IllegalStateException("No item writer bean named '" + beanName
                            + "' is registered for group " + student.getGroupId());
                }
                return delegate;
            })
            .build();
}

/**
 * Defines the chunk-oriented step "readWriteStudents" (chunk size 2) and registers every
 * {@code FlatFileItemWriter} bean found in the application context as a stream of that step.
 *
 * @param stepBuilderFactory factory used to create the step builder
 * @param applicationContext context from which the writer beans are looked up
 * @param dataSource         data source handed to the item reader
 * @return the fully built step
 */
@Bean
@JobScope
public Step step2(StepBuilderFactory stepBuilderFactory, ConfigurableApplicationContext applicationContext, DataSource dataSource) {
    SimpleStepBuilder<Student, Student> stepBuilder = stepBuilderFactory.get("readWriteStudents")
            .<Student, Student>chunk(2)
            .reader(itemReader(dataSource))
            .writer(itemWriter(applicationContext));

    // Register the delegate writers as streams so the step drives their open/update/close lifecycle.
    Map<String, FlatFileItemWriter> writersByBeanName = applicationContext.getBeansOfType(FlatFileItemWriter.class);
    writersByBeanName.values().forEach(delegate -> stepBuilder.stream(delegate));

    return stepBuilder.build();
}

我在这里有一个完整的例子:SO67604628 的示例应用程序。请参阅本指南以了解如何签出单个文件夹(如果您不想克隆整个 repo)。该示例生成 3 个文件,其中学生按 groupId 分组。请注意如何正确生成页眉/页脚,因为委托编写者在步骤中注册为流。

于 2021-05-21T10:40:06.097 回答