0

我正在使用带有两个工作人员和一个调度程序的 vanilla Dask-Kubernetes 设置来迭代一些 JSON 文件的行(并应用一些为简单起见未出现在此处的函数)。我只看到一个工人在工作,而我希望看到其中两个。

希望重新分区能帮助我尝试了不同的值,bag.repartition(num)这些值返回不同的行数,但它们并没有改变关于工人不平衡和仅集中在一个工人身上的内存消耗的任何事情。

我想我不了解分区和工作人员之间的相关性,而且我在 Dask 文档中找不到任何关于它的内容。非常欢迎任何帮助或指点!

import dask.bag as db

def grep_buildings():
    base = "https://usbuildingdata.blob.core.windows.net/usbuildings-v1-1/"
    b = db.text.read_text(f"{base}/Alabama.zip")
    # b = b.repartition(2)
    lines = b.take(3_000_000)
    return lines

len(grep_buildings())
4

1 回答 1

0

在您的示例中,您正在打开文件,并且它已被压缩

db.text.read_text(f"{base}/Alabama.zip")

Dask 能够并行打开和处理多个文件,每个文件至少有一个分区。Dask 还能够将单个文件拆分为块(blocksize参数);但这仅适用于未压缩的数据。原因是,对于整个文件压缩方法,到达未压缩流中某个点的唯一方法是从头开始读取,因此每个分区都会读取大部分数据。

最后,当您从单个分区开始时,重新分区对您没有帮助:您需要在将数据拆分为下游任务之前读取整个文件。

于 2020-09-24T19:59:13.573 回答