我是 Spring Batch 的新手,在我的 Batch 中使用多个数据源时遇到了问题。
让我解释。
我在我的服务器中使用 Spring Boot 中的 2 个数据库。
到目前为止,我的 RoutingDataSource 实现一切正常。
@Component("dataSource")
public class RoutingDataSource extends AbstractRoutingDataSource {
@Autowired
@Qualifier("datasourceA")
DataSource datasourceA;
@Autowired
@Qualifier("datasourceB")
DataSource datasourceB;
@PostConstruct
public void init() {
setDefaultTargetDataSource(datasourceA);
final Map<Object, Object> map = new HashMap<>();
map.put(Database.A, datasourceA);
map.put(Database.B, datasourceB);
setTargetDataSources(map);
}
@Override
protected Object determineCurrentLookupKey() {
return DatabaseContextHolder.getDatabase();
}
}
该实现需要一个 DatabaseContextHolder,这里是:
public class DatabaseContextHolder {
private static final ThreadLocal<Database> contextHolder = new ThreadLocal<>();
public static void setDatabase(final Database dbConnection) {
contextHolder.set(dbConnection);
}
public static Database getDatabase() {
return contextHolder.get();
}
}
当我在我的服务器上收到一个请求时,我有一个基本的拦截器,它根据我在请求中的一些输入设置当前数据库。使用方法DatabaseContextHolder.setDatabase(db);
一切都适用于我的实际控制器。
当我尝试使用一个 tasklet 运行作业时,情况会变得更加复杂。
我的一个控制器启动了这样的异步任务。
@GetMapping("/batch")
public void startBatch() {
return jobLauncher.run("myJob", new JobParameters());
}
@EnableBatchProcessing
@Configuration
public class MyBatch extends DefaultBatchConfigurer {
@Autowired private JobBuilderFactory jobs;
@Autowired private StepBuilderFactory steps;
@Autowired private MyTasklet tasklet;
@Bean
public Job job(Step step) {
return jobs.get("myJob").start(step).build();
}
@Bean
protected Step registeredDeliveryTask() {
return steps.get("myTask").tasklet(tasklet).build();
}
/** Overring the joblauncher get method to make it asynchornous */
@Override
public JobLauncher getJobLauncher() {
try {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(super.getJobRepository());
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
} catch (Exception e) {
throw new BatchConfigurationException(e);
}
}
}
还有我的小任务:
@Component
public class MyTasklet implements Tasklet {
@Autowired
private UserRepository repository;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)throws Exception {
//Do stuff with the repository.
}
但是 RoutingDataSource 不起作用,即使我在开始工作之前设置了我的上下文。例如,如果我将数据库设置为 B,则 repo 将在数据库 A 上运行。它始终是选择的默认数据源。(因为这条线
setDefaultTargetDataSource(datasourceA);
)
我尝试通过在 tasklet 内传递参数中的值来设置数据库,但仍然遇到同样的问题。
@GetMapping("/batch")
public void startBatch() {
Map<String, JobParameter> parameters = new HashMap<>();
parameters.put("database", new JobParameter(DatabaseContextHolder.getCircaDatabase().toString()));
return jobLauncher.run("myJob", new JobParameters(parameters));
}
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)throws Exception {
String database =
chunkContext.getStepContext().getStepExecution().getJobParameters().getString("database");
DatabaseContextHolder.setDatabase(Database.valueOf(database));
//Do stuff with the repository.
}
我觉得问题是因为数据库设置在不同的线程中,因为我的工作是异步的。因此它无法在启动作业之前获取数据库集。但到目前为止我找不到任何解决方案。
问候