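Using examples from various sources, I wrote this method (relevant parts shown below), which pulls parquet-avro messages from Kafka for use in a test application. Based on the code I could find (some of it from http://aseigneurin.github.io/2016/03/04/kafka-spark-avro-produce-and-sumption-avro-messages.html ), I am using a schema that is passed in rather than one extracted from the message itself. Am I missing something, or is there a way to extract the schema from each message instead of having to pass it in? I am new to all of this, so I want to make sure I am doing it the best way.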


import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.collections.CollectionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

....

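  public List<String> awaitAndConsumeParquet(String topic, List<String> fieldValues, Schema avroSchema, String field, int minutesTimeout)
            throws InterruptedException {

        // Values arrive as raw bytes (ByteArrayDeserializer) and are decoded with the supplied Avro schema
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);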
        consumer.subscribe(Collections.singletonList(topic));

        List<String> foundValues = new ArrayList<>();

        long startTime = System.currentTimeMillis();
        long elapsedTime;
        // Build the codec once; it depends only on the schema, not on the individual record
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(avroSchema);

        while (true) {

            ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(Duration.ofMillis(1000));

            for (ConsumerRecord<String, byte[]> record : consumerRecords) {

                // Decode the message bytes using the schema that was passed in
                GenericRecord genericRecord = recordInjection.invert(record.value()).get();
                String k = genericRecord.get(field).toString();
                if (fieldValues.contains(k)) {
                    foundValues.add(k);
                }
            }

            consumer.commitAsync();

...

Caller:


....
        TestConsumer tc = new TestConsumer();
        tc.setBootstrapServers("localhost:9092");
        tc.setKeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer");
        tc.setValueDeserializer("org.apache.kafka.common.serialization.ByteArrayDeserializer");
        tc.setGroupId("consumerGroup1");


        // await expected data - provide jsonpath to use to query expected strings from json
        List<String> notFoundMessages = tc.awaitAndConsumeParquet("demo", known_items_list, avroSchema, "known_item_key", 1);
...