Using examples from various sources, I wrote this method (relevant portion shown below) that pulls parquet-avro messages from Kafka for use in testing an application. Based on the code I could find (some of it from http://aseigneurin.github.io/2016/03/04/kafka-spark-avro-produce-and-sumption-avro-messages.html), I decode with a schema that is passed in, rather than one extracted from the message itself. Am I missing something, or could I extract the schema from each message instead of needing to pass it in? I'm new to all of this, so I want to make sure I'm doing it the best way.
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.collections.CollectionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
....
public List<String> awaitAndConsumeParquet(String topic, List<String> fieldValues, Schema avroSchema,
        String field, int minutesTimeout) throws InterruptedException {
    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList(topic));
    List<String> foundValues = new ArrayList<>();
    long startTime = System.currentTimeMillis();
    long elapsedTime;
    // The injection depends only on the schema, so build it once instead of once per record
    Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(avroSchema);
    while (true) {
        ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, byte[]> record : consumerRecords) {
            GenericRecord genericRecord = recordInjection.invert(record.value()).get();
            String k = genericRecord.get(field).toString();
            if (fieldValues.contains(k)) {
                foundValues.add(k);
            }
        }
        consumer.commitAsync();
...
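The elided remainder of the loop is not shown, but it presumably compares `elapsedTime` against the `minutesTimeout` parameter to decide when to stop polling. A minimal standalone sketch of that deadline arithmetic, with variable names mirroring the method's locals (the exit condition itself is an assumption, since that code is elided):

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a timeout check for the elided tail of the poll loop.
int minutesTimeout = 1;                                         // method parameter in the original
long timeoutMillis = TimeUnit.MINUTES.toMillis(minutesTimeout); // 1 minute -> 60000 ms
long startTime = System.currentTimeMillis();                    // captured before the while loop

// After each poll/commit pass, recompute elapsed time and compare against the deadline;
// exceeding it would trigger the loop exit and the return of the result list.
long elapsedTime = System.currentTimeMillis() - startTime;
boolean timedOut = elapsedTime >= timeoutMillis;
```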
Caller:
....
TestConsumer tc = new TestConsumer();
tc.setBootstrapServers("localhost:9092");
tc.setKeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer");
tc.setValueDeserializer("org.apache.kafka.common.serialization.ByteArrayDeserializer");
tc.setGroupId("consumerGroup1");
// await expected data - provide the avro schema, field name, and expected values to match
List<String> notFoundMessages = tc.awaitAndConsumeParquet("demo", known_items_list, avroSchema, "known_item_key", 1);
...