下面,我收集了 4 种方法来完成涉及排序更新 Pandas Dataframes 的代码执行。
我想应用最好的方法来加速代码执行。我是否使用了可用的最佳实践?
有人可以就以下想法分享一些想法吗?
我正在遍历数据框,因为解决我的问题的过程似乎需要它。使用 Dask Dataframes 会大大提高速度吗?
Dask 分布式版本能否从为每个工作人员设置特定数量的工作人员、进程、线程中受益?人们指出,在某些情况下,增加进程数而不是线程数(反之亦然)是最好的。
用于此类代码的最强大的硬件基础设施是什么?多处理版本在具有更多物理 CPU 内核的 AWS 实例上速度更快。
- 使用 Dask Distributed 的 Kubernetes/AWS 设置会快得多吗?
- 这是否可以在本地 GPU 或多 GPU AWS 实例的帮助下轻松调整以运行?
以下是完成时间供参考:
- 常规“For”循环:
34 seconds
- 黄昏延迟:
21 seconds
- Dask 分布式(本地机器):
21 seconds
- 多处理:
10 seconds
from dask.distributed import Client
from multiprocessing import Pool
from dask import delayed
import pandas as pd
import numpy as np
client = Client()
import random
import dask
#Setting original input data that will be used in the functions
alist=['A','B','C','D','E','F','G','H','I']
set_table=pd.DataFrame({"A":alist,
"B":[i for i in range(1,10)],
"C":[i for i in range(11,20)],
"D":[0]*9})
#Assembled random list of combinations
criteria_list=[]
for i in range(0,10000):
criteria_list.append(random.sample(alist,6))
#Sorts and filters the original df
def one_filter_sorter(criteria):
sorted_table=set_table[set_table['A'].isin(criteria)]
sorted_table=sorted_table.sort_values(['B','C'],ascending=True)
return sorted_table
#Exists to help the function below. Simplified for this example
def helper_function(sorted_table,idx):
if alist.index(sorted_table.loc[idx,'A'])>5:
return True
#last function that retuns the gathered result
def two_go_downrows(sorted_table):
for idx, row in sorted_table.iterrows():
if helper_function(sorted_table,idx)==True:
sorted_table.loc[idx,'D'] = 100 - sorted_table.loc[idx,'C']
res=sorted_table.loc[:,['A','D']].to_dict()
return res
#--Loop version
result=[]
for criteria in criteria_list:
A=one_filter_sorter(criteria)
B=two_go_downrows(A)
result.append(B)
#--Multiprocessed version
result=[]
if __name__ == '__main__':
pool=Pool(processes=6)
A=pool.map(one_filter_sorter, criteria)
B=pool.map(two_go_downrows, A)
result.append(B)
#--Delayed version
result=[]
for criteria in criteria_list:
A=delayed(one_filter_sorter)(criteria)
B=delayed(two_go_downrows)(A)
result.append(B)
dask.compute(result)
#--Distributed version
A= client.map(one_filter_sorter,criteria_list)
B= client.map(two_go_downrows,A)
client.gather(B)
谢谢