
I am trying to create an Apache Beam pipeline that reads from a Kafka topic and loads the data into BigQuery. Using Confluent's Schema Registry, I expected the schema to be inferred when loading into BigQuery. However, the schema is not inferred and the load fails.

Below is the entire pipeline code.

    pipeline
        .apply("Read from Kafka",
                KafkaIO
                        .<byte[], GenericRecord>read()
                        .withBootstrapServers("broker-url:9092")
                        .withTopic("beam-in")
                        .withConsumerConfigUpdates(consumerConfig)
                        .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(schemaRegUrl, subj))
                        .withKeyDeserializer(ByteArrayDeserializer.class)
                        .commitOffsetsInFinalize()
                        .withoutMetadata())
        .apply("Drop Kafka message key", Values.create())
        .apply(
                "Write data to BQ",
                BigQueryIO
                        .<GenericRecord>write()
                        .optimizedWrites()
                        .useBeamSchema()
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                        .withSchemaUpdateOptions(ImmutableSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION))
                        .withCustomGcsTempLocation("gs://beam-tmp-load")
                        .withNumFileShards(10)
                        .withMethod(FILE_LOADS)
                        .withTriggeringFrequency(Utils.parseDuration("10s"))
                        .to(new TableReference()
                                .setProjectId("my-project")
                                .setDatasetId("loaded-data")
                                .setTableId("beam-load-test")));

    return pipeline.run();

When I run this, I get the error below, which happens because I am calling useBeamSchema() while hasSchema() on the input PCollection returns false:

    Exception in thread "main" java.lang.IllegalArgumentException
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:127)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:2595)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:2579)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1726)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:493)
        at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:368)
        at KafkaToBigQuery.run(KafkaToBigQuery.java:159)
        at KafkaToBigQuery.main(KafkaToBigQuery.java:64)
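
For reference, the workaround I have been considering (untested) is to convert each GenericRecord to a Beam Row and attach the schema explicitly with setRowSchema(), so that the collection carries a schema before it reaches BigQueryIO. In the sketch below, avroSchema is assumed to be the Avro schema for the subject (fetched from the registry or parsed from an .avsc file, not shown); the read and write configuration is otherwise the same as above.

    // Sketch of a possible workaround (untested): convert GenericRecord to Row and
    // set the Beam schema explicitly so that useBeamSchema() can pick it up.
    // `avroSchema` (org.apache.avro.Schema) is assumed to be obtained elsewhere,
    // e.g. from the schema registry or an .avsc file.
    // AvroUtils is org.apache.beam.sdk.schemas.utils.AvroUtils.
    org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(avroSchema);

    pipeline
            .apply("Read from Kafka",
                    KafkaIO
                            .<byte[], GenericRecord>read()
                            .withBootstrapServers("broker-url:9092")
                            .withTopic("beam-in")
                            .withConsumerConfigUpdates(consumerConfig)
                            .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(schemaRegUrl, subj))
                            .withKeyDeserializer(ByteArrayDeserializer.class)
                            .commitOffsetsInFinalize()
                            .withoutMetadata())
            .apply("Drop Kafka message key", Values.create())
            // Convert each GenericRecord to a Row using the Beam schema derived from the Avro schema.
            .apply("GenericRecord to Row",
                    MapElements.into(TypeDescriptor.of(Row.class))
                            .via((GenericRecord record) -> AvroUtils.toBeamRowStrict(record, beamSchema)))
            // Attach the schema to the PCollection so that hasSchema() returns true.
            .setRowSchema(beamSchema)
            .apply("Write data to BQ",
                    BigQueryIO
                            .<Row>write()
                            .useBeamSchema()
                            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                            .withMethod(FILE_LOADS)
                            .withTriggeringFrequency(Utils.parseDuration("10s"))
                            // Remaining BigQueryIO options as in the original pipeline.
                            .to(new TableReference()
                                    .setProjectId("my-project")
                                    .setDatasetId("loaded-data")
                                    .setTableId("beam-load-test")));

    return pipeline.run();

That said, I would prefer the schema to be inferred automatically from the Confluent registry, if that is possible.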