我正在尝试使用基于分区的排序同时读取消息。我参考了下面的示例,并添加了延迟(10 秒)来模拟工作负载。但我注意到即使它运行多个线程,它也不会同时运行。它只运行一条消息。
我的主题由 20 个分区组成,因此我希望至少并行运行 20 个线程并等待 10 秒(我放的延迟),然后一次又一次地运行。
private fun onPayment(it: ReceiverRecord<String, Any>): Mono<ReceiverRecord<String, Any>> {
println(
"SMF message [${Thread.currentThread().name}], Offset - ${
it.receiverOffset().offset()
} , Partition ${
it.receiverOffset().topicPartition()
}, Value - ${it.value()} ${SimpleDateFormat("hh:mm:ss").format(Date())}"
)
return Mono.just(it).delayElement(Duration.ofMillis(10000))
}
fun doSubscribe(consumer: (ReceiverRecord<String, Any>) -> Mono<ReceiverRecord<String, Any>>) {
val reOption = KafkaConfig().kafkaConsumerConfig().subscription(Collections.singleton("payTopic"))
.commitInterval(Duration.ZERO)
.commitBatchSize(0)
val receiver = KafkaReceiver.create(reOption)
Flux.defer(receiver::receive).groupBy { m ->
m.receiverOffset().topicPartition()
}.flatMap { grpFlux ->
grpFlux.publishOn(schd).concatMap { m ->
consumer(m).map {
m.receiverOffset().commit()
}
}
}.subscribe()
}
它一个一个地依次打印,并且只从一个分区中消耗。