2

下面,我收集了 4 种方法来完成涉及排序更新 Pandas Dataframes 的代码执行。

我想应用最好的方法来加速代码执行。我是否使用了可用的最佳实践?


有人可以就以下想法分享一些想法吗?

  1. 我正在遍历数据框,因为解决我的问题的过程似乎需要它。使用 Dask Dataframes 会大大提高速度吗?

  2. Dask 分布式版本能否从为每个工作人员设置特定数量的工作人员、进程、线程中受益?人们指出,在某些情况下,增加进程数而不是线程数(反之亦然)是最好的。

  3. 用于此类代码的最强大的硬件基础设施是什么?多处理版本在具有更多物理 CPU 内核的 AWS 实例上速度更快。

    • 使用 Dask Distributed 的 Kubernetes/AWS 设置会快得多吗?
    • 这是否可以在本地 GPU 或多 GPU AWS 实例的帮助下轻松调整以运行?

以下是完成时间供参考:

  • 常规“For”循环:34 seconds
  • 黄昏延迟:21 seconds
  • Dask 分布式(本地机器):21 seconds
  • 多处理:10 seconds

from dask.distributed import Client
from multiprocessing import Pool
from dask import delayed
import pandas as pd
import numpy as np
client = Client()
import random
import dask

#Setting original input data that will be used in the functions
alist=['A','B','C','D','E','F','G','H','I']
set_table=pd.DataFrame({"A":alist,
                        "B":[i for i in range(1,10)],
                        "C":[i for i in range(11,20)],
                        "D":[0]*9})

#Assembled random list of combinations   
criteria_list=[]
for i in range(0,10000):
    criteria_list.append(random.sample(alist,6))

#Sorts and filters the original df
def one_filter_sorter(criteria):
    sorted_table=set_table[set_table['A'].isin(criteria)]
    sorted_table=sorted_table.sort_values(['B','C'],ascending=True)
    return sorted_table

#Exists to help the function below. Simplified for this example    
def helper_function(sorted_table,idx):
    if alist.index(sorted_table.loc[idx,'A'])>5:
        return True

#last function that retuns the gathered result    
def two_go_downrows(sorted_table):

    for idx, row in sorted_table.iterrows():
        if helper_function(sorted_table,idx)==True:
            sorted_table.loc[idx,'D'] = 100 - sorted_table.loc[idx,'C']

    res=sorted_table.loc[:,['A','D']].to_dict()
    return res

#--Loop version
result=[]    
for criteria in criteria_list:
    A=one_filter_sorter(criteria)
    B=two_go_downrows(A)
    result.append(B)

#--Multiprocessed version
result=[]    
if __name__ == '__main__':
    pool=Pool(processes=6)
    A=pool.map(one_filter_sorter, criteria)
    B=pool.map(two_go_downrows, A) 
    result.append(B)

#--Delayed version
result=[]    
for criteria in criteria_list:
    A=delayed(one_filter_sorter)(criteria) 
    B=delayed(two_go_downrows)(A) 
    result.append(B)
dask.compute(result)

#--Distributed version
A= client.map(one_filter_sorter,criteria_list) 
B= client.map(two_go_downrows,A)
client.gather(B)    

谢谢

4

0 回答 0