0

我有一个场景,我需要大约 50-60 个不同的进程同时运行并执行一个任务。

每个进程都必须使用 sql 查询从数据库中获取数据,方法是传递一个值并获取要在后续任务中运行的数据。从 table_1 中选择 col_1、col_2、col_3 其中 col_1 = :Process_1;

 @Bean
    public Job partitioningJob() throws Exception {
        return jobBuilderFactory.get("parallelJob")
                .incrementer(new RunIdIncrementer())
                .flow(masterStep())
                .end()
                .build();
    }

    @Bean
    public Step masterStep() throws Exception {
        //How to fetch data from configuration and pass all values in partitioner one by one.
        // Can we give the name for every process so that it is helpful in logs and monitoring.
        return stepBuilderFactory.get("masterStep")
                .partitioner(slaveStep())
                .partitioner("partition", partitioner())
                .gridSize(10)
                .taskExecutor(new SimpleAsyncTaskExecutor())
                .build();
    }

    @Bean
    public Partitioner partitioner() throws Exception {
        //Hit DB with sql query and fetch the data.

    }

    @Bean
    public Step slaveStep() throws Exception {
        return stepBuilderFactory.get("slaveStep")
                .<Map<String, String>, Map<String, String>>chunk(1)
                .processTask()
                .build();
    }

由于我们在 Apache Camel 中有 Aggregator 和 parallelProcessing,Spring Batch 是否有任何类似的功能可以完成相同的工作?

我是 Spring Batch 的新手,目前正在探索它是否可以处理该卷。因为这将是一个 24*7 运行的重负载应用程序,并且每个进程都需要同时运行,其中每个线程应该能够支持进程内的多个线程。

有没有办法监视这些进程,以便它无论如何都会被终止,我应该能够重新启动该特定进程?请帮助解决这个问题。

4

1 回答 1

1

请找到以上问题的答案。

  1. parallelProcessing - 本地和远程分区支持并行处理,并且可以处理大量的卷,因为我们目前每天处理 200 到 3 亿个数据。

  2. 它是否可以处理大量 - 是的,这可以处理大量并且已得到充分证明。

  3. 每个进程都需要同时运行,其中每个线程都应该能够在一个进程中支持多个线程 - Spring 批处理将根据您的 ThreadPool 进行处理。确保根据系统资源配置池。

  4. 有没有办法监控这些进程以使其终止 - 是的。分区的每个并行过程都是一个步骤,您可以在 BATCH_STEP_EXECUTION 中监控并了解所有详细信息

  5. 应该能够重新启动该特定进程 - 是的,这是一个内置功能并从失败的步骤重新启动。我们总是使用容错来处理大量作业,以便稍后处理拒绝。这也是内置功能。

下面的示例项目

https://github.com/ngecom/springBatchLocalParition/tree/master

数据库已添加 - H2 并在资源文件夹中创建可用表。我们总是更喜欢使用数据源池,池大小将大于您的线程池大小。

示例项目总结

  1. 从表“客户”中读取并划分为步骤分区
  2. 每一步分区写入新表“new_customer”
  3. JobConfiguration.java 方法名称“taskExecutor()”中可用的线程池配置
  4. slaveStep() 中可用的块大小。
  5. 您可以根据并行步骤计算内存大小并配置为 VM 最大内存。

查询帮助您在执行后根据上述问题进行分析

SELECT * FROM NEW_CUSTOMER;   
SELECT * FROM BATCH_JOB_EXECUTION bje;
SELECT * FROM BATCH_STEP_EXECUTION bse WHERE JOB_EXECUTION_ID=2; 
SELECT * FROM BATCH_STEP_EXECUTION_CONTEXT bsec WHERE STEP_EXECUTION_ID=4; 

如果要更改为 MYSQL,请添加以下数据源

spring.datasource.hikari.minimum-idle=5 
spring.datasource.hikari.maximum-pool-size=100
spring.datasource.hikari.idle-timeout=600000 
spring.datasource.hikari.max-lifetime=1800000 
spring.datasource.hikari.auto-commit=true 
spring.datasource.hikari.poolName=SpringBoot-HikariCP
spring.datasource.url=jdbc:mysql://localhost:3306/ngecomdev
spring.datasource.username=ngecom
spring.datasource.password=ngbilling

请始终参考下面的guthub URL。你会从中得到很多想法。

https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples

于 2021-02-07T13:45:59.647 回答