6

我正在使用spring-kafka,如果我不设置ConcurrentKafkaListenerContainerFactory的并发,一切正常,当我将它设置为大于1的数字时,我得到一个异常:

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=client-3

我的配置:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> 
    kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new 
    ConcurrentKafkaListenerContainerFactory<String, String>();

    factory.setConcurrency(kafkaConfig.getConcurrency());

    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

特性:

kafka.enable-auto-commit=false

kafka.client-id=client-1

kafka.concurrency=2
4

1 回答 1

7

在 github 上为此打开了一个问题client.id当前不支持为每个线程设置不同的值。

作为一种解决方法,您可以KafkaMessageListenerContainer为每个单独启动一个(这是ConcurrentMessageListenerContainer内部所做的)。

编辑

虽然不理想,但您可以省略client.id并且 kafka 客户端将为每个 (consumer-1consumer-2)生成一个

于 2017-04-25T12:00:38.080 回答