
I'm trying to parallelize one of my matching functions, and it works at first. It's great to see my 72-core EC2 instance hammering away at it, but after about a minute or so it drops back to a single core and the iterations per second start to fall.

import concurrent.futures as cf
import pandas as pd
from tqdm import tqdm

results = pd.DataFrame()

with cf.ProcessPoolExecutor() as executor:
    # Each worker returns a small DataFrame; results are appended one by one.
    for res in tqdm(executor.map(matcher_helper, list(range(len(df))))):
        results = pd.concat([results, res], axis=0)

At first I see this:

[screenshot: multi-core processing]

and then it goes to this:

[screenshot: single-core drop]

For roughly a minute the throughput is well above single-core. While multiprocessing it runs at about 250 iterations per second, and then it drops to about 35 per second.

Any guidance is greatly appreciated.

Edit - additional information - my original function:

def matcher(data,
            data_radial_matrice,
            data_indice,
            comparison_data,
            comparison_radial_matrice,
            distance_threshold=.1):

    import pandas as pd
    from sklearn.metrics.pairwise import haversine_distances
    from fuzzywuzzy import fuzz
    import numpy as np

    # Haversine distances (in miles) between the selected data point and all
    # comparison points; 3959 is the Earth's radius in miles.
    lvl1 = haversine_distances(data_radial_matrice[data_indice].reshape(-1, 2),
                               comparison_radial_matrice) * 3959
    # Distances below the threshold, plus the indices of those neighbours.
    lvl2 = pd.Series(lvl1[lvl1 < distance_threshold])
    lvl1 = pd.DataFrame(np.argwhere(lvl1 < distance_threshold)).iloc[:, 1]

    lvl3 = pd.concat((lvl1, lvl2), axis=1)
    lvl3.columns = ['neigh_index', 'distance']
    lvl3.set_index('neigh_index', inplace=True)
    lvl3 = lvl3.merge(comparison_data,
                      left_index=True,
                      right_index=True,
                      how='inner')

    # Fuzzy-match the text of the nearby records and keep the best-scoring rows.
    lvl4 = lvl3.loc[:, 'match_text'].apply(
        lambda x: fuzz.token_set_ratio(data.loc[data_indice, 'match_text'], x))
    lvl5 = np.where(lvl4 == np.max(lvl4))
    interim_result = lvl3.iloc[lvl5]
    interim_result['match_score'] = np.max(lvl4)
    interim_result['adp_indice'] = data_indice

    return interim_result
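
(matcher_helper, which executor.map calls in the snippet above, is not shown in the question. Presumably it just binds the fixed arguments so that only the row index has to be passed, along the lines of the dnb_matcher_helper in the answer below; a hypothetical sketch, with adp, adp_rad, dnb and dnb_rad standing in for the actual data:)

# Hypothetical wrapper (not shown in the question): binds the shared data so
# that executor.map only needs to pass the row index.
def matcher_helper(indice):
    return matcher(adp, adp_rad, indice, dnb, dnb_rad)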

1 Answer


The main performance bottleneck was caused by pandas.concat; changing the result-collection part to np.concatenate solved the problem. Once the accumulated DataFrame grows past a certain size, the repeated concatenation on the pandas side slows the whole process down and kills the multi-core processing.
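
For context, each pd.concat([results, res]) call copies the entire accumulated frame, so the cost grows quadratically with the number of results. A minimal sketch of the usual workaround, collecting the per-task frames in a list and concatenating once at the end (using matcher_helper and df from the question):

import concurrent.futures as cf
import pandas as pd
from tqdm import tqdm

chunks = []
with cf.ProcessPoolExecutor() as executor:
    # Append each partial result; nothing is copied per iteration.
    for res in tqdm(executor.map(matcher_helper, range(len(df)))):
        chunks.append(res)

# A single concatenation at the end instead of one per task.
results = pd.concat(chunks, axis=0)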

I made slight changes to the code, and at the end I now return a numpy array.

def matcher2(data,
             data_radial_matrice,
             data_indice,
             comparison_data,
             comparison_radial_matrice,
             distance_threshold=.1):
    '''Haversine distances between the selected data point and the comparison
    data points are calculated in miles (limited to .1 mile by default);
    matching is done among the filtered results and the max-score records
    are returned.'''

    import pandas as pd
    from sklearn.metrics.pairwise import haversine_distances
    from fuzzywuzzy import fuzz
    import numpy as np

    lvl1 = haversine_distances(data_radial_matrice[data_indice].reshape(-1, 2),
                               comparison_radial_matrice) * 3959
    lvl2 = pd.Series(lvl1[lvl1 < distance_threshold])

    lvl1 = pd.DataFrame(np.argwhere(lvl1 < distance_threshold)).iloc[:, 1]

    lvl3 = pd.concat((lvl1, lvl2), axis=1)
    lvl3.columns = ['neigh_index', 'distance']
    lvl3.set_index('neigh_index', inplace=True)
    lvl3 = lvl3.merge(comparison_data,
                      left_index=True,
                      right_index=True,
                      how='inner')

    lvl4 = lvl3.loc[:, 'match_text'].apply(
        lambda x: fuzz.token_set_ratio(data.loc[data_indice, 'match_text'], x))
    lvl5 = np.where(lvl4 == np.max(lvl4))
    interim_result = lvl3.iloc[lvl5]
    interim_result['match_score'] = np.max(lvl4)
    interim_result['adp_indice'] = data_indice

    # Return a plain numpy array instead of a DataFrame.
    return np.array(interim_result)

Finally, this is how I collect the results:

import concurrent.futures as cf
import numpy as np
from tqdm import tqdm

def dnb_matcher_helper(indice):
    return matcher2(adp, adp_rad, indice, dnb, dnb_rad)

dnb_results = np.empty(shape=(1, 35))

with cf.ProcessPoolExecutor() as executor:
    for res in tqdm(executor.map(dnb_matcher_helper,
                                 list(range(len(adp))))):
        if len(res) == 0:
            continue
        else:
            for line in res:
                line = line.reshape((1, 35))
                dnb_results = np.concatenate((dnb_results, line), axis=0)
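
Two things worth noting about this collection loop: np.empty(shape=(1, 35)) leaves one uninitialized placeholder row at the top of dnb_results, and repeated np.concatenate still copies the accumulated array on every call. A sketch of an equivalent loop that avoids both, assuming the same 35-column rows and the dnb_matcher_helper above:

import numpy as np

rows = []
with cf.ProcessPoolExecutor() as executor:
    for res in tqdm(executor.map(dnb_matcher_helper, range(len(adp)))):
        if len(res) == 0:
            continue
        # Each result holds one or more 35-column rows.
        rows.append(res.reshape(-1, 35))

# Stack everything once at the end: no placeholder row, no per-iteration copies.
dnb_results = np.vstack(rows) if rows else np.empty((0, 35))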