我有一个动态的 Dask Kubernetes 集群。我想将 35 个 parquet 文件(约 1.2GB)从 Gcloud 存储加载到 Dask Dataframe 中,然后apply()在将结果保存到 parquet 文件到 Gcloud 之后对其进行处理。
在从 Gcloud 存储加载文件期间,集群内存使用量增加到大约 3-4GB。然后工作人员(每个工作人员有 2GB 的 RAM)被终止/重新启动并且一些任务丢失,因此集群开始循环计算相同的事情。我删除了apply()操作并离开只是read_parquet()为了测试我的自定义代码是否会导致问题,但问题是相同的,即使只有一次read_parquet()操作。这是一个代码:
client = Client('<ip>:8786')
client.restart()
def command():
client = get_client()
df = dd.read_parquet('gcs://<bucket>/files/name_*.parquet', storage_options={'token':'cloud'}, engine='fastparquet')
df = df.compute()
x = client.submit(command)
x.result()
注意:我提交了一个命令函数来运行所有必要的命令,以避免集群内的 gcsfs 身份验证问题
经过一番调查,我了解到问题可能在于.compute()将所有数据返回到一个进程,但这个进程(我的命令函数)正在一个工作人员上运行。因此,工作人员没有足够的 RAM,崩溃并丢失所有触发任务重新运行的计算任务。
我的目标是:
- 从镶木地板文件中读取
- 执行一些计算
apply() - 甚至无需从集群返回数据,将其以 parquet 格式写回 Gcloud 存储。
所以,我只想将数据保存在集群上,而不是将其返回。只需在其他地方计算和保存数据。
在阅读了 Dask 分布式文档后,我找到了client.persist()/compute()方法.scatter()。它们看起来像我需要的东西,但我真的不明白如何使用它们。
请您为我的示例提供帮助client.persist()和client.compute()方法或建议另一种方法吗?非常感谢!
Dask 版本:0.19.1
Dask 分布式版本:1.23.1
Python版本:3.5.1