0

有人可以帮我用线程池重写这个流程吗?以下代码有效,但使用固定延迟来服务传入文件:

@Bean
public IntegrationFlow sampleFlow() {
    return IntegrationFlows
            .from(fileReadingMessageSource(), c -> c.poller(Pollers.fixedDelay(500)))
            .channel(new DirectChannel())
            .transform(fileMessageToJobRequest())
            .handle(springBatchJobLauncher())
            .handle(jobExecution -> {
                logger.info("jobExecution payload: {}", jobExecution.getPayload());
            })
            .get();
}

需要线程,因为文件的速度很快。

4

2 回答 2

1

谢谢@Artem。我根据 Artem 的回答找到了解决方案。诀窍是在下面的代码中使用 TaskExecutor。此外,应将 Pollers.maxMessagesPerPoll(nbfiles) 设置为一次处理多个消息(=文件)。

  @Bean
  public IntegrationFlow sampleFlow() throws InterruptedException {
    return IntegrationFlows
          .from(fileReadingMessageSource(), c -> c.poller(Pollers.fixedDelay(5000).maxMessagesPerPoll(5)))
          .channel(MessageChannels.executor(threadPoolTaskExecutor()))
          .transform(fileMessageToJobRequest())
          .handle(springBatchJobLauncher())
          .handle(jobExecution -> {
            logger.debug("jobExecution payload: {}", jobExecution.getPayload());
          })
          .get();
  }

  @Bean
  public TaskExecutor threadPoolTaskExecutor() {
    int poolSize = 20;
    logger.debug("...... createing ThreadPool of size {}.", poolSize);
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setThreadNamePrefix("Dama_Thread_");
    executor.setMaxPoolSize(5);
    executor.setCorePoolSize(5);
    executor.setQueueCapacity(22);
    return executor;
  }
于 2018-07-03T20:25:10.290 回答
0

可以使用此选项配置轮询器:

    /**
 * Specify an {@link Executor} to perform the {@code pollingTask}.
 * @param taskExecutor the {@link Executor} to use.
 * @return the spec.
 */
public PollerSpec taskExecutor(Executor taskExecutor) {

你真的可以提供一个ThreadPoolTaskExecutor实例。

于 2018-07-03T03:06:05.553 回答