
I want to build a fully connected graph in parallel in Python and collect its edge values, each of the form

( node1, node2 ) = edge_value

stored in a dictionary:

{ ( node1, node2 ) : edge_value [, ... [, ... ] ] }

To do this, I first initialize two global variables: G for the graph and f_correlation for the dictionary:

import networkx as nx
from multiprocessing import Pool
G = nx.Graph()
f_correlation = {}

Then I create a function that adds each edge to the graph and stores its ( node1, node2 ) = edge_value entry in f_correlation:

def construct_graph_parallelly(pair_with_df):
    global G
    global f_correlation
    pair, df = pair_with_df
    i, j = pair
    # calculate the edge value and store it in the global variable f_correlation
    f_correlation[ (i, j) ] = calculate_value(df, i, j)    # this function calculates some value on the dataframe
    # here i, j are node in the graph 
    G.add_edge(i, j, weight = f_correlation[ (i, j) ])
    return f_correlation

Then I create a multiprocessing.Pool() instance and call its .map() method so that the code runs in parallel:

def make_all_pair_with_df(node_list, df):
    all_pair_with_df = []
    for i in node_list:
        for j in node_list:
            if i != j :
                pair_with_df = (i,j),df
                all_pair_with_df.append(pair_with_df)

    return all_pair_with_df

node_list = ['a', 'b', 'c', 'd', 'e']
pool = Pool()
all_pair_with_df = make_all_pair_with_df(node_list, df) 
f_correlation = pool.map(construct_graph_parallelly, all_pair_with_df)
pool.close()
print("DONE")

But when I run the code, it runs forever and never prints "DONE".

One problem may be the global-variable issue discussed in "Globals variables and Python multiprocessing".

But for my task, I need to update both the dictionary and the connected graph globally.

How can I do this, or what changes should I make to get it working?
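The global-variable problem mentioned above can be reproduced in isolation. In this minimal sketch (the names counter and record are hypothetical), each worker writes into a module-level dict, but the parent's copy stays empty: the only data that comes back is what the worker returns, which pool.map() collects into a list in input order. This is also why, in the question's code, f_correlation = pool.map(...) ends up as a list of the dictionaries returned by each task rather than one merged dictionary.

```python
from multiprocessing import Pool

# A module-level global, analogous to f_correlation in the question.
counter = {}

def record(key):
    # Runs in a worker process: it mutates that worker's own copy of `counter`.
    counter[key] = len(key)
    # Return a snapshot of the worker's copy; only return values reach the parent.
    return dict(counter)

def main():
    with Pool(2) as pool:
        snapshots = pool.map(record, ["a", "bb", "ccc"])
    # The parent's global was never touched by the workers.
    print(counter)          # {}
    print(len(snapshots))   # 3
    return snapshots

if __name__ == "__main__":
    main()
```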


1 Answer


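Update: Let's be less ambitious and use multiprocessing only to build the f_correlation dictionary; the graph is then built afterwards in the main process.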

With your current code, each process has its own copy of the global variables. You should use a shareable, managed type instead (see multiprocessing.SyncManager). For example:

import networkx as nx
from multiprocessing import Pool, Manager

# make_all_pair_with_df(), calculate_value(), node_list and df are as defined in the question.

# initialize this process's global variables:
def pool_initializer(the_dict):
    # initialize global variable with shared, managed dictionary
    global f_correlation
    f_correlation = the_dict

def construct_graph_parallelly(pair_with_df):
    global f_correlation
    pair, df = pair_with_df
    i, j = pair
    # calculate the edge value and store it in the shared, managed dictionary f_correlation
    f_correlation[(i, j)] = calculate_value(df, i, j)    # this function calculates some value on the dataframe

def main():
    with Manager() as manager: # create SyncManager instance
        f_correlation = manager.dict() # create managed, shared dictionary
        with Pool(initializer=pool_initializer, initargs=(f_correlation,)) as pool:
            all_pair_with_df = make_all_pair_with_df(node_list, df)
            pool.map(construct_graph_parallelly, all_pair_with_df)
        # now build the graph in the main process, while the manager is still running
        G = nx.Graph()
        for (i, j), v in f_correlation.items():
            G.add_edge(i, j, weight=v)
    return G

if __name__ == '__main__': # required on Windows (spawn start method)
    main()
answered 2020-11-25T13:09:29.063
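If the Manager is only used to collect results, a simpler option may be to skip it: let each worker return its (pair, value) result and build the dictionary, and then the graph, in the parent. The sketch below makes a few assumptions not in the original: calculate_value is a hypothetical stand-in (a dot product of two columns), the data is a plain dict of lists rather than a pandas DataFrame, and the data is sent to each worker once through the Pool initializer instead of being pickled with every task. It also uses itertools.combinations, since nx.Graph is undirected and (a, b) and (b, a) are the same edge; if the real calculate_value is not symmetric, itertools.permutations would keep the question's ordered pairs.

```python
from itertools import combinations
from multiprocessing import Pool

_data = None  # per-worker copy of the input data, set once by the initializer

def init_worker(data):
    # Runs once in each worker process, so the data is sent once per worker
    # instead of once per task.
    global _data
    _data = data

def calculate_value(data, i, j):
    # Hypothetical stand-in for the question's calculate_value():
    # here, the dot product of columns i and j.
    return sum(x * y for x, y in zip(data[i], data[j]))

def edge_value(pair):
    i, j = pair
    return pair, calculate_value(_data, i, j)

def build_correlations(node_list, data):
    # combinations() yields each unordered pair once: ('a', 'b') but not ('b', 'a').
    pairs = list(combinations(node_list, 2))
    with Pool(initializer=init_worker, initargs=(data,)) as pool:
        # map() returns the workers' return values in input order.
        results = pool.map(edge_value, pairs)
    return dict(results)

if __name__ == "__main__":
    # Hypothetical data: a dict of columns instead of a pandas DataFrame.
    data = {"a": [1, 2, 3], "b": [4, 5, 6], "c": [7, 8, 9]}
    f_correlation = build_correlations(["a", "b", "c"], data)
    print(f_correlation)
    # The graph can then be built in the parent process, e.g. with
    # G.add_edge(i, j, weight=v) for each (i, j), v as in the answer above.
```

With the example data this gives {('a', 'b'): 32, ('a', 'c'): 50, ('b', 'c'): 122}.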