我正在开发 Spring Boot + Spring Batch 代码:从 Oracle DB 中读取数据,并将所有数据加载到 MongoDB(NoSQL DB)中。
MongoDB 按照 Mongo 关系/建模的标准做法,采用非规范化(denormalized)方式建模。
我有 TableA 和 TableB 两张表,以及它们之间的连接表 TableAB(第三张表)。当我通过 `JdbcCursorItemReader<TableA>` 读取 TableA 时,需要针对每条 TableA 记录的主键(PK Id)查询 TableB,获取所有 SubDivision,并将 SubDivision 数据设置到 TableA 的模型中。TableA 模型包含一个 SubDivision 列表(`List<SubDivision> subDivisions`)。
我能想到的唯一方法,是在 TableAProcessor 中执行查询,并将数据设置到 TableA 模型中。这种方式易于实现,但问题是:如果有 100K 条 TableA 记录,TableAProcessor 就会对数据库发起 100K 次调用。
如何使用 Tasklet 或其他任何方式,把 SubDivision 数据设置到 TableA 模型中?
如何避免在处理器(ItemProcessor)中发起这么多次查询?
由于某些限制,我无法用单个查询完成,因此需要额外向数据库发起一次查询来获取 SubDivision 数据。
/**
 * Enriches each {@link TableA} read from Oracle with its {@link SubDivision} rows before the
 * item is handed to the writer.
 *
 * <p>NOTE(review): {@link #process} is invoked once per item, so this issues one
 * {@code tableADao.getSubDivision(..)} query per TableA row (100K rows means 100K queries).
 * To cut round-trips, consider doing the lookup once per chunk instead, e.g. in the writer or an
 * {@code ItemWriteListener#beforeWrite}. That would collect the chunk's PK ids, fetch their
 * sub-divisions with one query, group them by PK, then attach them to the items.
 */
@Slf4j
public class TableAProcessor implements ItemProcessor<TableA, TableA> {

    @Autowired
    private TableADao tableADao;

    /**
     * Loads the sub-divisions for {@code tableA}'s primary key and attaches them to it.
     *
     * @param tableA item read from the source table; mutated in place
     * @return the same instance, with its sub-division list set
     * @throws Exception any failure from the DAO lookup propagates to Spring Batch
     */
    @Override
    public TableA process(TableA tableA) throws Exception {
        // Parameterized form: tableA.toString() is only evaluated when debug logging is enabled,
        // instead of being concatenated eagerly for every item.
        log.debug("TableA DETAILS : {}", tableA);
        List<SubDivision> subDivisions = tableADao.getSubDivision(tableA.getPKId());
        tableA.setSubDivisions(subDivisions);
        return tableA;
    }
}
模型
/**
 * Document model for a TableA row read from Oracle and written to MongoDB, with its
 * sub-divisions embedded in the same document (denormalized).
 */
public class TableA {
// Oracle-side id. NOTE(review): @Transient presumably (Spring Data annotation) keeps this field
// out of the Mongo document — confirm. The name "Id" breaks lowerCamelCase. TableAProcessor calls
// getPKId(), whose declaration is not visible here — verify which field it returns.
@Transient
private Integer Id;
// Mapped into the document via @Field; presumably the Mongo-side identifier — TODO confirm.
@Field
private String mongoId;
........
.......
// Embedded sub-divisions; filled in by TableAProcessor through setSubDivisions(..).
@Field
private List<SubDivision> subDivisions;
}
TableABatchConfig.java
/**
 * Declares the reader, processor and writer beans used by the TableA migration step
 * (Oracle source, MongoDB target).
 */
@Configuration
public class TableABatchConfig {

    /** Full scan of the Oracle source table. */
    private static final String TABLE_A_SELECT_SQL = "SELECT * FROM TABLEA";

    @Autowired
    @Qualifier(value = "oracleDataSource")
    private DataSource dataSource;

    /**
     * Step-scoped cursor reader that streams TableA rows from Oracle and maps each row with
     * {@code TableARowMapper}.
     *
     * <p>{@code destroyMethod = ""} turns off Spring's inferred destroy-method call on this bean;
     * presumably closing is left to the step, which drives the reader as an ItemStream.
     *
     * @return a fully configured, validated reader
     * @throws Exception if {@code afterPropertiesSet()} rejects the configuration
     */
    @Bean(destroyMethod = "")
    @StepScope
    public JdbcCursorItemReader<TableA> TableAReader() throws Exception {
        JdbcCursorItemReader<TableA> cursorReader = new JdbcCursorItemReader<>();
        cursorReader.setSql(TABLE_A_SELECT_SQL);
        cursorReader.setDataSource(this.dataSource);
        cursorReader.setRowMapper(new TableARowMapper());
        // Validate eagerly so a misconfigured reader fails at bean creation, not mid-step.
        cursorReader.afterPropertiesSet();
        return cursorReader;
    }

    /** Processor that attaches sub-division data to each TableA item. */
    @Bean
    public ItemProcessor<TableA, TableA> TableAProcessor() {
        TableAProcessor processor = new TableAProcessor();
        return processor;
    }

    /** Writer that persists processed TableA items. */
    @Bean
    public TableAWriter TableAWriter() {
        TableAWriter writer = new TableAWriter();
        return writer;
    }
}
TableAJob.java
/**
 * Defines the {@code readTableAJob} batch job: one chunk-oriented step wiring the TableA reader,
 * processor and writer together, with a step-execution listener attached.
 */
@Configuration
@PropertySource("classpath:application.properties")
public class TableAJob {

    /** Commit interval for the step; injected as text and parsed when the step bean is built. */
    @Value("${spring.chunk.size}")
    private String chunkSize;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JdbcCursorItemReader<TableA> TableAReader;

    @Autowired
    private ItemProcessor<TableA, TableA> TableAProcessor;

    @Autowired
    private TableAWriter TableAWriter;

    /** Listener registered on {@link #TableAStepOne()}. */
    @Bean
    public TableAStepExecuListner TableAStepExecuListner() {
        return new TableAStepExecuListner();
    }

    /**
     * Primary job bean. A {@code RunIdIncrementer} gives every launch fresh job parameters,
     * so the job can be re-run.
     */
    @Bean("readTableAJob")
    @Primary
    public Job readTableAJob() {
        return jobBuilderFactory
                .get("readTableAJob")
                .incrementer(new RunIdIncrementer())
                .start(TableAStepOne())
                .build();
    }

    /**
     * Chunk step: items are read, processed and written in groups of {@code spring.chunk.size}.
     *
     * @throws NumberFormatException if {@code spring.chunk.size} is not an integer
     */
    @Bean
    public Step TableAStepOne() {
        return stepBuilderFactory
                .get("TableAStepOne")
                .<TableA, TableA>chunk(Integer.parseInt(chunkSize))
                .reader(TableAReader)
                .processor(TableAProcessor)
                .writer(TableAWriter)
                .listener(TableAStepExecuListner())
                .build();
    }
}
Dao(TableADao.java)
/**
 * JDBC data-access component that loads the SubDivision rows belonging to a single TableA record.
 */
@Service
public class TableADao {
// Placeholder query from the original. NOTE(review): presumably it takes exactly one positional
// bind parameter (the TableA PK), matching the single argument passed in getSubDivision.
private static final String SQL = "COMPLEX JOIN QUERY";
@Autowired
private JdbcTemplate jdbcTemplate;
/**
 * Fetches all sub-divisions for the given TableA primary key and builds one SubDivision per
 * result row (SUBDIVISION_CD plus further columns elided in this snippet).
 *
 * NOTE(review): each call is one database round-trip. When invoked per item from an
 * ItemProcessor, this becomes one query per TableA row. A variant that takes all PK ids of a chunk
 * (e.g. an IN list whose rows are grouped by PK) would reduce that to one query per chunk.
 *
 * @param pkId TableA primary key, bound as the query's parameter
 * @return a newly created list, empty when no rows match; never null
 */
public List<SubDivision> getSubDivision(Integer pkId){
// One Map per result row, keyed by column label.
List<Map<String, Object>> results = jdbcTemplate.queryForList(SQL,new Object[] { pkId });
List<SubDivision> divisions = new ArrayList<>();
for (Map<String, Object> row : results) {
divisions.add(SubDivision.builder().subDivisionCd((String)row.get("SUBDIVISION_CD"))
......
.........
.........
......
.build());
}
return divisions;
}
}
TableAWriter.java
/**
 * Writes each chunk of processed {@link TableA} items through {@code TableARepository}.
 * The repository is presumably a Spring Data Mongo repository — confirm.
 */
public class TableAWriter implements ItemWriter<TableA> {

    // lowerCamelCase: the old field name was identical to its type, which made the instance call
    // read like a static call. Also matches Spring Data's default repository bean name.
    @Autowired
    private TableARepository tableARepository;

    /**
     * Saves every item of the current chunk with a single {@code saveAll} call.
     *
     * @param items processed items of the chunk
     * @throws Exception any repository failure propagates to Spring Batch
     */
    @Override
    public void write(List<? extends TableA> items) throws Exception {
        tableARepository.saveAll(items);
    }
}