I am using Spring Batch to read a fixed-length input file. I have already implemented the Job, Step, Processor, and so on. Sample code is below.
@Configuration
public class BatchConfig {
private JobBuilderFactory jobBuilderFactory;
private StepBuilderFactory stepBuilderFactory;
@Value("${inputFile}")
private Resource resource;
@Autowired
public BatchConfig(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
}
@Bean
public Job job() {
return this.jobBuilderFactory.get("JOB-Load")
.start(fileReadingStep())
.build();
}
@Bean
public Step fileReadingStep() {
return stepBuilderFactory.get("File-Read-Step1")
.<Employee,EmpOutput>chunk(1000)
.reader(itemReader())
.processor(new CustomFileProcesser())
.writer(new CustomFileWriter())
.faultTolerant()
.skipPolicy(skipPolicy())
.build();
}
@Bean
public FlatFileItemReader<Employee> itemReader() {
FlatFileItemReader<Employee> flatFileItemReader = new FlatFileItemReader<Employee>();
flatFileItemReader.setResource(resource);
flatFileItemReader.setName("File-Reader");
flatFileItemReader.setLineMapper(LineMapper());
return flatFileItemReader;
}
@Bean
public LineMapper<Employee> LineMapper() {
DefaultLineMapper<Employee> defaultLineMapper = new DefaultLineMapper<Employee>();
FixedLengthTokenizer fixedLengthTokenizer = new FixedLengthTokenizer();
fixedLengthTokenizer.setNames(new String[] { "employeeId", "employeeName", "employeeSalary" });
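// Ranges are 1-based and inclusive. The second range ends at 19 so that it does not
// overlap the third (assumes employeeSalary starts at column 20).
fixedLengthTokenizer.setColumns(new Range[] { new Range(1, 9), new Range(10, 19), new Range(20, 30)});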
fixedLengthTokenizer.setStrict(false);
defaultLineMapper.setLineTokenizer(fixedLengthTokenizer);
defaultLineMapper.setFieldSetMapper(new CustomFieldSetMapper());
return defaultLineMapper;
}
@Bean
public JobSkipPolicy skipPolicy() {
return new JobSkipPolicy();
}
}
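For processing, I have added some sample code showing what I need, but once I read the extra file here with a BufferedReader, the job takes much longer to complete.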
@Component
public class CustomFileProcesser implements ItemProcessor<Employee, EmpOutput> {
@Override
public EmpOutput process(Employee item) throws Exception {
EmpOutput emp = new EmpOutput();
emp.setEmployeeSalary(checkSal(item.getEmployeeSalary()));
return emp;
}
public String checkSal(String sal) {
// Need to read another file,
// run some validation against it,
// and then return the final result.
File f1 = new File("C:\\Users\\John\\New\\salary.txt");
// try-with-resources closes the reader; note that the file is still re-read for every item.
try (BufferedReader br = new BufferedReader(new FileReader(f1))) {
String s = br.readLine();
while (s != null) {
String value = s.substring(5, 7);
if (value.equals(sal))
sal = value;
else
sal = "5000";
s = br.readLine();
}
} catch (Exception e) {
e.printStackTrace();
}
return sal;
}
// Other fields need to be checked by reading other, different files.
// Each of these files contains more than 30k records.
// All of them are fixed-length files.
// I need to get the field by giving its index.
}
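While processing one or more fields, I need to check them against another file, which I will read from the file system or from the cloud.
To process 5 fields, I have to read 5 different files, check the field details in those files, and then produce a result that is passed on for further processing.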
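One direction I am considering, as a rough sketch only: load each lookup file into memory once, before the step runs, instead of re-reading it for every item. The class name `FixedLengthLookup` and its methods are hypothetical and not part of Spring Batch. The sketch assumes each lookup file fits in memory (30k records should) and that the intended rule is "keep the value if it appears in the file, otherwise use a default".

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper: reads one fixed-length lookup file a single time. */
class FixedLengthLookup {

    // All values found at the given column positions, kept for O(1) lookups.
    private final Set<String> values = new HashSet<>();

    /**
     * @param file       lookup file to load (assumed to be UTF-8 text)
     * @param beginIndex 0-based start of the field, as in String.substring
     * @param endIndex   0-based exclusive end of the field
     */
    FixedLengthLookup(Path file, int beginIndex, int endIndex) throws IOException {
        try (BufferedReader br = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            String line;
            while ((line = br.readLine()) != null) {
                // Skip lines that are too short to contain the field.
                if (line.length() >= endIndex) {
                    values.add(line.substring(beginIndex, endIndex));
                }
            }
        }
    }

    /** Returns the value itself if the file contained it, otherwise the fallback. */
    String valueOrDefault(String value, String fallback) {
        return values.contains(value) ? value : fallback;
    }
}
```

With something like this, the processor could hold one `FixedLengthLookup` per lookup file (five in my case), build them once when it is constructed, and have `checkSal` call `valueOrDefault(sal, "5000")`, so each item costs a hash lookup rather than a full file scan.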