0

我有以下方法:

private boolean isAllProcessedForChannel(String channel) {
    KafkaConsumer<Object, Object> consumer = clientService.getConsumer(channel);

    Uni<Map<TopicPartition, Long>> positionsUni = consumer.getPositions();
    Map<TopicPartition, Long> positions = positionsUni.await().indefinitely();
    for (Entry<TopicPartition, Long> entry : positions.entrySet()) {
      Long position = entry.getValue();
      TopicPartition partition = entry.getKey();

      Uni<Map<TopicPartition, OffsetAndMetadata>> committedUni = consumer.committed(partition);
      OffsetAndMetadata offsetAndMetadata = committedUni.await().indefinitely().get(partition);
      if (offsetAndMetadata != null) {
        long offset = offsetAndMetadata.offset();
        log.info("Offset/Position ({}): {}/{}", partition.partition(), offset, position);
        if (offset != position) {
          return false;
        }
      }
    }

    return true;
}

最后,我需要得到一个 Uni 来指示,如果其中一个分区尚未处理。我目前的解决方案是await2 个职位。我不知道如何做这整件事“真正”反应。

对于第一个 Uni 的映射中的每个条目,我需要调用一个本身返回 Uni 的方法。然后,我需要将条目(第一个 Uni)中的值与第二个 Uni 的结果进行比较。最后,我需要检查所有比较结果是否为真,并将其作为单个 Uni 返回。

有没有人暗示如何实现这一目标?还是这太复杂了,我应该坚持我的“同步”方式?

4

1 回答 1

2

您可以首先将位置条目转换为,Multi然后对于每个条目,获取一个在位置与偏移量不Uni<Boolean>同时发出的。false最终你合并结果并只取第false一个:

private Uni<Boolean> isAllProcessedForChannel(String channel) {
    return consumer.getPositions()
            .onItem().transformToMulti(positions -> Multi.createFrom().iterable(positions.entrySet()))
            .onItem().transformToUniAndMerge(entry -> {
                Long position = entry.getValue();
                TopicPartition partition = entry.getKey();

                return consumer.committed(partition).onItem().transform(committed -> {
                    OffsetAndMetadata offsetAndMetadata = committed.get(partition);
                    if (offsetAndMetadata != null) {
                        long offset = offsetAndMetadata.offset();
                        if (offset != position) {
                            return false;
                        }
                    }
                    return true;
                });
            })
            .filter(Boolean.FALSE::equals)
            .toUni();
}
于 2021-11-04T22:14:18.737 回答