我正在开发一个示例应用程序:它从 Kafka 主题的不同分区读取记录,在保持各分区内顺序的前提下并行处理这些记录,并将记录写入另一个主题的不同分区。下面是我写的示例代码:
public class MetricsTransposer {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
static abstract class SetKafkaProperties {
final String SOURCE_TOPIC;
final String DESTINATION_TOPIC;
final Map<String, Object> consumerProps;
final Map<String, Object> producerProps;
SetKafkaProperties(Map<String, Object> consumerPropsOverride, Map<String, Object> producerPropsOverride, String bootstrapServers, String sourceTopic, String destTopic) {
SOURCE_TOPIC = sourceTopic;
DESTINATION_TOPIC = destTopic;
consumerProps = new HashMap<String, Object>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-group-" + System.currentTimeMillis());
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0");
if(consumerPropsOverride != null) {
consumerProps.putAll(consumerPropsOverride);
}
producerProps = new HashMap<String, Object>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0");
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, String.valueOf(Long.MAX_VALUE));
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringDeserializer.class);
if(producerPropsOverride != null) {
producerProps.putAll(producerPropsOverride);
}
}
}
static class ReactiveTranspose extends SetKafkaProperties {
SenderOptions<Integer, String> senderOptions =
SenderOptions.<Integer, String>create(producerProps)
.maxInFlight(1024);
KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions);
ReceiverOptions<Integer, String> receiverOptions =
ReceiverOptions.<Integer, String>create(consumerProps)
.subscription(Collections.singleton(SOURCE_TOPIC));
ReactiveTranspose(Map<String, Object> consumerPropsOverride, Map<String, Object> producerPropsOverride, String bootstrapServers, String sourceTopic, String destTopic) {
super(consumerPropsOverride, producerPropsOverride, bootstrapServers, sourceTopic, destTopic);
}
public Disposable ReadProcessWriteRecords() {
Scheduler scheduler = Schedulers.newBoundedElastic(60, 60, "writerThreads");
return KafkaReceiver.create(receiverOptions)
.receive()
.doOnNext( r -> System.out.printf("Record received: {0}", r.value()))
.groupBy(m -> m.receiverOffset().topicPartition())
.flatMap(partitionFlux ->
partitionFlux.publishOn(scheduler)
.map(r -> processRecord(partitionFlux.key(), r))
.sample(Duration.ofMillis(5000))
.concatMap(offset -> offset.commit()))
.subscribe();
}
private ReceiverOffset processRecord(TopicPartition topicPartition, ReceiverRecord<Integer, String> message) {
System.out.printf("Processing record {} from partition {} in thread{}",
message.value(), topicPartition, Thread.currentThread().getName());
return message.receiverOffset();
}
}
public static void RunReactiveTranformProcess(String sourceTopic, String destinationTopic) {
ReactiveTranspose transpose = new ReactiveTranspose(null, null, BOOTSTRAP_SERVERS, sourceTopic, destinationTopic);
transpose.ReadProcessWriteRecords();
}
public static void main(String[] args) throws Exception {
String sourceTopic = "metrics";
String destinationTopic = "cleanmetrics";
RunReactiveTranformProcess(sourceTopic, destinationTopic);
}
}
当我运行应用程序时,我没有在控制台中看到打印语句的输出。该主题中确实有可供消费的数据,所以我想知道代码是否真的连接到了该主题。我希望了解如何检查它是否已连接到主题并读取了消息,或者这里可能出了什么问题。
我是 Java、反应式编程和 Kafka 的新手。这是一个自学项目,我很可能遗漏了一些简单明了的东西。
更多信息:这是我的日志的快照。我有一个名为 metrics 的主题,有 3 个分区
更新:我之所以没有看到打印语句的输出,是因为主题中已经有数据,而 auto.offset.reset 被设置为 latest(最新)。将其改为 earliest(最早)之后,就可以消费现有的数据了。