I am inserting records into BigQuery with Apache Beam (Java SDK) using the batch load method (FILE_LOADS). I want to retrieve the records that failed during insertion.
Is it possible to define a retry policy for the failed records?
Here is my code:
// Imports for the snippet (DataLake, PayloadSpecs, TableRowSpecs,
// DynamicTableRowDestinations and BigQueryServiceImpl are my own classes).
import java.io.IOException;

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;

import com.google.api.services.bigquery.model.TableRow;

public static void insertToBigQueryDataLake(
        final PCollectionTuple dataStoresCollectionTuple,
        final TupleTag<KV<DataLake, PayloadSpecs>> dataLakeValidTag,
        final Long loadJobTriggerFrequency,
        final Integer loadJobNumShard) {

    // Write the valid records to BigQuery via batch load jobs (FILE_LOADS).
    WriteResult writeResult = dataStoresCollectionTuple
            .get(dataLakeValidTag)
            .apply(TRANSFORMATION_NAME, DataLakeTableProcessor.dataLakeTableProcessorTransform())
            .apply(
                    WRITING_EVENTS_NAME,
                    BigQueryIO.<KV<DataLake, TableRowSpecs>>write()
                            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                            .withTriggeringFrequency(Duration.standardMinutes(loadJobTriggerFrequency))
                            .withNumFileShards(loadJobNumShard)
                            .to(new DynamicTableRowDestinations<>(IS_DATA_LAKE))
                            .withFormatFunction(BigQueryServiceImpl::dataLakeTableRow));

    // Attempt to log the rows that failed to insert.
    writeResult.getFailedInserts().apply(ParDo.of(new DoFn<TableRow, Void>() {
        @ProcessElement
        public void processElement(final ProcessContext processContext) throws IOException {
            System.out.println("Table Row : " + processContext.element().toPrettyString());
        }
    }));
}
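For what it's worth, the only retry-policy hook I have found in BigQueryIO, withFailedInsertRetryPolicy, is documented for streaming inserts rather than file loads. Below is a minimal sketch of that path, assuming a plain PCollection<TableRow> named rows and a hypothetical table my-project:my_dataset.my_table (both placeholders, not from my real pipeline):

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

// Sketch only: "rows" and the table name are placeholders.
WriteResult streamingResult = rows.apply(
        "WriteViaStreaming",
        BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table")
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                // Retry rows that failed for transient reasons (e.g. timeouts);
                // permanently invalid rows fall through to the failed-inserts output.
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                // Attach the BigQuery error details to each failed row.
                .withExtendedErrorInfo());

// With withExtendedErrorInfo(), failures are read via getFailedInsertsWithErr().
streamingResult.getFailedInsertsWithErr().apply(ParDo.of(new DoFn<BigQueryInsertError, Void>() {
    @ProcessElement
    public void processElement(final ProcessContext context) {
        BigQueryInsertError failure = context.element();
        System.out.println("Failed row : " + failure.getRow() + " error : " + failure.getError());
    }
}));

As far as I can tell, this retry policy is ignored when the write method is FILE_LOADS, which is why I am asking whether there is an equivalent way to capture and retry failed records for batch loads.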