1

TL;博士; KafkaReceiver如果我想并行处理来自不同主题的消息并且来自不同主题的消息处理时间不同,是否需要每个主题的实例?

我正在尝试 1 个KafkaReceiver订阅 5 个主题,每个主题有 1 个分区。

来自特定主题的消息的处理速度可能会变慢。为了避免让消费者暂停,因为处理来自其他主题的消息可以正常工作,我将每个主题的消息分组并在单独的线程上进行处理。它看起来像这样:

...
scheduler = Schedulers.newParallel("P", 5);
...
flux.log()
    .groupBy(m -> m.receiverOffset().topicPartition())
    .flatMap(partitionFlux -> partitionFlux.publishOn(scheduler)
        .map(this::transform)
        .map(this::process)
    .doOnError(throwable -> {
        log.error("Ups!", throwable);
    })
    .subscribe();

我想防止的是通过不断消耗消息来填充内存,我不确定何时KafkaConsumer#poll触发。

由于我是新手,reactor-kafka所以我猜测在记录poll时会发生这种情况request(NNN)...,并且调度程序池中的线程似乎正在调用它:

[            P-4] reactor.Flux.UsingWhen.2                 : request(256)
[            P-4] reactor.Flux.UsingWhen.2                 : onNext(ConsumerRecord...
[            P-4] reactor.Flux.UsingWhen.2                 : onNext(ConsumerRecord...
[            P-4] reactor.Flux.UsingWhen.2                 : onNext(ConsumerRecord...

我在这里假设处理线程(P-4)说“我可以处理更多!!!”。

有时我会得到:

...
[r-coordinator-3] reactor.Flux.UsingWhen.2                 : onNext(ConsumerRecord(...
[r-coordinator-3] reactor.Flux.UsingWhen.2                 : onNext(ConsumerRecord(...
[r-coordinator-3] reactor.Flux.UsingWhen.2                 : onNext(ConsumerRecord(...
...

r-coordinator-3之前用于处理的线程在哪里.publishOn被添加到管道中。

  1. poll记录时会发生吗request(NNN)...
  2. 为什么有时P-4r-coordinator-3记录onNext事件?
  3. 如果在处理线程(例如,准备好进一步处理消息)poll时调用,是否会从所有主题中获取消息并最终导致内存不足错误?在“1 KafkaReceiver - 5 个主题”示例中,线程可能正在快速处理并且它可能会频繁调用,而如果轮询从所有主题中获取并且它们很慢,则其他线程会填满它们的队列。P-4pollP-4poll
  4. 在这个用例中,我唯一的选择是KafkaReceiver按主题使用吗?
4

0 回答 0