我正在尝试使用 Spring Integration 将数据从一个通道发送到两个不同的 Kafka 队列,这些数据在到达各自队列的途中经过不同的转换。问题是我显然有重复的生产者上下文,我不知道为什么。
这是我的流程配置:
flow -> flow
.channel(“firstChannel")
.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
.subscribe(f -> f
.transform(firstTransformer::transform)
.channel(MessageChannels.queue(50))
.handle(Kafka.outboundChannelAdapter(kafkaConfig)
.addProducer(firstMetadata(), brokerAddress), e -> e.id(“firstKafkaOutboundChannelAdapter")
.autoStartup(true)
.poller(p -> p.fixedDelay(1000, TimeUnit.MILLISECONDS).receiveTimeout(0).taskExecutor(taskExecutor))
.get())
)
.subscribe(f -> f
.transform(secondTransformer::transform)
.channel(MessageChannels.queue(50))
.handle(Kafka.outboundChannelAdapter(kafkaConfig)
.addProducer(secondMetadata(), brokerAddress), e -> e.id(“secondKafkaOutboundChannelAdapter")
.autoStartup(true)
.poller(p -> p.fixedDelay(1000, TimeUnit.MILLISECONDS).receiveTimeout(0).taskExecutor(taskExecutor))
.get())
));
例外是:
无法在 bean 名称“not_specified”下注册对象 [org.springframework.integration.kafka.support.KafkaProducerContext@3163987e]:已经有对象 [org.springframework.integration.kafka.support.KafkaProducerContext@15f193b8] 绑定
我尝试过使用不同的kafkaConfig
对象,但这并没有帮助。同时,ProducerMetadata
从不同的第一个参数中可以看出,实例是不同的addProducer
。这些提供了其他元数据中相应目标队列的名称。
听起来有一些正在创建的隐式 bean 定义相互冲突。
如何用两个KafkaProducerContext
s 解决这个异常?