我正在使用 Apache Beam 的 kafkaIO 从 Confluent 模式注册表中具有 avro 模式的主题中读取数据。我能够反序列化消息并写入文件。但最终我想写信给 BigQuery。我的管道无法推断架构。如何提取/推断架构并将其附加到管道中的数据,以便我的下游进程(写入 BigQuery)可以推断架构?
这是我使用模式注册表 url 设置反序列化器以及从 Kafka 读取的代码:
consumerConfig.put(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
options.getSchemaRegistryUrl());
String schemaUrl = options.getSchemaRegistryUrl().get();
String subj = options.getSubject().get();
ConfluentSchemaRegistryDeserializerProvider<GenericRecord> valDeserializerProvider =
ConfluentSchemaRegistryDeserializerProvider.of(schemaUrl, subj);
pipeline
.apply("Read from Kafka",
KafkaIO
.<byte[], GenericRecord>read()
.withBootstrapServers(options.getKafkaBrokers().get())
.withTopics(Utils.getListFromString(options.getKafkaTopics()))
.withConsumerConfigUpdates(consumerConfig)
.withValueDeserializer(valDeserializerProvider)
.withKeyDeserializer(ByteArrayDeserializer.class)
.commitOffsetsInFinalize()
.withoutMetadata()
);
我最初认为这足以让 beam 推断架构,但它并没有因为 hasSchema() 返回 false。
任何帮助,将不胜感激。