1

我正在尝试使用 Spring Integration 将数据从一个通道发送到两个不同的 Kafka 队列,这些数据在到达各自队列的途中经过不同的转换。问题是我显然有重复的生产者上下文,我不知道为什么。

这是我的流程配置:

flow -> flow
        .channel(“firstChannel")
        .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
                .subscribe(f -> f
                                .transform(firstTransformer::transform)
                                .channel(MessageChannels.queue(50))
                                .handle(Kafka.outboundChannelAdapter(kafkaConfig)
                                        .addProducer(firstMetadata(), brokerAddress), e -> e.id(“firstKafkaOutboundChannelAdapter")
                                        .autoStartup(true)
                                        .poller(p -> p.fixedDelay(1000, TimeUnit.MILLISECONDS).receiveTimeout(0).taskExecutor(taskExecutor))
                                        .get())
                )
                .subscribe(f -> f
                                .transform(secondTransformer::transform)
                                .channel(MessageChannels.queue(50))
                                .handle(Kafka.outboundChannelAdapter(kafkaConfig)
                                        .addProducer(secondMetadata(), brokerAddress), e -> e.id(“secondKafkaOutboundChannelAdapter")
                                        .autoStartup(true)
                                        .poller(p -> p.fixedDelay(1000, TimeUnit.MILLISECONDS).receiveTimeout(0).taskExecutor(taskExecutor))
                                        .get())
                ));

例外是:

无法在 bean 名称“not_specified”下注册对象 [org.springframework.integration.kafka.support.KafkaProducerContext@3163987e]:已经有对象 [org.springframework.integration.kafka.support.KafkaProducerContext@15f193b8] 绑定

我尝试过使用不同的kafkaConfig对象,但这并没有帮助。同时,ProducerMetadata从不同的第一个参数中可以看出,实例是不同的addProducer。这些提供了其他元数据中相应目标队列的名称。

听起来有一些正在创建的隐式 bean 定义相互冲突。

如何用两个KafkaProducerContexts 解决这个异常?

4

1 回答 1

1

您不应该.get()在这些上使用KafkaProducerMessageHandlerSpec并让框架为您制定环境。

问题是因为KafkaProducerMessageHandlerSpec implements ComponentsRegistration没有人关心:

public Collection<Object> getComponentsToRegister() {
    this.kafkaProducerContext.setProducerConfigurations(this.producerConfigurations);
    return Collections.<Object>singleton(this.kafkaProducerContext);
}

手动.get()调用后。

我同意,这会带来一些不便,我们应该为最终应用找到一些更好的解决方案,但目前还没有选择,除非遵循Spec框架组件的样式,例如Kafka.outboundChannelAdapter().

希望我清楚。

更新

好的,这绝对是我们这边的问题。我们会尽快修复它: https ://jira.spring.io/browse/INTEXT-216 https://jira.spring.io/browse/INTEXT-217

同时,您的解决方法是这样的:

 KafkaProducerContext kafkaProducerContext = (KafkaProducerContext) kafkaProducerMessageHandlerSpec.getComponentsToRegister().iterator().next();
 kafkaProducerContext.setBeanName(null);

你应该搬到哪里

Kafka.outboundChannelAdapter(kafkaConfig)
                                    .addProducer(firstMetadata(), brokerAddress)

到单独的private方法来访问它kafkaProducerContext

于 2016-01-05T22:32:50.787 回答