1

我正在使用带有 spring webflux SSE 流的 reactor-kafka 库来使用来自 kafka 主题的数据。当来自主题的所有消息都被消耗时,我需要返回一个特殊的 ServerSentEvent,即最大主题偏移量等于从 0 偏移量订阅时消耗的当前偏移量。以便客户知道 kafka 主题中没有更多消息。

是否可以使用 Web Flux 实现这样的目标?即,如果我说在从任何有限的元素列表中消耗每 100 个元素并通过 SSE 流作为 ServerSentEvent 发送之后,这个 SSE 流应该再获得一个事件作为 SeverSentEvent 并带有注释“已使用”。

4

2 回答 2

0

我可能是错的,但是,这对我来说没有任何意义。
Kafka 是事件流(始终开启)。
我的意思是,没有概念作为 kafka 中的最后一条消息

您可以在主题消费者端编写一些代码,因此,当某个时间没有事件到达时,触发一个动作。
但是,这并不准确,而且确实很有意义。

于 2021-06-28T21:25:10.953 回答
0

我不知道这是不是你想要的。

    @GetMapping(path = "/sse/endpoint", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Event> sse() {
        return new KafkaConsumer().receive();
    }

    public class KafkaConsumer {

        private final Sinks.Many<Event> sseEventSender = Sinks.many().multicast()
                .onBackpressureBuffer();

        public Flux<Event> receive() {
            return sseEventSender.asFlux();
        }

        public void consume() {
            if(...) {
                sseEventSender.tryEmitNext(event);
            }
        }
    }
于 2021-06-29T02:41:51.723 回答