2

我对 Apache Beam/Cloud Dataflow 还是新手,所以如果我的理解不正确,我深表歉意。

我正在尝试通过管道读取约 30,000 行长的数据文件。我的简单管道首先从 GCS 打开 csv,从数据中提取标头,通过 ParDo/DoFn 函数运行数据,然后将所有输出写入 csv 回 GCS。该管道有效,是我的第一个测试。

然后我编辑了管道以读取 csv,拉出标题,从数据中删除标题,通过 ParDo/DoFn 函数运行数据,并将标题作为侧输入,然后将所有输出写入 csv。唯一的新代码是将标头作为侧输入传递并从数据中过滤它。

在此处输入图像描述 在此处输入图像描述

ParDo/DoFn 函数 build_rows 只产生 context.element,这样我就可以确保我的边输入正常工作。

我得到的错误如下: 我不确定问题是什么,但我认为这可能是由于内存限制。我将示例数据从 30,000 行减少到 100 行,我的代码终于可以正常工作了。在此处输入图像描述

没有侧输入的管道确实读/写了所有 30,000 行,但最后我需要侧输入来对我的数据进行转换。

如何修复我的管道,以便我可以处理来自 GCS 的大型 csv 文件,并且仍然使用边输入作为文件的伪全局变量?

4

1 回答 1

2

I recently coded a CSV file source for Apache Beam, and I've added it to the beam_utils PiPy package. Specifically, you can use it as follows:

  1. Install beam utils: pip install beam_utils
  2. Import: from beam_utils.sources import CsvFileSource.
  3. Use it as a source: beam.io.Read(CsvFileSource(input_file)).

In its default behavior, the CsvFileSource returns dictionaries indexed by header - but you can take a look at the documentation to decide what option you'd like to use.

As an extra, if you want to implement your own custom CsvFileSource, you need to subclass Beam's FileBasedSource:

import csv
class CsvFileSource(beam.io.filebasedsource.FileBasedSource):
  def read_records(self, file_name, range_tracker):
    self._file = self.open_file(file_name)
    reader = csv.reader(self._file)
    for i, rec in enumerate(reader):
      yield res

And you can expand this logic to parse for headers and other special behavior.

Also, as a note, this source can not be split because it needs to be sequentially parsed, so it may represent a bottleneck when processing data (though that may be okay).

于 2017-02-22T21:28:20.887 回答