0

我使用下面的代码片段将 CSV 文件作为字典读入管道。

class MyCsvFileSource(beam.io.filebasedsource.FileBasedSource):
    def read_records(self, file_name, range_tracker):
        self._file = self.open_file(file_name)

        reader = csv.DictReader(self._file, dialect=MyCustomDialect)

        for rec in reader:
            yield rec

这个片段几乎是从How to convert csv into a dictionary in apache beam dataflow (Pablo 的回答)中的一篇文章中复制过来的。

后来我注意到这一切都适用于相对较小的文件(例如 35k 行)。但是对于更大的文件,例如。70 万行,我看到在输出 (BigQuery) 中生成了重复项。几乎是 5 倍,所以我最终得到了超过 3M 行。

我仔细查看beam.io.filebasedsource.FileBasedSource并看到默认splitted设置为的参数。True

文档是这样说的:

splittable (bool): whether :class:`FileBasedSource` should try to
logically split a single file into data ranges so that different parts
of the same file can be read in parallel. If set to :data:`False`,
:class:`FileBasedSource` will prevent both initial and dynamic splitting
of sources for single files. File patterns that represent multiple files
may still get split into sources for individual files. Even if set to
:data:`True` by the user, :class:`FileBasedSource` may choose to not
split the file, for example, for compressed files where currently it is
not possible to efficiently read a data range without decompressing the
whole file.

当参数设置为时,True它能够并行读取源文件。

我注意到,如果我将此参数设置为False,则文件读取正常并且我没有重复。

目前我将此splittable参数设置为,False因为它可以防止重复,但我不确定当我的文件将成行增长时,这是否是未来的证据。

并行读取源文件是否可能存在一些问题?有什么我忽略或没有以正确的方式处理的事情吗?

4

1 回答 1

0

为了支持不重复的拆分,您必须在从源读取时使用传递的“range_tracker”对象。例如,在声明您正在阅读的文件的唯一位置时,您必须调用 try_claim()。

请参阅以下内容以获取更多信息。 https://beam.apache.org/documentation/sdks/python-custom-io/

于 2019-01-08T19:36:38.617 回答