1

我正在尝试使用基于分区的排序同时读取消息。我参考了下面的示例,并添加了延迟(10 秒)来模拟工作负载。但我注意到即使它运行多个线程,它也不会同时运行。它只运行一条消息。

我的主题由 20 个分区组成,因此我希望至少并行运行 20 个线程并等待 10 秒(我放的延迟),然后一次又一次地运行。

在此处输入链接描述

    private fun onPayment(it: ReceiverRecord<String, Any>): Mono<ReceiverRecord<String, Any>> {
    println(
        "SMF message [${Thread.currentThread().name}], Offset -  ${
        it.receiverOffset().offset()
        }  , Partition ${
        it.receiverOffset().topicPartition()
        },  Value - ${it.value()}  ${SimpleDateFormat("hh:mm:ss").format(Date())}"
    )
    return Mono.just(it).delayElement(Duration.ofMillis(10000))
}

fun doSubscribe(consumer: (ReceiverRecord<String, Any>) -> Mono<ReceiverRecord<String, Any>>) {
    val reOption = KafkaConfig().kafkaConsumerConfig().subscription(Collections.singleton("payTopic"))
        .commitInterval(Duration.ZERO)
        .commitBatchSize(0)
    val receiver = KafkaReceiver.create(reOption)

    Flux.defer(receiver::receive).groupBy { m ->
        m.receiverOffset().topicPartition()
    }.flatMap { grpFlux ->
        grpFlux.publishOn(schd).concatMap { m ->
            consumer(m).map {
                m.receiverOffset().commit()
            }
        }
    }.subscribe()
}

它一个一个地依次打印,并且只从一个分区中消耗。

在此处输入图像描述

4

0 回答 0