我需要轮询 kafka 并批量处理事件。在 Reactor kafka 中,由于它是一个蒸汽 API,我将事件作为流获取。有没有办法组合并获得固定的最大事件大小。
这就是我目前正在做的事情。
final Flux<Flux<ConsumerRecord<String, String>>> receive = KafkaReceiver.create(eventReceiverOptions)
.receiveAutoAck();
receive
.concatMap(r -> r)
.doOnEach(listSignal -> log.info("got one message"))
.map(consumerRecords -> consumerRecords.value())
.collectList()
.flatMap(strings -> {
log.info("Read messages of size {}", strings.size());
return processBulkMessage(strings)
.doOnSuccess(aBoolean -> log.info("Processed records"))
.thenReturn(strings);
}).subscribe();
但是代码只是在 collectList 之后挂起,并且永远不会到达最后一个 flatMap。
提前致谢。