2

我是 Dask 的新手,如果这个问题对你来说很愚蠢,请原谅我。在 Dask 中,我正在使用大约 50GB 数据的 Dask 数据框。该数据是字符串数据,我需要在将其提供给机器学习算法(使用线程快速)之前对其进行预处理(使用过程快速)。现在的问题是,当我针对进程设计集群时,数据帧操作很快,但在线程方面却很慢(但线程在机器学习方面很快)。因此,我正在寻找一种可以从进程切换到线程环境的解决方案。

目前,我正在使用进程集群保存预处理数据,然后将其关闭并启动一个具有线程环境的新集群以应用机器学习。

有没有办法解决这个问题?

请在这方面帮助我。

4

3 回答 3

0

可以从命令行启动异构工作者:

# this runs in one shell process
dask-scheduler --scheduler-file scheduler.json

# this runs in another shell process/window
dask-worker --scheduler-file scheduler.json --name multi_proc --nprocs 5 --nthreads 1

# this runs in yet another shell process/window
dask-worker --scheduler-file scheduler.json --name multi_thread --nprocs 1 --nthreads 5

然后在您的脚本/笔记本中,您将连接到调度程序,client = Client('scheduler.json')并在提交时指定任务的适当工作人员的名称,例如

# submit for completion only by the multi_thread worker
results_multi_thread = [client.submit(process_multi_thread, task, workers='multi_thread') for task in task_list]

# submit for completion only by the multi_process worker
results_multi_proc = [client.submit(process_multi_proc, task, workers='multi_proc') for task in task_list]

对于多个工作人员,您必须指定唯一的名称(例如multi_proc_1multi_proc_2等),但正如您所见,这是一个相当复杂的过程,所以除非您的案例的具体情况是所有事情都必须一次性发生,否则我会坚持使用您正在使用的解决方案(两个单独的集群),因为它更容易编码/维护,并且希望在某些时候会支持异构工作人员。

于 2021-02-17T07:51:11.263 回答
0

您可以采取多种方法,最好的办法是衡量每种方法。您将比我们所能猜测的更准确地表示您的实际工作流程!

您可以将抽象资源分配给您的工作人员,这样您就可以拥有流程绑定阶段所需的尽可能多的工作人员,但每个工作人员实际上都有许多线程。使用资源标记,您可以让每个工作人员只运行一个线程。LocalCluster 还允许您扩大/缩小工作人员的数量,因此您可以在进入第 2 阶段时通过缩小来“切换”模式(集群中保存的所有数据都将复制到幸存的工作人员)。

我认为人们很少遇到这种“仅进程”/“仅线程”模式。通常,线程和进程的混合足以很好地完成大多数工作负载。

于 2021-02-18T16:34:57.610 回答
0

在我看来,预处理部分和机器学习部分可以分开开发。在您的情况下,您可以先从 dask 保存预处理数据,然后启动另一个机器学习系统进行训练(例如,tensorflow)。

为了存储中间数据,传统的方法是使用 HDFS 或 S3。但是如果你担心外部存储的 I/O 成本,你可以尝试像 v6d.io 这样的分布式内存存储引擎,希望示例(https://v6d.io/examples/distributed-learning.html)有帮助。

于 2021-09-16T13:14:09.650 回答