
I'm building a sample application that reads from the different partitions of a Kafka topic, processes the records with per-partition ordering, and writes them to different partitions of another topic. Here is the sample code I wrote:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    import reactor.core.Disposable;
    import reactor.core.scheduler.Scheduler;
    import reactor.core.scheduler.Schedulers;
    import reactor.kafka.receiver.KafkaReceiver;
    import reactor.kafka.receiver.ReceiverOffset;
    import reactor.kafka.receiver.ReceiverOptions;
    import reactor.kafka.receiver.ReceiverRecord;
    import reactor.kafka.sender.KafkaSender;
    import reactor.kafka.sender.SenderOptions;

    public class MetricsTransposer {
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    static abstract class SetKafkaProperties {
        final String SOURCE_TOPIC;
        final String DESTINATION_TOPIC;
        final Map<String, Object> consumerProps;
        final Map<String, Object> producerProps;

        SetKafkaProperties(Map<String, Object> consumerPropsOverride, Map<String, Object> producerPropsOverride, String bootstrapServers, String sourceTopic, String destTopic) {
            SOURCE_TOPIC = sourceTopic;
            DESTINATION_TOPIC = destTopic;

            consumerProps = new HashMap<String, Object>();
            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-group-" + System.currentTimeMillis());
            consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0"); 
            if(consumerPropsOverride != null) {
                consumerProps.putAll(consumerPropsOverride);
            }

            producerProps = new HashMap<String, Object>();
            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0");
            producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, String.valueOf(Long.MAX_VALUE));
            // Note: these must be Serializer classes, not Deserializer classes.
            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            if(producerPropsOverride != null) {
                producerProps.putAll(producerPropsOverride);
            }
        }
    }

    static class ReactiveTranspose extends SetKafkaProperties {

        SenderOptions<Integer, String> senderOptions =
            SenderOptions.<Integer, String>create(producerProps)
                .maxInFlight(1024);

        KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions);

        ReceiverOptions<Integer, String> receiverOptions =
            ReceiverOptions.<Integer, String>create(consumerProps)
                .subscription(Collections.singleton(SOURCE_TOPIC));


        ReactiveTranspose(Map<String, Object> consumerPropsOverride, Map<String, Object> producerPropsOverride, String bootstrapServers, String sourceTopic, String destTopic) {
            super(consumerPropsOverride, producerPropsOverride, bootstrapServers, sourceTopic, destTopic);
        }

        public Disposable ReadProcessWriteRecords() {
            Scheduler scheduler = Schedulers.newBoundedElastic(60, 60, "writerThreads");
           return KafkaReceiver.create(receiverOptions)
                .receive()
                .doOnNext(r -> System.out.printf("Record received: %s%n", r.value()))
                .groupBy(m -> m.receiverOffset().topicPartition())
                .flatMap(partitionFlux ->
                    partitionFlux.publishOn(scheduler)
                        .map(r -> processRecord(partitionFlux.key(), r))
                        .sample(Duration.ofMillis(5000))
                        .concatMap(offset -> offset.commit()))
                .subscribe();
        }

        private ReceiverOffset processRecord(TopicPartition topicPartition, ReceiverRecord<Integer, String> message) {
            System.out.printf("Processing record {} from partition {} in thread{}",
                message.value(), topicPartition, Thread.currentThread().getName());
            return message.receiverOffset();
        }
    }

    public static void RunReactiveTranformProcess(String sourceTopic, String destinationTopic) {
        ReactiveTranspose transpose = new ReactiveTranspose(null, null, BOOTSTRAP_SERVERS, sourceTopic, destinationTopic);
        transpose.ReadProcessWriteRecords();
    }

    public static void main(String[] args) throws Exception {
        String sourceTopic = "metrics";
        String destinationTopic = "cleanmetrics";

        RunReactiveTranformProcess(sourceTopic, destinationTopic);

    }
}

When I run the application, I don't see my print statements in the console, even though I do have data in the source topic to consume. So I'm wondering whether the code is connecting to the topic at all. I'm looking for help with how to verify that it is connected and reading messages, or with what else might be going wrong here.

I'm new to Java, reactive programming, and Kafka. This is a self-study project, and I'm most likely missing something simple and obvious.

More info: here is a snapshot of my logs. I have a topic named metrics with 3 partitions. [log screenshot omitted]

Update: I wasn't seeing my print statements because there was already data in my topic while auto.offset.reset was set to latest. Changing it to earliest consumed the existing data.
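For example, the override can be passed in through the existing consumerPropsOverride parameter, something like this:

    // Override auto.offset.reset so existing records in the topic are consumed.
    Map<String, Object> overrides = new HashMap<>();
    overrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // Reuses ReactiveTranspose and BOOTSTRAP_SERVERS from the code above;
    // "metrics" and "cleanmetrics" are the topics from main().
    ReactiveTranspose transpose = new ReactiveTranspose(
        overrides, null, BOOTSTRAP_SERVERS, "metrics", "cleanmetrics");
    transpose.ReadProcessWriteRecords();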


1 Answer


Your problem is here:

public void ReadProcessWriteRecords() {
    Scheduler scheduler = Schedulers.newBoundedElastic(60, 60, "writerThreads");

    // Here you are ignoring the return value.
    // Nothing happens until you subscribe,
    // so this is merely a statement, not an execution.
    KafkaReceiver.create(receiverOptions)
                 .receive()
                 .doOnNext(r -> System.out.printf("Record received: %s%n", r.value()))
                 .groupBy(m -> m.receiverOffset().topicPartition())
                 .flatMap(partitionFlux ->
                     partitionFlux.publishOn(scheduler)
                         .map(r -> processRecord(partitionFlux.key(), r))
                         .sample(Duration.ofMillis(5000))
                         .concatMap(offset -> offset.commit()));
}

The reactive documentation covers this in its getting-started section: nothing happens until you subscribe. In the code above you are creating a reactive stream, but nobody subscribes to it.

Since your application is the consumer of the stream, you should add a subscribe statement somewhere.
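To see the principle in isolation, here is a minimal example with plain Reactor (no Kafka involved):

import reactor.core.publisher.Flux;

public class SubscribeDemo {
    public static void main(String[] args) {
        // Assembling the pipeline prints nothing by itself...
        Flux<Integer> flux = Flux.range(1, 3)
            .doOnNext(i -> System.out.println("Got " + i));

        System.out.println("Pipeline assembled, nothing emitted yet");

        // ...only subscribing starts the flow of data.
        flux.subscribe();
    }
}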

Personally I would not return void (in reactive programming you generally try to avoid void functions, because they tend to cause side effects and are hard to test); I would return the publisher all the way up to the main function so the code can be unit tested.

The resulting main function would then look like this:

public static void main(String[] args) throws Exception {
    String sourceTopic = "metrics";
    String destinationTopic = "cleanmetrics";

    RunReactiveTranformProcess(sourceTopic, destinationTopic).subscribe();

}
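Note that for this to compile, both methods have to return the stream instead of subscribing internally. A sketch of what that could look like, assuming the classes from the question (the commit step makes the pipeline a Flux<Void>; assumes reactor.core.publisher.Flux is imported):

// Hypothetical refactor: return the stream instead of subscribing inside the method.
public Flux<Void> ReadProcessWriteRecords() {
    Scheduler scheduler = Schedulers.newBoundedElastic(60, 60, "writerThreads");
    return KafkaReceiver.create(receiverOptions)
        .receive()
        .doOnNext(r -> System.out.printf("Record received: %s%n", r.value()))
        .groupBy(m -> m.receiverOffset().topicPartition())
        .flatMap(partitionFlux ->
            partitionFlux.publishOn(scheduler)
                .map(r -> processRecord(partitionFlux.key(), r))
                .sample(Duration.ofMillis(5000))
                .concatMap(ReceiverOffset::commit));
}

public static Flux<Void> RunReactiveTranformProcess(String sourceTopic, String destinationTopic) {
    ReactiveTranspose transpose = new ReactiveTranspose(null, null, BOOTSTRAP_SERVERS, sourceTopic, destinationTopic);
    return transpose.ReadProcessWriteRecords();
}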
answered 2021-07-31T11:03:22.433