
I am developing a Dataflow pipeline that reads a protobuf file from Google Cloud Storage, parses it, and attempts to write the result to a BigQuery table. It works fine when the number of rows is around 20k, but fails when the number of rows is around 200k. Below is the sample code:

Pipeline pipeline = Pipeline.create(options);

PCollection<PBClass> dataCol = pipeline
        .apply(FileIO.match().filepattern(options.getInputFile()))
        .apply(FileIO.readMatches())
        .apply("Read GPB File", ParDo.of(new ParseGpbFn()));

dataCol.apply("Transform to Delta", ParDo.of(deltaSchema))
        .apply(Flatten.iterables())
        .apply(BigQueryIO
                //.write()
                .writeTableRows()
                .to(deltaSchema.tableSpec)
                .withMethod(Method.STORAGE_WRITE_API)
                .withSchema(schema)
                //.withFormatFunction(irParDeltaSchema)
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
                .withExtendedErrorInfo());

I have tried different combinations of the following:

withMethod
write
withFormatFunction

I also tried different numbers of workers and different Compute Engine machine types; an example of one variant is sketched below.
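
For reference, one of the write() + withFormatFunction combinations looked roughly like this (a sketch only; Delta and deltaToRow are placeholder names for my element type and a SerializableFunction<Delta, TableRow>):

// Sketch of one variant tried: generic write() with an explicit format
// function instead of writeTableRows(). Delta and deltaToRow are
// placeholders, not the real identifiers.
dataCol.apply("Transform to Delta", ParDo.of(deltaSchema))
        .apply(Flatten.iterables())
        .apply(BigQueryIO.<Delta>write()
                .to(deltaSchema.tableSpec)
                .withFormatFunction(deltaToRow)
                .withMethod(Method.FILE_LOADS)
                .withSchema(schema)
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));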

Every time it gets stuck at the GroupByKey stage and produces the following error:

Error message from worker: java.lang.RuntimeException: Failed to create job with prefix beam_bq_job_LOAD_testjobpackage_<...>, reached max retries: 3, last failed job: null.
    org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$PendingJob.runJob(BigQueryHelpers.java:199)
    org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$PendingJobManager.waitForDone(BigQueryHelpers.java:152)
    org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn.finishBundle(WriteTables.java:322)

Job steps table view and graph view: [screenshots not included]


1 Answer

Error message from worker: java.lang.RuntimeException: Failed to create job with prefix beam_bq_job_LOAD_testjobpackage_<...>, reached max retries: 3, last failed job: null.
            org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$PendingJob.runJob(BigQueryHelpers.java:199)
            org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$PendingJobManager.waitForDone(BigQueryHelpers.java:152)
            org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn.finishBundle(WriteTables.java:322)

The error you are getting, quoted above, happens because somewhere in your code the GCS file you specify to load is not formatted correctly: the URI should look like gs://bucket/path/to/file.
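
For example, with FileIO.match().filepattern(options.getInputFile()), the option has to carry a full gs:// URI. A minimal sketch of what the options interface could look like (the interface and option names here are hypothetical; only the URI format matters):

// Hypothetical pipeline options interface; note the
// gs://bucket/path/to/file format expected for the input.
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

public interface GpbToBqOptions extends PipelineOptions {
    @Description("Input file, e.g. gs://my-bucket/path/to/file.pb")
    String getInputFile();
    void setInputFile(String value);
}

It would then be passed on the command line as, e.g., --inputFile=gs://my-bucket/path/to/input.pb, not a local path or a bare bucket/path string.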

Answered 2021-10-29T16:24:27.427