0

我是 Spring Batch 的新手,在我的 Batch 中使用多个数据源时遇到了问题。

让我解释。

我在我的服务器中使用 Spring Boot 中的 2 个数据库。

到目前为止,我的 RoutingDataSource 实现一切正常。

@Component("dataSource")
public class RoutingDataSource extends AbstractRoutingDataSource {

  @Autowired
  @Qualifier("datasourceA")
  DataSource datasourceA;

  @Autowired
  @Qualifier("datasourceB")
  DataSource datasourceB;

  @PostConstruct
  public void init() {
    setDefaultTargetDataSource(datasourceA);
    final Map<Object, Object> map = new HashMap<>();
    map.put(Database.A, datasourceA);
    map.put(Database.B, datasourceB);
    setTargetDataSources(map);
  }

  
  @Override
  protected Object determineCurrentLookupKey() {
    return DatabaseContextHolder.getDatabase();
  }
}

该实现需要一个 DatabaseContextHolder,这里是:

public class DatabaseContextHolder {
    private static final ThreadLocal<Database> contextHolder = new ThreadLocal<>();

    public static void setDatabase(final Database dbConnection) {
        contextHolder.set(dbConnection);
    }

    public static Database getDatabase() {
        return contextHolder.get();
    }
}

当我在我的服务器上收到一个请求时,我有一个基本的拦截器,它根据我在请求中的一些输入设置当前数据库。使用方法DatabaseContextHolder.setDatabase(db);一切都适用于我的实际控制器。

当我尝试使用一个 tasklet 运行作业时,情况会变得更加复杂。

我的一个控制器启动了这样的异步任务。

@GetMapping("/batch")
public void startBatch() {
  return jobLauncher.run("myJob", new JobParameters());
}

@EnableBatchProcessing
@Configuration
public class MyBatch extends DefaultBatchConfigurer {


  @Autowired private JobBuilderFactory jobs;

  @Autowired private StepBuilderFactory steps;

  @Autowired private MyTasklet tasklet;

  @Bean
  public Job job(Step step) {
    return jobs.get("myJob").start(step).build();
  }

  @Bean
  protected Step registeredDeliveryTask() {
    return steps.get("myTask").tasklet(tasklet).build();
  }

  /** Overring the joblauncher get method to make it asynchornous */
  @Override
  public JobLauncher getJobLauncher() {
    try {
      SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
      jobLauncher.setJobRepository(super.getJobRepository());
      jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
      jobLauncher.afterPropertiesSet();
      return jobLauncher;
    } catch (Exception e) {
      throw new BatchConfigurationException(e);
    }
  }
}

还有我的小任务:

@Component
public class MyTasklet implements Tasklet {

  @Autowired
  private UserRepository repository;

  @Override
  public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)throws Exception {

  //Do stuff with the repository.

  }

但是 RoutingDataSource 不起作用,即使我在开始工作之前设置了我的上下文。例如,如果我将数据库设置为 B,则 repo 将在数据库 A 上运行。它始终是选择的默认数据源。(因为这条线 setDefaultTargetDataSource(datasourceA);

我尝试通过在 tasklet 内传递参数中的值来设置数据库,但仍然遇到同样的问题。

@GetMapping("/batch")
public void startBatch() {
  Map<String, JobParameter> parameters = new HashMap<>();
  parameters.put("database", new JobParameter(DatabaseContextHolder.getCircaDatabase().toString()));
  return jobLauncher.run("myJob", new JobParameters(parameters));
}
  @Override
  public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)throws Exception {

    String database =
        chunkContext.getStepContext().getStepExecution().getJobParameters().getString("database");
    DatabaseContextHolder.setDatabase(Database.valueOf(database));
  //Do stuff with the repository.

  }

我觉得问题是因为数据库设置在不同的线程中,因为我的工作是异步的。因此它无法在启动作业之前获取数据库集。但到目前为止我找不到任何解决方案。

问候

4

1 回答 1

0

您的路由数据源正在用于 Spring Batch 的元数据,这意味着作业存储库将根据处理请求的线程与不同的数据库进行交互。这对于批处理作业是不需要的。您需要配置 Spring Batch 以使用固定数据源。

于 2020-07-15T07:56:50.000 回答