1

我正在使用 Apache Beam(Java SDK)使用批量加载方法(文件加载)在 BigQuery 中插入记录。我想检索那些在插入过程中失败的记录。

是否可以对失败的记录制定重试策略?

下面是我的代码:

public static void insertToBigQueryDataLake(
        final PCollectionTuple dataStoresCollectionTuple,
        final TupleTag<KV<DataLake, PayloadSpecs>> dataLakeValidTag,
        final Long loadJobTriggerFrequency,
        final Integer loadJobNumShard) {


    WriteResult writeResult = dataStoresCollectionTuple
            .get(dataLakeValidTag)
            .apply(TRANSFORMATION_NAME, DataLakeTableProcessor.dataLakeTableProcessorTransform())
            .apply(
                    WRITING_EVENTS_NAME,
                    BigQueryIO.<KV<DataLake, TableRowSpecs>>write()
                            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                            .withTriggeringFrequency(Duration.standardMinutes(loadJobTriggerFrequency))
                            .withNumFileShards(loadJobNumShard)
                            .to(new DynamicTableRowDestinations<>(IS_DATA_LAKE))
                            .withFormatFunction(BigQueryServiceImpl::dataLakeTableRow));

    writeResult.getFailedInserts().apply(ParDo.of(new DoFn<TableRow, Void>() {
        @ProcessElement
        public void processElement(final ProcessContext processContext) throws IOException {
            System.out.println("Table Row : " + processContext.element().toPrettyString());
        }
    }));

}
4

1 回答 1

0

使用 getFailedInsertsWithErr() 方法,我们可以将失败的插入推送到另一个表以执行根本原因分析 (RCA),请查看此处了解更多详细信息。

Example:
// write failed rows with their error to error table                
writeResult
        .getFailedInsertsWithErr()
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(5))))
        .apply("BQ-insert-error-extract", ParDo.of(new BigQueryInsertErrorExtractFn(tableRowToInsertView)).withSideInputs(tableRowToInsertView))
        .apply("BQ-insert-error-write", BigQueryIO.writeTableRows()
                .to(errTableSpec)
                .withJsonSchema(errSchema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
于 2021-07-20T04:36:33.237 回答