0

我正在使用 Alpakka kafka 连接器来消耗来自 kafka 的数据包。我使用 Consumer 作为 CommittableSource。我想在一台机器上创建多个消费者线程并将它们用作单一来源。我怎样才能做到这一点?

目前,我使用 Consumer.CommittableSource 创建了多个源,并使用“合并”功能将所有源合并为一个源。但我不确定这是否是正确的方法,因为我没有创建线程。

请在下面找到我当前使用的源代码:

public Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control> source() {
Source finalSource = Source.empty();
        for (int index = 0; index < consumerConfig.getNoOfConsumers(); index++) {
            finalSource = finalSource.merge(Consumer.committableSource(consumerSettings, subscription));
        }
return finalSource;
}
4

1 回答 1

0

是什么让您相信您需要更多线程?更多时候,您希望跨多个流共享单个 Kafka 消费者客户端实例。

您不应该将多个Consumer.committableSources 中的元素合并到一个流中,它不适用于批量提交。

多次运行相同的流设置会解决您的需求吗?

于 2019-05-06T08:49:17.513 回答