2

我试图从云存储中的一个大文件中读取并根据给定的字段对它们进行分片。

我打算阅读 | 映射(lambda x: (x[key field], x)) | GroupByKey | 使用关键字段的名称写入文件。

但是我找不到动态写入云存储的方法。是否支持此功能?

谢谢你,一清

4

2 回答 2

1

是的,您可以使用FileSystemsAPI创建文件。

于 2018-02-16T02:42:13.870 回答
1

Beam python SDK 在 2.14.0 中添加了一个实验性写入beam.io.fileio.WriteToFiles

my_pcollection | beam.io.fileio.WriteToFiles(
      path='/my/file/path',
      destination=lambda record: 'avro' if record['type'] == 'A' else 'csv',
      sink=lambda dest: AvroSink() if dest == 'avro' else CsvSink(),
      file_naming=beam.io.fileio.destination_prefix_naming())

可用于每条记录写入不同的文件。

您可以跳过GroupByKey,仅用于destination决定将每条记录写入哪个文件。的返回值destination需要是可以分组的值。

更多文档在这里:

https://beam.apache.org/releases/pydoc/2.14.0/apache_beam.io.fileio.html#dynamic-destinations

以及这里的 JIRA 问题:

https://issues.apache.org/jira/browse/BEAM-2857

于 2019-08-18T08:13:27.887 回答