我正在尝试创建一个 Apache Beam 管道,我从一个 kafka 主题中读取并将其加载到 Bigquery 中。使用 Confluent 的模式注册表,我应该能够在加载到 Bigquery 时推断模式。但是,加载失败时不会推断架构。
下面是整个管道代码。
pipeline
.apply("Read from Kafka",
KafkaIO
.<byte[], GenericRecord>read()
.withBootstrapServers("broker-url:9092")
.withTopic("beam-in")
.withConsumerConfigUpdates(consumerConfig)
.withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(schemaRegUrl, subj))
.withKeyDeserializer(ByteArrayDeserializer.class)
.commitOffsetsInFinalize()
.withoutMetadata()
)
.apply("Drop Kafka message key", Values.create())
.apply(
"Write data to BQ",
BigQueryIO
.<GenericRecord>write()
.optimizedWrites()
.useBeamSchema()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withSchemaUpdateOptions(ImmutableSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION))
.withCustomGcsTempLocation("gs://beam-tmp-load")
.withNumFileShards(10)
.withMethod(FILE_LOADS)
.withTriggeringFrequency(Utils.parseDuration("10s"))
.to(new TableReference()
.setProjectId("my-project")
.setDatasetId("loaded-data")
.setTableId("beam-load-test")
);
return pipeline.run();
运行此程序时,我收到以下错误,这是因为我正在调用 useBeamSchema() 并且 hasSchema() 返回 false:
Exception in thread "main" java.lang.IllegalArgumentException
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:127)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:2595)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:2579)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1726)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:493)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:368)
at KafkaToBigQuery.run(KafkaToBigQuery.java:159)
at KafkaToBigQuery.main(KafkaToBigQuery.java:64)