我有以下方法:
private boolean isAllProcessedForChannel(String channel) {
KafkaConsumer<Object, Object> consumer = clientService.getConsumer(channel);
Uni<Map<TopicPartition, Long>> positionsUni = consumer.getPositions();
Map<TopicPartition, Long> positions = positionsUni.await().indefinitely();
for (Entry<TopicPartition, Long> entry : positions.entrySet()) {
Long position = entry.getValue();
TopicPartition partition = entry.getKey();
Uni<Map<TopicPartition, OffsetAndMetadata>> committedUni = consumer.committed(partition);
OffsetAndMetadata offsetAndMetadata = committedUni.await().indefinitely().get(partition);
if (offsetAndMetadata != null) {
long offset = offsetAndMetadata.offset();
log.info("Offset/Position ({}): {}/{}", partition.partition(), offset, position);
if (offset != position) {
return false;
}
}
}
return true;
}
最后,我需要得到一个 Uni 来指示,如果其中一个分区尚未处理。我目前的解决方案是await
2 个职位。我不知道如何做这整件事“真正”反应。
对于第一个 Uni 的映射中的每个条目,我需要调用一个本身返回 Uni 的方法。然后,我需要将条目(第一个 Uni)中的值与第二个 Uni 的结果进行比较。最后,我需要检查所有比较结果是否为真,并将其作为单个 Uni 返回。
有没有人暗示如何实现这一目标?还是这太复杂了,我应该坚持我的“同步”方式?