我在一个主题中有 1000 个分区。我想让一个线程从一个主题的一个分区中读取,转换消息并写入另一个主题。我正在添加多线程以获得更好的吞吐量。我正在尝试使用 reactor-Kafka 来实现这一点 - https://projectreactor.io/docs/kafka/1.3.5-SNAPSHOT/reference/index.html#_introduction 我的理解是在反应堆中,每个接收器都有自己的单线程调度程序,所以我必须创建 1000 个接收器来实现上述场景。我一直在寻找这方面的例子,但我找不到任何例子,我也无法弄清楚如何做到这一点。
这是我从一个主题中的所有分区读取、转换消息并写入另一个主题的代码。
static class ReactiveTransposeAndSend extends SetKafkaProperties {
SenderOptions<Integer, String> senderOptions =
SenderOptions.<Integer, String>create(producerProps)
.maxInFlight(1024);
KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions);
ReceiverOptions<Integer, String> receiverOptions =
ReceiverOptions.<Integer, String>create(consumerProps)
.subscription(Collections.singleton(SOURCE_TOPIC));
ReactiveTransposeAndSend(Map<String, Object> consumerPropsOverride, Map<String, Object> producerPropsOverride, String bootstrapServers, String sourceTopic, String destTopic) {
super(consumerPropsOverride, producerPropsOverride, bootstrapServers, sourceTopic, destTopic);
}
public Disposable ReadProcessWriteRecords() {
Scheduler writerScheduler = Schedulers.newBoundedElastic(60, 60, "writerThreads");
Scheduler readerScheduler = Schedulers.newBoundedElastic(60, 60, "readerThreads");
return KafkaReceiver.create(receiverOptions)
.receive()
.doOnNext( r -> System.out.printf("Record received: " + r.value() + " in thread: " + Thread.currentThread().getName() + System.lineSeparator()))
.map(m -> SenderRecord.create(processRecord(m),m.receiverOffset()))
.as(sender::send)
.doOnNext(m->m.correlationMetadata().acknowledge())
.doOnError(e -> e.printStackTrace())
.subscribe();
}
private ProducerRecord<Integer, String> processRecord( ReceiverRecord<Integer, String> message) {
System.out.printf( "Processing record " + message.value() + " in thread: "
+ Thread.currentThread().getName() + System.lineSeparator()) ;
return new ProducerRecord<Integer,String>(DESTINATION_TOPIC, message.key(), message.value()+ " updated");
}
}
如果有人能给我建议或指出示例以让多个接收者使用来自多个分区的消息,我将不胜感激。
更新代码:
static class ReactiveConsumeTransposeAndSend extends SetKafkaProperties {
SenderOptions<Integer, String> senderOptions =
SenderOptions.<Integer, String>create(producerProps)
.maxInFlight(1024);
KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions);
ReceiverOptions<Integer, String> receiverOptions =
ReceiverOptions.<Integer, String>create(consumerProps)
.subscription(Collections.singleton(SOURCE_TOPIC))
.addAssignListener(partitions -> {
System.out.printf("Partitions assigned" + partitions + System.lineSeparator());})
.addRevokeListener(partitions -> {
System.out.printf("Partitions assigned" + partitions + System.lineSeparator());})
;
ReactiveConsumeTransposeAndSend(Map<String, Object> consumerPropsOverride, Map<String, Object> producerPropsOverride, String bootstrapServers, String sourceTopic, String destTopic) {
super(consumerPropsOverride, producerPropsOverride, bootstrapServers, sourceTopic, destTopic);
}
public Disposable ReadProcessWriteRecords() {
Scheduler writerScheduler = Schedulers.newBoundedElastic(60, 60, "writerThreads");
Scheduler readerScheduler = Schedulers.newBoundedElastic(60, 60, "readerThreads");
return KafkaReceiver.create(receiverOptions)
.receive()
.doOnNext( r -> System.out.printf("Record received: " + r.value() + " from partition: " + r.partition() + " in thread: " + Thread.currentThread().getName() + System.lineSeparator()))
.map(m -> SenderRecord.create(processRecord(m),m.receiverOffset()))
.as(sender::send)
.doOnNext(m->m.correlationMetadata().acknowledge())
.doOnError(e -> e.printStackTrace())
.subscribe();
}
private ProducerRecord<Integer, String> processRecord( ReceiverRecord<Integer, String> message) {
System.out.printf( "Processing record " + message.value() + " in thread: "
+ Thread.currentThread().getName() + System.lineSeparator()) ;
return new ProducerRecord<Integer,String>(DESTINATION_TOPIC, message.key(), message.value()+ " updated");
}
}
根据@nipuna 的建议,我更新了SampleConsumer.java中的示例代码。但是,这些是我在运行应用程序时收到的打印语句:
Partitions assigned[metrics-2, metrics-1, metrics-0]
Record received: A16 from partition: 2 in thread: reactive-kafka-reactive-group-1
Processing record A16 in thread: reactive-kafka-reactive-group-1
Record received: B14 from partition: 1 in thread: reactive-kafka-reactive-group-1
Processing record B14 in thread: reactive-kafka-reactive-group-1
因此,相同的线程(“eactive-kafka-reactive-group-1”)被用于使用来自分区的消息。我想让不同的线程消耗来自不同分区的消息。