我有一个有 72 个内核的工作站(实际上是 36 个多线程 CPU,显示为 72 个内核multiprocessing.cpu_count()
)。
我尝试了multiprocessing
并发ray
处理,批量数百万个小文件,我想在该处理期间同时编写一些输出文件。
我对阻止.get()
与例如apply_async()
(in multiprocessing
) 和ray.get()
.
有了ray
,我有一个远程函数 ( process_group()
),它在循环中并行处理数据组。在下文中,使用该multiprocessing
模块的代码版本也作为注释给出。
import ray
import pandas as pd
# from multiprocessing import Pool
ray.init(num_cpus=60)
# with Pool(processes=n_workers) as pool:
for data_list in many_data_lists:
##-----------------------
## With ray :
df_list = ray.get([process_group.remote(data) for data in data_list])
##-----------------------
## With multiprocessing :
#f_list = pool.map(process_group, list_of_indices_into_data_list)
##
## data are both known from the parent process
## and I use copy-on-write semantic to avoid having 60 copies.
## All the function needs are a list of indices
## of where to fetch slices of the read-only data.
##
very_big_df = pd.concatenate(df_list)
##-----------------------
## Write to file :
very_big_df.to_parquet(outputfile)
因此,在每次循环迭代中,我必须收集process_group()
同时计算的 many 的输出,作为数据帧列表,df_list
以便连接成一个更大的very_big_df
数据帧。后者需要写入磁盘(通常大小为 ~1 到 ~3 GB)。编写一个这样的文件需要大约10-30 [s]
一段时间才能180 [s]
处理process_group
遥控器。有数千次循环迭代。所以这需要几天时间才能完成。
是否可以将文件以非阻塞方式写入磁盘,同时循环继续以节省大约 10% 的时间(这将节省大约一天的计算时间)?
到下一次循环迭代的并发进程完成时,有足够的时间来写入前一次迭代的输出。这里涉及的内核似乎都以接近 100% 的速度运行,因此Threading
可能也不推荐使用该模块。multiprocessing.apply_async()
更令人沮丧,因为它不想要我的不可选择的输出very_big_df
数据框,我必须与一些更复杂的东西共享,这可能会花费我试图节省的时间,我希望ray
能有效地处理类似的事情。
[更新] 为了简单起见,我没有提到所有进程之间有一个很大的共享变量(这就是为什么我称它为并行进程以及文件的并发写入)。结果我的标题问题被编辑了。所以实际上,在光线并行作业之前有这段代码:
shared_array_id = ray.put(shared_array)
df_list = ray.get([process_group.remote(shared_array, data) for data in data_list])
不确定这是否使它更像是“并行”执行而不仅仅是并发操作。
[更新 2] 共享数组是一个查找表,即对于并行工作者而言是只读的。
[更新 3] 我尝试了两种建议的解决方案:Threading 和 Ray / compute() 对于后者,建议使用写函数作为远程,并在 for 循环中异步发送写操作,我最初认为这是唯一可能的通过 .get() 这将是阻塞。
因此,对于 Ray,这显示了两种解决方案:
@ray.remote
def write_to_parquet(df_list, filename):
df = pd.concat(df_list)
df.to_parquet(filename, engine='pyarrow', compression=None)
# Share array created outside the loop, read-only (big lookup table).
# About 600 MB
shared_array_id = ray.put(shared_array)
for data_list in many_data_lists:
new_df_list = ray.get([process_group.remote(shared_array_id, data) for data in data_list])
write_to_parquet.remote(df_list, my_filename)
## Using threading, one would remove the ray decorator:
# write_thread = threading.Thread(target=write_to_parquet, args=(new_df_list, tinterval.left))
# write_thread.start()
对于 RAY 解决方案,这需要增加 object_store_memory,默认值是不够的:节点内存的 10% ~ 37 GB(我有 376 GB 的 ram),然后上限为 20 GB,唯一存储的对象总计约 22 GB:一个数据帧列表df_list
(大约 11 GB),以及它们在写入函数中连接的结果(大约 11 GB),假设在连接期间有一个副本。如果不是,那么这个内存问题没有意义,我想知道我是否可以传递 numpy 视图,我认为这是默认情况下发生的。这是 RAY 相当令人沮丧的方面,因为我无法真正预测每个内存将有多少df_list
,它可以从 1 倍到 3 倍不等......
最后,坚持multiprocessing
使用 Threading 是最有效的解决方案,因为处理部分(没有 I/O)更快:
from multiprocessing import Pool
# Create the shared array in the parent process & exploit copy-on-write (fork) semantics
shared_array = create_lookup_table(my_inputs)
def process_group(my_data):
# Process a new dataframe here using my_data and some other data inside shared_array
...
return my_df
n_workers = 60
with Pool(processes=n_workers) as pool:
for data_list in many_data_lists:
# data_list contains thousands of elements. I choose a chunksize of 10
df_list = pool.map(process_group, data_list, 10)
write_thread = threading.Thread(target=write_to_parquet, args=(group_df_list, tinterval.left))
write_thread.start()
在每次循环迭代中,通常len(many_data_lists) = 7000
每个列表都包含 7 个大小为 (3, 9092) 的 numpy 数组。因此,这 7000 个列表将发送给 60 个工作人员:
process_group
每次循环迭代的所有并行时间:
射线:250 [s]
多处理:233 [s]
I/O:将 5GB parquet 文件写入外部 USB 3 旋转磁盘大约需要 35 秒。内部旋转盘上大约 10 秒。
Ray:约 5 秒的开销用于创建write_to_parquet.remote()
阻塞循环的未来。这仍然是在旋转磁盘上写入所需时间的 50%。这并不理想。
多处理:测量的 0 秒开销。
总上墙时间:
雷:486 [s]
多处理:436 [s]
我重复了几次,Ray和Multiprocessing之间的差异始终显示Multiprocessing快了约 50 秒。这是一个显着的差异,也令人费解,因为Ray宣传了更高的效率。
我将运行它进行更长时间的迭代并报告稳定性(内存、垃圾收集的潜在问题......)