1

我需要轮询 kafka 并批量处理事件。在 Reactor kafka 中,由于它是一个蒸汽 API,我将事件作为流获取。有没有办法组合并获得固定的最大事件大小。

这就是我目前正在做的事情。

final Flux<Flux<ConsumerRecord<String, String>>> receive = KafkaReceiver.create(eventReceiverOptions)
    .receiveAutoAck();
receive
    .concatMap(r -> r)
    .doOnEach(listSignal -> log.info("got one message"))
    .map(consumerRecords -> consumerRecords.value())
    .collectList()
    .flatMap(strings -> {
      log.info("Read messages of size {}", strings.size());
      return processBulkMessage(strings)
          .doOnSuccess(aBoolean -> log.info("Processed records"))
          .thenReturn(strings);
    }).subscribe();

但是代码只是在 collectList 之后挂起,并且永远不会到达最后一个 flatMap。

提前致谢。

4

1 回答 1

0

您只需对您的平原进行“展平”,.concatMap(r -> r)因此您完全消除了最初由 that 构建的批处理receiveAutoAck()。要让您processBulkMessage()处理列表流,请考虑将所有批处理逻辑移至该列表中concatMap()

.concatMap(batch -> batch
                    .doOnEach(listSignal -> log.info("got one message"))
                    .map(ConsumerRecord::value)
                    .collectList())
            .flatMap(strings -> {
于 2021-04-26T14:04:21.183 回答