0

我有一个动态的 Dask Kubernetes 集群。我想将 35 个 parquet 文件(约 1.2GB)从 Gcloud 存储加载到 Dask Dataframe 中,然后apply()在将结果保存到 parquet 文件到 Gcloud 之后对其进行处理。

在从 Gcloud 存储加载文件期间,集群内存使用量增加到大约 3-4GB。然后工作人员(每个工作人员有 2GB 的 RAM)被终止/重新启动并且一些任务丢失,因此集群开始循环计算相同的事情。我删除了apply()操作并离开只是read_parquet()为了测试我的自定义代码是否会导致问题,但问题是相同的,即使只有一次read_parquet()操作。这是一个代码:

client = Client('<ip>:8786')
client.restart()

def command():
    client = get_client()
    df = dd.read_parquet('gcs://<bucket>/files/name_*.parquet', storage_options={'token':'cloud'}, engine='fastparquet')
    df = df.compute()

x = client.submit(command)
x.result()

注意:我提交了一个命令函数来运行所有必要的命令,以避免集群内的 gcsfs 身份验证问题

经过一番调查,我了解到问题可能在于.compute()将所有数据返回到一个进程,但这个进程(我的命令函数)正在一个工作人员上运行。因此,工作人员没有足够的 RAM,崩溃并丢失所有触发任务重新运行的计算任务。

我的目标是:

  • 从镶木地板文件中读取
  • 执行一些计算apply()
  • 甚至无需从集群返回数据,将其以 parquet 格式写回 Gcloud 存储。

所以,我只想将数据保存在集群上,而不是将其返回。只需在其他地方计算和保存数据。

在阅读了 Dask 分布式文档后,我找到了client.persist()/compute()方法.scatter()。它们看起来像我需要的东西,但我真的不明白如何使用它们。

请您为我的示例提供帮助client.persist()client.compute()方法或建议另一种方法吗?非常感谢!

Dask 版本:0.19.1

Dask 分布式版本:1.23.1

Python版本:3.5.1

4

1 回答 1

2
df = dd.read_parquet('gcs://<bucket>/files/name_*.parquet', storage_options={'token':'cloud'}, engine='fastparquet')
df = df.compute()  # this triggers computations, but brings all of the data to one machine and creates a Pandas dataframe

df = df.persist()  # this triggers computations, but keeps all of the data in multiple pandas dataframes spread across multiple machines
于 2018-10-07T22:57:13.007 回答