1

我想定义一个使用 Reactor Kafka 消耗 kafka 并写入 MongoDB 的流,并且只有在成功时才会将 ID 写入 Kafka。我正在将 Project Reactor 与 Spring Integration JavaDSL 一起使用,并且我希望有一个FlowBuilder类可以在较高级别上定义我的管道。我目前有以下方向:

public IntegrationFlow buildFlow() {
   return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
      .publishSubscribeChannel(c -> c
                        .subscribe(sf -> sf
                                .handle(MongoDb.reactiveOutboundChannelAdapter())) 
      .handle(writeToKafka)
      .get();
}

我在文档中看到支持另一种方法,该方法也适用于 Project Reactor。这种方法不包括使用IntegrationFlows. 这看起来像这样:

@MessagingGateway
public static interface TestGateway {

    @Gateway(requestChannel = "promiseChannel")
    Mono<Integer> multiply(Integer value);

    }

        ...

    @ServiceActivator(inputChannel = "promiseChannel")
    public Integer multiply(Integer value) {
            return value * 2;
    }

        ...

    Flux.just("1", "2", "3", "4", "5")
            .map(Integer::parseInt)
            .flatMap(this.testGateway::multiply)
            .collectList()
            .subscribe(integers -> ...);

我想知道在使用这两个库时更推荐的处理方式是什么。我想知道如何在第二个示例中使用 Reactive MongoDB 适配器。我不确定如果没有IntegrationFlows包装器,第二种方法是否可行。

4

1 回答 1

1

专为高级最终用户 API 设计,@MessagingGateway以尽可能隐藏消息传递。因此,当您开发其逻辑时,目标服务不受任何消息传递抽象的影响。

可以使用这样的接口适配器IntegrationFlow,您应该将其视为常规服务激活器,因此它看起来像这样:

.handle("testGateway", "multiply", e -> e.async(true))

使async(true)此服务激活器订阅返回的Mono. 您可以省略它,然后您自己在下游订阅它,因为这Mono将是payload流中下一条消息的确切消息。

如果你想要相反的东西:IntegrationFlow从 中调用 an Flux,就像 that flatMap(),然后考虑使用toReactivePublisher()流定义中的运算符返回 aPublisher<?>并将其声明为 bean。在这种情况下,最好不要使用 that MongoDb.reactiveOutboundChannelAdapter(),而只是ReactiveMongoDbStoringMessageHandler让它返回Mono以传播到 that Publisher

另一方面,如果您想将其@MessagingGatewayMonoreturn 一起使用,但仍从中调用 a ReactiveMongoDbStoringMessageHandler,则将其声明为 bean 并用 that 标记@ServiceActivator

我们还可以ExpressionEvaluatingRequestHandlerAdvice在特定端点上捕获错误(或成功)并分别处理它们:https ://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#expression-建议

我认为您正在寻找的是这样的:

public IntegrationFlow buildFlow() {
   return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
      .handle(reactiveMongoDbStoringMessageHandler, "handleMessage")
      .handle(writeToKafka)
      .get();
}

注意.handle(reactiveMongoDbStoringMessageHandler)-它不是关于一个MongoDb.reactiveOutboundChannelAdapter()。因为这个包装ReactiveMessageHandler成一个ReactiveMessageHandlerAdapter自动订阅。您需要的是看起来更像是您希望将其Mono<Void>返回到您自己的控制中,因此您可以将其用作服务的输入writeToKafka并自己在那里订阅并按照您的解释处理成功或错误。关键是,使用 Reactive Stream,我们无法提供命令式错误处理。该方法与任何异步 API 使用相同。因此,我们也将错误发送到errorChannelfor Reactive Streams。

我们可能可以通过让像您这样的用例开箱即用来改进这MongoDb.reactiveOutboundChannelAdapter()一点。returnMono(true/false)

于 2022-01-21T16:37:04.563 回答