
This is my first multiprocessing implementation. Running my code sequentially, it took about 30 seconds to process 20 records. Then I built a dictionary where each key holds a group of records and tried to use pool.map to apply the function to each key. Now, even though each process gets its own core, it takes over 2 minutes. Can someone help me optimize this?

import itertools
from multiprocessing import Pool

def f(values):
    # every pair of records in this group
    data1 = itertools.combinations(values, 2)
    tuple_attr = ('Age', 'Workclass', 'Fnlwgt', 'Education', 'Education-num', 'marital-status', 'Occupation', 'Relationship', 'Race', 'Sex', 'Capital-gain', 'Capital-loss', 'Hours-per-week', 'Native country', 'Probability', 'Id')
    # for each pair, the names of the attributes where the two records differ
    new = ((tuple_attr[i] for i, t in enumerate(zip(*pair)) if t[0] != t[1]) for pair in data1)
    skt = set(frozenset(temp) for temp in new)
    # keep only the minimal difference sets (drop any that contain another)
    newset = set(s for s in skt if not any(p < s for p in skt))

    # build the minimal hitting sets of newset incrementally;
    # frozenset(" ") is a sentinel standing in for the empty transversal
    empty = frozenset(" ")
    tr_x = set(frozenset(i) for i in empty)  # starts as {empty}
    tr = set(frozenset(i) for i in empty)
    for e in newset:
        tr = set(tr_x)  # carry over the candidates from the previous round
        tr_x.clear()
        # extend every candidate with each attribute of e so it "hits" e
        for x in tr:
            for a in e:
                if x == empty:
                    tmp = frozenset([a])
                    tr_x = tr_x.union([tmp])
                else:
                    tmp = frozenset([a]).union(x)
                    tr_x = tr_x.union([tmp])
        tr = set(tr_x)
        # prune candidates that are supersets of another candidate
        tr = set(l for l in tr if not any(m < l for m in tr))

    return tr

def main(data):
    p = Pool(len(data)) # one worker process per dictionary key
    keys, values= zip(*data.items()) #ordered keys and values
    processed_values= p.map( f, values )
    result= dict( zip(keys, processed_values ) ) 
    p.close() # no more tasks
    p.join()  # wrap up current tasks
    print(result)


if __name__ == '__main__':
    import csv
    dicchunk = {*****} #my dictionary
    main(dicchunk)

1 Answer


I created a test program that runs this once with multiprocessing and once without:

import time
from multiprocessing import Pool

def main(data):  # f is the worker function from the question
    p = Pool(len(data)) # one worker process per dictionary key
    keys, values= zip(*data.items()) #ordered keys and values
    start = time.time()
    processed_values= p.map( f, values )
    result= dict( zip(keys, processed_values ) ) 
    print("multi: {}".format(time.time() - start))
    p.close() # no more tasks
    p.join()  # wrap up current tasks

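    # now the same work sequentially, for comparison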
    start = time.time()
    processed_values = map(f, values)
    result2 = dict( zip(keys, processed_values ) ) 
    print("non-multi: {}".format(time.time() - start))
    assert(result == result2)

Here's the output:

multi: 191.249588966
non-multi: 225.774535179

multiprocessing is faster, but not by as much as you might expect. The reason is that some of the sublists take much longer (several minutes) to finish than others. You can never be faster than the time it takes to process the largest sublist.
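To see why the longest task sets the floor, here's a minimal toy sketch (mine, not the answerer's code; the sleep durations are made up to mirror the traced worker times below). With one task per worker, pool.map cannot return until the slowest task finishes:

import time
from multiprocessing import Pool

def fake_work(seconds):
    # stand-in for f: each "sublist" just sleeps for its duration
    time.sleep(seconds)
    return seconds

if __name__ == '__main__':
    durations = [1, 1, 43, 194]  # hypothetical per-key workloads, in seconds
    p = Pool(len(durations))
    start = time.time()
    p.map(fake_work, durations)
    p.close()
    p.join()
    # prints ~194 even with four workers: the longest task gates the total
    print("elapsed: {:.1f}".format(time.time() - start))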

I added some tracing to the worker function to demonstrate this: I save the time when the worker starts and print the elapsed time when it finishes.
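A minimal sketch of what that tracing might look like (the answerer's exact instrumentation isn't shown; traced_f is a hypothetical wrapper around the question's f):

import time
from multiprocessing import current_process

def traced_f(values):
    start = time.time()
    result = f(values)  # the original worker function from the question
    # report which pool worker handled this sublist and how long it took
    print("{} is done. Took {} seconds".format(current_process(), time.time() - start))
    return result

Here's the output: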

<Process(PoolWorker-4, started daemon)> is done. Took 0.940237998962 seconds
<Process(PoolWorker-2, started daemon)> is done. Took 1.28068685532 seconds
<Process(PoolWorker-1, started daemon)> is done. Took 42.9250118732 seconds
<Process(PoolWorker-3, started daemon)> is done. Took 193.635578156 seconds

As you can see, the amount of work the workers do varies wildly, which is why you only save roughly 35 seconds (225.8 s − 191.2 s) compared with running sequentially.
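If you want to beat the largest sublist, the work has to be split more finely than one task per key. As a sketch (my suggestion, not tested on your data): the pairwise comparison stage of f is embarrassingly parallel, so you could farm out the difference-set computation per pair and keep the sequential pruning loop in the parent process. diff_attrs and first_stage below are hypothetical helpers, and TUPLE_ATTR mirrors the tuple from the question:

import itertools
from multiprocessing import Pool, cpu_count

TUPLE_ATTR = ('Age', 'Workclass', 'Fnlwgt', 'Education', 'Education-num',
              'marital-status', 'Occupation', 'Relationship', 'Race', 'Sex',
              'Capital-gain', 'Capital-loss', 'Hours-per-week',
              'Native country', 'Probability', 'Id')

def diff_attrs(pair):
    # attribute names on which the two records in the pair differ
    return frozenset(TUPLE_ATTR[i] for i, t in enumerate(zip(*pair)) if t[0] != t[1])

def first_stage(values, pool):
    # one small task per record pair instead of one big task per key,
    # so a Pool(cpu_count()) can balance the load across all cores
    pairs = itertools.combinations(values, 2)
    return set(pool.imap_unordered(diff_attrs, pairs, chunksize=64))

Whether this actually wins depends on how much of the runtime the pairwise stage accounts for versus the pruning loop.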

answered 2014-09-18T03:35:42.097