0

我为应用程序上注册的每个队列创建了一个按需 ChannelAdapter、AsyncTaskExecutor 和一个 Channel。我注意到当maxPoolSizeAsyncTaskExecutor 的数量等于 1 时,消息没有被处理。这就是创建 AsyncTaskExecutor bean 的方式。

 static void registerAsyncTaskExecutor(final Consumer consumer, final GenericApplicationContext registry) {
        final TaskExecutor executor = consumer.getExecutor();

        final BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(ThreadPoolTaskExecutor.class);
        builder.addPropertyValue("corePoolSize", executor.getCorePoolSize());
        builder.addPropertyValue("maxPoolSize", executor.getMaxPoolSize());
        builder.addPropertyValue("threadNamePrefix", consumer.getName() + "-");

        final String beanName = executor.getName();
        final BeanDefinition beanDefinition = builder.getBeanDefinition();
        registry.registerBeanDefinition(beanName, beanDefinition);
    }

我注意到的另一件事是,当调用此方法时,java.util.concurrent.ThreadPoolExecutor#execute此条件workerCountOf(c) < corePoolSize始终为 false。完整的项目链接在这里https://github.com/LeoFuso/spring-integration-aws-demo

4

1 回答 1

1

只为某个可管理的组件提供一个只有一个线程的线程池总是不好的做法。您可能不知道该组件将如何处理您的线程池,并且您的单线程可能被内部的某个长期存在的任务占用,而所有新任务都将停止在队列中等待那个单线程是免费的,这可能不会发生。

事实上,这正是我们所使用的AsynchronousMessageListenerSpring Cloud AWS 所拥有的,上面提到的SqsMessageDrivenChannelAdapter

public void run() {
        while (isQueueRunning()) {

所以,或者依赖默认的执行器或者提供足够的线程到你自己的。

看起来那里的逻辑对于线程数是这样的:

    int spinningThreads = this.getRegisteredQueues().size();

    if (spinningThreads > 0) {
        threadPoolTaskExecutor
                .setCorePoolSize(spinningThreads * DEFAULT_WORKER_THREADS);

所以,我们有确切的线程数,因为我们提供了 SQS 队列,加上2工人的乘数。看起来我们需要一个线程来为每个队列轮询和额外的线程来处理来自它们的消息。

(虽然不是 Spring Integration 问题——更像是 Spring Cloud AWS)。

于 2020-09-11T14:45:38.940 回答