I am building a Dataflow pipeline that reads a protobuf file from Google Cloud Storage, parses it, and tries to write the result to a BigQuery table. It works fine when the number of rows is around 20k, but fails when the number of rows is around 200k. Below is the sample code:
Pipeline pipeline = Pipeline.create(options);

PCollection<PBClass> dataCol = pipeline
    .apply(FileIO.match().filepattern(options.getInputFile()))
    .apply(FileIO.readMatches())
    .apply("Read GPB File", ParDo.of(new ParseGpbFn()));

dataCol.apply("Transform to Delta", ParDo.of(deltaSchema))
    .apply(Flatten.iterables())
    .apply(
        BigQueryIO
            //.write()
            .writeTableRows()
            .to(deltaSchema.tableSpec)
            .withMethod(Method.STORAGE_WRITE_API)
            .withSchema(schema)
            //.withFormatFunction(irParDeltaSchema)
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
            .withExtendedErrorInfo()
    );
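For context, ParseGpbFn is roughly the following. This is only a sketch, assuming length-delimited protobuf messages; PBClass stands for my generated protobuf class, and imports (FileIO, DoFn, Channels, InputStream, IOException) are omitted like in the snippet above.

static class ParseGpbFn extends DoFn<FileIO.ReadableFile, PBClass> {
  // Open each matched file and emit every length-delimited PBClass message in it.
  @ProcessElement
  public void processElement(@Element FileIO.ReadableFile file, OutputReceiver<PBClass> out)
      throws IOException {
    try (InputStream is = Channels.newInputStream(file.open())) {
      PBClass msg;
      while ((msg = PBClass.parseDelimitedFrom(is)) != null) {
        out.output(msg);
      }
    }
  }
}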
I have tried different combinations of the following:
withMethod
write
withFormatFunction
as well as different numbers of workers and different Compute Engine machine types (a rough sketch of one variant I tried is below).
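One of those combinations swapped writeTableRows() for write() with an explicit format function. The snippet below is only a rough reconstruction of that variant: the identity format function assumes the step before it already produces TableRow elements, and irParDeltaSchema (commented out above) is my own SerializableFunction.

dataCol.apply("Transform to Delta", ParDo.of(deltaSchema))
    .apply(Flatten.iterables())
    .apply(
        BigQueryIO.<TableRow>write()
            .to(deltaSchema.tableSpec)
            .withFormatFunction(row -> row)  // identity; irParDeltaSchema when elements are not TableRow
            .withMethod(Method.FILE_LOADS)   // one of the method values I varied
            .withSchema(schema)
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
            .withExtendedErrorInfo()
    );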
Every time it gets stuck at the GroupByKey stage and fails with the following error:
Error message from worker: java.lang.RuntimeException: Failed to create job with prefix beam_bq_job_LOAD_testjobpackage_<...>, reached max retries: 3, last failed job: null.
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$PendingJob.runJob(BigQueryHelpers.java:199)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$PendingJobManager.waitForDone(BigQueryHelpers.java:152)
org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn.finishBundle(WriteTables.java:322)