2

我有大量来自 CSV 文件的消息,然后被发送到速率受限的 API。我正在使用由数据库通道消息存储支持的队列通道来使消息在处理时持久。我想尽可能接近速率限制,所以我需要跨多个线程向 API 发送消息。

在我的脑海中它应该如何工作的是读取数据库,查看可用的消息,然后将每条消息委托给要在事务中处理的线程之一。

但我无法做到这一点,我必须做的是有一个事务轮询器,它有一个由 N 个线程组成的线程池,固定速率为 5 秒,每次轮询的最大消息数为 10(一些超过 5 秒内可以处理的时间)......这工作正常,但是当等待的消息不多时会出现问题(即,如果有 10 条消息,它们将由单个线程处理)这不会是实践中的一个问题,因为我们将有 1000 条消息。但它在概念上似乎比我认为它应该如何工作更复杂。

我可能没有很好地解释这一点,但是当消息快速传入但传出较慢时,这似乎是一个常见问题?

4

1 回答 1

1

您的解决方案确实是正确的,但是您需要考虑不要将消息转换为一个,Exectuor因为这样您就会跳出事务边界。

您在同一个线程中处理了 10 条消息这一事实正是一个实现细节,它看起来像这样:

AbstractPollingEndpoint.this.taskExecutor.execute(() -> {
            int count = 0;
            while (AbstractPollingEndpoint.this.initialized
                    && (AbstractPollingEndpoint.this.maxMessagesPerPoll <= 0
                    || count < AbstractPollingEndpoint.this.maxMessagesPerPoll)) {
                try {
                    if (!Poller.this.pollingTask.call()) {
                        break;
                    }
                    count++;
                }

因此,我们轮询消息,直到maxMessagesPerPoll在同一个线程中。

为了使它真正更加并行并且仍然保持事务不要丢失您需要考虑使用的消息fixedRate

/**
 * Specify whether the periodic interval should be measured between the
 * scheduled start times rather than between actual completion times.
 * The latter, "fixed delay" behavior, is the default.
 */
public void setFixedRate(boolean fixedRate)

并增加用于TaskScheduler轮询的线程数量。您可以通过声明一个ThreadPoolTaskScheduler名称为 as 的 beanIntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME来覆盖池中的默认值 as 10。或者使用全局属性来覆盖默认的池大小TaskSchedulerhttps ://docs.spring.io/spring-integration/docs/5.0.6.RELEASE/reference/html/configuration.html#global-properties

于 2018-06-20T13:39:15.530 回答