我正在尝试并行化我的一个匹配函数,它在一开始就起作用。很高兴看到我的 72 核 ec2 实例正在杀死它,大约一分钟左右它回到单核并且每秒迭代开始下降。
import concurrent.futures as cf
results = pd.DataFrame()
with cf.ProcessPoolExecutor() as executor:
for res in tqdm(executor.map(matcher_helper, list(range(len(df))))):
results = pd.concat([results, res], axis=0)
一开始我看到这个
然后到这个
大约一分钟,处理比单核还不错。在多处理时,它每秒迭代大约250 次,然后下降到每秒 35 次。
非常感谢任何指导。
编辑 - 附加信息 - 我原来的功能:
def matcher(data,
data_radial_matrice,
data_indice,
comparison_data,
comparison_radial_matrice,
distance_threshold=.1):
import pandas as pd
from sklearn.metrics.pairwise import haversine_distances
from fuzzywuzzy import fuzz
import numpy as np
lvl1 = haversine_distances(data_radial_matrice[data_indice].reshape(-1, 2),
comparison_radial_matrice) * 3959
lvl2 = pd.Series(lvl1[lvl1 < distance_threshold])
lvl1 = pd.DataFrame(np.argwhere(lvl1 < distance_threshold)).iloc[:, 1]
lvl3 = pd.concat((lvl1, lvl2), axis=1)
lvl3.columns = ['neigh_index', 'distance']
lvl3.set_index('neigh_index', inplace=True)
lvl3 = lvl3.merge(comparison_data,
left_index=True,
right_index=True,
how='inner')
lvl4 = lvl3.loc[:, 'match_text'].apply(
lambda x: fuzz.token_set_ratio(data.loc[data_indice, 'match_text'], x))
lvl5 = np.where(lvl4 == np.max(lvl4))
interim_result = lvl3.iloc[lvl5]
interim_result['match_score'] = np.max(lvl4)
interim_result['adp_indice'] = data_indice
return interim_result