1

我正在开发 Spring Boot + Spring Batch 代码:从 Oracle DB 中读取数据,并将所有数据加载到 MongoDB(NoSQL 数据库)中。MongoDB 的数据模型按照实现 Mongo 关系/建模的标准做法,采用非规范化(denormalized)方式设计。

我有 TableA 和 TableB 两张表,它们之间的连接表 TableAB 是第三张表。当我通过 JdbcCursorItemReader<TableA> 读取 TableA 时,需要针对 TableA 的每个主键 Id 再查询 SubDivision 表,获取该主键对应的所有 SubDivision,并将 SubDivision 数据设置到 TableA 的模型中。TableA 模型包含一个 SubDivision 列表(List<SubDivision> subDivisions)。

我目前看到的唯一方法,是在 TableAProcessor 中进行查询并将数据设置到 TableA 模型中。这很容易实现,但问题是:如果我有 10 万(100K)条 TableA 记录,TableAProcessor 就会对数据库发起 10 万次调用。

如何使用 Tasklet 或其他任何方式实现这一点,并将 SubDivision 数据设置到 TableA 模型中?

如何避免从处理器(Processor)发起这么多次查询?

由于某些限制,我无法用单个查询完成,因此需要再向数据库发起一次查询来获取 SubDivision 数据。

/**
 * Item processor that attaches the matching sub-divisions to each
 * {@link TableA} item before it is passed on.
 *
 * <p>NOTE: {@link #process(TableA)} performs one DAO lookup per item, so a
 * step that processes N items issues N additional lookups.
 */
@Slf4j
public class TableAProcessor implements ItemProcessor<TableA, TableA>{

    /** DAO used to fetch the sub-divisions for a single item's key. */
    @Autowired
    private TableADao tableADao;

    /**
     * Looks up the sub-divisions for the given item and stores them on it.
     *
     * @param tableA the item to enrich; it is modified in place
     * @return the same {@code tableA} instance, with its sub-divisions set
     * @throws Exception if the lookup fails
     */
    @Override
    public TableA process(TableA tableA) throws Exception {
        // Parameterized message: the item is only rendered to text when DEBUG
        // is enabled, instead of being concatenated for every processed item.
        log.debug("TableA DETAILS : {}", tableA);
        List<SubDivision> subDivisions = tableADao.getSubDivision(tableA.getPKId());
        tableA.setSubDivisions(subDivisions);
        return tableA;
    }
}

模型

/**
 * Model class for a TableA record. Only some fields are shown here; the
 * dotted lines below stand for members elided from this listing.
 */
public class TableA {
    // Annotated @Transient — presumably excluded from the stored document;
    // confirm which @Transient annotation is imported.
    @Transient
    private Integer Id;
    // Annotated @Field — presumably mapped as a document field; verify against the imports.
    @Field
    private String mongoId;
    ........
    .......
    // Sub-divisions attached to this record.
    @Field
    private List<SubDivision> subDivisions;
}

TableABatchConfig.java

/**
 * Spring configuration declaring the reader, processor and writer beans
 * for TableA items.
 */
@Configuration
public class TableABatchConfig {

    /** Statement executed by the cursor reader. */
    private static final String SELECT_TABLE_A_SQL = "SELECT * FROM TABLEA";

    /** Data source qualified as {@code oracleDataSource}. */
    @Autowired
    @Qualifier("oracleDataSource")
    private DataSource dataSource;

    /**
     * Creates the step-scoped cursor reader for {@link TableA} rows.
     *
     * <p>The reader is configured with the injected data source, the select
     * statement and a {@link TableARowMapper}, then validated through
     * {@code afterPropertiesSet()} before being returned.
     *
     * @return the configured reader
     * @throws Exception if {@code afterPropertiesSet()} rejects the configuration
     */
    @Bean(destroyMethod = "")
    @StepScope
    public JdbcCursorItemReader<TableA> TableAReader() throws Exception {
        final JdbcCursorItemReader<TableA> cursorReader = new JdbcCursorItemReader<>();
        cursorReader.setRowMapper(new TableARowMapper());
        cursorReader.setSql(SELECT_TABLE_A_SQL);
        cursorReader.setDataSource(dataSource);
        cursorReader.afterPropertiesSet();
        return cursorReader;
    }

    /**
     * @return a new {@link TableAProcessor} exposed as an item processor bean
     */
    @Bean
    public ItemProcessor<TableA, TableA> TableAProcessor() {
        final ItemProcessor<TableA, TableA> processor = new TableAProcessor();
        return processor;
    }

    /**
     * @return a new {@link TableAWriter} bean
     */
    @Bean
    public TableAWriter TableAWriter() {
        final TableAWriter writer = new TableAWriter();
        return writer;
    }
}

TableAJob.java

/**
 * Spring configuration that assembles the {@code readTableAJob} job and its
 * single chunk-oriented step from the injected reader, processor and writer.
 */
@Configuration
@PropertySource("classpath:application.properties")
public class TableAJob {
    /**
     * Number of items per chunk, taken from {@code spring.chunk.size}.
     * Injected as an {@code int} so Spring converts the property value,
     * rather than injecting a String and parsing it by hand.
     */
    @Value("${spring.chunk.size}")
    private int chunkSize;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JdbcCursorItemReader<TableA> TableAReader;

    @Autowired
    private ItemProcessor<TableA, TableA> TableAProcessor;

    @Autowired
    private TableAWriter TableAWriter;

    /**
     * @return a new step listener bean, registered on the step below
     */
    @Bean
    public TableAStepExecuListner TableAStepExecuListner() {
        return new TableAStepExecuListner();
    }

    /**
     * Defines the job, which starts with (and consists only of)
     * {@link #TableAStepOne()} and uses a {@link RunIdIncrementer}.
     *
     * @return the job named {@code readTableAJob}
     */
    @Bean("readTableAJob")
    @Primary
    public Job readTableAJob() {
        return jobBuilderFactory.get("readTableAJob")
                .incrementer(new RunIdIncrementer())
                .start(TableAStepOne())
                .build();
    }

    /**
     * Defines the chunk-oriented step: read, process and write {@link TableA}
     * items in chunks of {@code chunkSize}, with the step listener attached.
     *
     * @return the step named {@code TableAStepOne}
     */
    @Bean
    public Step TableAStepOne() {
        return stepBuilderFactory.get("TableAStepOne")
                .<TableA, TableA>chunk(chunkSize)
                .reader(TableAReader)
                .processor(TableAProcessor)
                .writer(TableAWriter)
                .listener(TableAStepExecuListner())
                .build();
    }
}

/**
 * DAO that loads {@link SubDivision} entries for a given key through
 * {@code JdbcTemplate}.
 */
@Service
public class TableADao {

    // Placeholder text in this listing; the actual statement is not shown here.
    private static final String SQL = "COMPLEX JOIN QUERY";

    @Autowired
    private JdbcTemplate jdbcTemplate;

    /**
     * Runs {@code SQL} with {@code pkId} as its only argument and converts
     * every returned row into a {@link SubDivision}.
     *
     * @param pkId key passed as the single query argument
     * @return a new list with one {@link SubDivision} per result row
     */
    public List<SubDivision> getSubDivision(Integer pkId){
        List<Map<String, Object>> results = jdbcTemplate.queryForList(SQL,new Object[] { pkId });

        List<SubDivision> divisions = new ArrayList<>();
        // Each row is a column-name -> value map; SUBDIVISION_CD is read as a String.
        // The dotted lines stand for further builder calls elided from this listing.
        for (Map<String, Object> row : results) {
            divisions.add(SubDivision.builder().subDivisionCd((String)row.get("SUBDIVISION_CD"))
                    ......
                    .........
                    .........
                    ......
                    .build());
        }
        return divisions;
    }
}

TableAWriter.java

/**
 * Item writer that persists each chunk of {@link TableA} items through the
 * injected repository.
 */
public class TableAWriter implements ItemWriter<TableA> {

    /** Repository that receives every chunk handed to {@link #write(List)}. */
    @Autowired
    private TableARepository tableARepository;

    /**
     * Saves the whole chunk with a single {@code saveAll} call.
     *
     * @param items the items of the current chunk
     * @throws Exception if saving fails
     */
    @Override
    public void write(List<? extends TableA> items) throws Exception {
        tableARepository.saveAll(items);
    }
}
4

0 回答 0