我正在从目录中读取所有文件并写入 bigquery 表。
如果目录中的任何文件有错误,它将引发错误并停止作业。我没有在日志中获得有关该文件(已引发错误的文件名)的任何信息。
with beam.Pipeline(options=pipeline_options) as p:
read_rec = p | 'Read Files' >> ReadFromText('gs://MyBucket/MyDir/*.gz')
read_str = read_rec | 'Map to Json' >> beam.Map(string_format)
write_rec = read_str | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
known_args.output,schema='string_field_0:STRING',
createdisposition=beam.io.BigQueryDisposition.CREATE_NEVER,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)
有什么方法可以跳过失败的文件并继续下一个文件,或者至少记录遇到错误的文件名。