4

我有一个大的 gziped csv 文件 (.csv.gz) 上传到一个大小约为 14GB 的数据集,未压缩时为 40GB。有没有办法使用 Python 转换将其解压缩、读取和写入数据集,而不会导致执行程序 OOM?

4

1 回答 1

4

在回答这个问题时,我将协调一些策略。

首先,我想使用这里讨论的方法使用测试驱动开发来编写这个,因为我们正在处理原始文件。使用完整检查 + 构建对原始文件的迭代速度太长了,所以我将首先创建一个示例.csv文件并压缩它以加快开发速度。

我的示例.csv文件如下所示:

样本

然后我使用命令行实用程序对其进行压缩,并将其添加到我的代码存储库中,方法是将存储库克隆到我的本地机器,将文件添加到我的开发分支,并将结果推送回我的 Foundry 实例。

我还在test我的存储库中创建了一个目录,因为我想确保我的解析逻辑得到正确验证。

这导致我的存储库如下所示:

repo_with_test

Protip:不要忘记修改您的setup.pybuild.gradle文件以启用测试并专门打包您的小测试文件。

我还需要让我的解析逻辑位于我的my_compute_function方法之外,以便它可用于我的测试方法,parse_gzip.py如下所示:

from transforms.api import transform, Output, Input
from transforms.verbs.dataframes import union_many


def read_files(spark_session, paths):
    parsed_dfs = []
    for file_name in paths:
        parsed_df = spark_session.read.option("header", "true").csv(file_name)
        parsed_dfs += [parsed_df]
    output_df = union_many(*parsed_dfs)
    return output_df


@transform(
    the_output=Output("ri.foundry.main.dataset.my-awesome-output"),
    the_input=Input("ri.foundry.main.dataset.my-awesome-input"),
)
def my_compute_function(the_input, the_output, ctx):
    session = ctx.spark_session
    input_filesystem = the_input.filesystem()
    hadoop_path = input_filesystem.hadoop_path
    files = input_filesystem.ls('**/*.csv.gz').map(lambda file_name: hadoop_path + file_name)
    output_df = read_files(session, files)
    the_output.write_dataframe(output_df)

因此,我的test_gzip_csv.py文件如下所示:

from myproject.datasets import parse_gzip
from pkg_resources import resource_filename


def test_compressed_csv(spark_session):
    file_path = resource_filename(__name__, "test.csv.gz")
    parsed_df = parse_gzip.read_files(spark_session, [file_path])
    assert parsed_df.count() == 1
    assert set(parsed_df.columns) == {"col_1", "col_2"}

重要的是在这里看到这种方法不使用.files()对文件系统的调用,它使用该.ls()方法来获取文件名的迭代器。在这种情况下这是故意这样做的,因为我们不需要在 executors 中解析文件本身;我们只需要使用 Spark 的本地方法.csv来使用现有功能解析压缩文件。

GZip 文件实际上是可拆分的,Spark 自己的读取这些文件的方法将是最佳的,而不是编写自己的解压缩器/.csv 解析器。如果您要尝试解压缩并手动解析它们,您将冒着 OOMing 工作的风险,并且需要投入更多内存以使其成功。在您也在操作的规模上,建议不要在 Python 中处理单个文件,因为它的性能与 Spark 的性能不匹配。

请注意,我还使用transforms.verbs.dataframes.union_many此处的方法来优雅地处理具有不同架构的不同文件。您可以指定“窄”、“宽”和“严格”选项来处理不同模式的情况,请参阅最适合您需求的产品文档。

于 2021-08-31T11:14:33.940 回答