我正在使用 Alpakka kafka 连接器来消耗来自 kafka 的数据包。我使用 Consumer 作为 CommittableSource。我想在一台机器上创建多个消费者线程并将它们用作单一来源。我怎样才能做到这一点?
目前,我使用 Consumer.CommittableSource 创建了多个源,并使用“合并”功能将所有源合并为一个源。但我不确定这是否是正确的方法,因为我没有创建线程。
请在下面找到我当前使用的源代码:
public Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control> source() {
Source finalSource = Source.empty();
for (int index = 0; index < consumerConfig.getNoOfConsumers(); index++) {
finalSource = finalSource.merge(Consumer.committableSource(consumerSettings, subscription));
}
return finalSource;
}