我有一些 Reactor Kafka 代码,它通过 a 读取事件KafkaReceiver
并通过 1 或多个KafkaSenders
连接到单个Publisher
. 一切都很好,但我想做的只是Flux
在完成时从这个连接的发送者发出一个事件(即,它已完成写入任何给定事件的所有下游主题,因此它不会为每个元素发出任何内容它向下游写入 Kafka 直到完成)。通过这种方式,我可以sample()
并定期提交偏移量,因为我知道每当发生这种sample()
情况时,我都会为传入事件提交偏移量,我已经为每个要提交偏移量的事件处理了所有下游消息。看来我可以使用pauseUntilOther()
或者then()
不知何故,但我不太清楚我的代码和具体用例是如何给出的。任何想法或建议表示赞赏,谢谢。
主要发布者代码:
this.kafkaReceiver.receive()
.groupBy(m -> m.receiverOffset().topicPartition())
.flatMap(partitionFlux ->
partitionFlux.publishOn(this.scheduler)
.flatMap(this::processEvent)
.sample(Duration.ofMillis(10))
.concatMap(sr -> commitReceiverOffset(sr.correlationMetadata())))
.subscribe();
通过调用返回的串联 KafkaSenders processEvent()
:
return Flux.concat(kafkaSenderFluxA, kafkaSenderFluxB)
.doOnComplete(LOG.info(“Finished writing all downstream messages for incoming event);