我试图并行化以下代码:
def extract_features(dict_w, key, d_f):
lowess = sm.nonparametric.lowess
f_v = []
for c in np.setdiff1d(dict_w[key].columns, ["t", "ts"]):
dict_w[key][c + "_lowess"] = lowess(
dict_w[key][c],
dict_w[key].iloc[:, ~dict_w[key].columns.duplicated()]["ts"],
is_sorted=True,
frac=0.05,
)[:, 1]
f_v.extend(np.zeros(1,5))
d_f[key] = np.array(f_v)
我尝试了以下方法:
def extract_features(key, dict_w, shared_dic):
lowess = sm.nonparametric.lowess
f_v = []
for c in np.setdiff1d(dict_w[key].columns, ["t", "ts"]):
lw = lowess(
dict_w[key][c],
dict_w[key].iloc[:, ~dict_w[key].columns.duplicated()]["ts"],
is_sorted=True,
frac=0.05,)[:, 1]
#print(lw)
dict_w[key][c + "_lowess"] = lw
#print("test ")
print(lw)
shared_dic[key] = np.array(f_v)
print(len(shared_dic[key]))
def parallelize(dict_w, max_workers, shared_dic):
with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as pool:
start = time.perf_counter()
for key in dict_w.keys():
pool.submit(extract_features, key, dict_w, shared_dic)
logger.info("Execution time (parallel) = {}".format(time.perf_counter() - start))
问题是,如果我输出第一个 print(lw),我可以看到 lowess 函数的结果,但是如果我dict_w[key][c + "_lowess"] = lw在终端中没有任何输出之后放置 print(lw)。两者dict_w和shared_dic定义如下:
manager = Manager()
dict_w = manager.dict()
shared_dic = manager.dict()