0

我正在使用带有 Spring Integration 的 Project Reactor 从 Kafka 读取并写入 MongoDB,我的 Kafka 使用效果很好,但是.handle(MongoDb.reactiveOutboundChannelAdapter(mongoFactory))卡住了。我已经看到这个函数的内部代码是new ReactiveMongoDbStoringMessageHandler(mongoFactory)),所以我尝试了以下方法(我有一个transform()从 转换为 的方法ConsumerRecordMono<String>带有@Transformer注释):

    public IntegrationFlows writeToMongo() {
         return IntegrationFlows.from(kafkaChannel)
              .transform(this)
              .handle(new ReactiveMongoDbStoringMessageHandler(mongoFactory))
              .get();
    }

代码遵循文档https://docs.spring.io/spring-integration/reference/html/mongodb.html#mongodb-reactive-channel-adapters。我得到的错误是: java.lang.IllegalArgumentException: Found ambiguous parameter type [class java.lang.Void] for method match:然后是很长的函数列表。有什么理由会发生这种情况?

4

1 回答 1

0

如果您不new ReactiveMongoDbStoringMessageHandler(mongoFactory)打算订阅返回的Mono. A.handle(MongoDb.reactiveOutboundChannelAdapter(mongoFactory))是正确的做法,因为它将其包装ReactiveMongoDbStoringMessageHandlerReactiveMessageHandlerAdapter自动订阅中。

但是我认为您真正的问题在于.transform(this). 我相信您在这个类中有很多方法,因此请更具体地使用方法名称。而这与 Project Reactor 无关。不确定为什么要Mono在发送之前尝试转换为ReactiveMongoDbStoringMessageHandler...您可能在提供有效负载(ConsumerRecord?)时遇到问题,该有效负载不是用于保存到集合中的 MongoDB 映射实体。

于 2022-01-18T15:39:02.717 回答