我试图从云存储中的一个大文件中读取并根据给定的字段对它们进行分片。
我打算阅读 | 映射(lambda x: (x[key field], x)) | GroupByKey | 使用关键字段的名称写入文件。
但是我找不到动态写入云存储的方法。是否支持此功能?
谢谢你,一清
我试图从云存储中的一个大文件中读取并根据给定的字段对它们进行分片。
我打算阅读 | 映射(lambda x: (x[key field], x)) | GroupByKey | 使用关键字段的名称写入文件。
但是我找不到动态写入云存储的方法。是否支持此功能?
谢谢你,一清
是的,您可以使用FileSystems
API创建文件。
Beam python SDK 在 2.14.0 中添加了一个实验性写入beam.io.fileio.WriteToFiles
:
my_pcollection | beam.io.fileio.WriteToFiles(
path='/my/file/path',
destination=lambda record: 'avro' if record['type'] == 'A' else 'csv',
sink=lambda dest: AvroSink() if dest == 'avro' else CsvSink(),
file_naming=beam.io.fileio.destination_prefix_naming())
可用于每条记录写入不同的文件。
您可以跳过GroupByKey
,仅用于destination
决定将每条记录写入哪个文件。的返回值destination
需要是可以分组的值。
更多文档在这里:
https://beam.apache.org/releases/pydoc/2.14.0/apache_beam.io.fileio.html#dynamic-destinations
以及这里的 JIRA 问题: