5

我正在使用我在 Python 中使用字典和计数器构建的稀疏张量数组操作。我想让并行使用这个数组操作成为可能。底线是我最终在每个节点上都有计数器,我想使用 MPI.Allreduce (或另一个不错的解决方案)将它们加在一起。例如,使用 Counters 可以做到这一点

A = Counter({a:1, b:2, c:3})
B = Counter({b:1, c:2, d:3})

这样

C = A+B = Counter({a:1, b:3, c:5, d:3}).

我想做同样的操作,但要使用所有相关节点,

MPI.Allreduce(send_counter, recv_counter, MPI.SUM)

但是,MPI 似乎无法识别字典/计数器上的此操作,从而引发错误expecting a buffer or a list/tuple。我最好的选择是“用户定义的操作”,还是有办法让 Allreduce 添加计数器?谢谢,

编辑(2015 年 7 月 14 日):我试图为字典创建用户操作,但存在一些差异。我写了以下

def dict_sum(dict1, dict2, datatype):
    for key in dict2:
        try:
            dict1[key] += dict2[key]
        except KeyError:
            dict1[key] = dict2[key]

当我告诉 MPI 关于这个功能时,我这样做了:

dictSumOp = MPI.Op.Create(dict_sum, commute=True)

在代码中我用它作为

the_result = comm.allreduce(mydict, dictSumOp)

然而,它扔了unsupported operand '+' for type dict。所以我写了

the_result = comm.allreduce(mydict, op=dictSumOp)

现在它抛出dict1[key] += dict2[key] TypeError: 'NoneType' object has no attribute '__getitem__' 了很明显它想知道那些东西是字典吗?我怎么知道他们确实有类型字典?

4

1 回答 1

7

MPI 和 MPI4py 都不知道特别是 Counters 的任何内容,因此您需要创建自己的归约操作才能使其工作;这对于任何其他类型的 python 对象都是一样的:

#!/usr/bin/env python
from mpi4py import MPI
import collections

def addCounter(counter1, counter2, datatype):
    for item in counter2:
        counter1[item] += counter2[item]
    return counter1

if __name__=="__main__":

    comm = MPI.COMM_WORLD

    if comm.rank == 0:
        myCounter = collections.Counter({'a':1, 'b':2, 'c':3})
    else:
        myCounter = collections.Counter({'b':1, 'c':2, 'd':3})


    counterSumOp = MPI.Op.Create(addCounter, commute=True)

    totcounter = comm.allreduce(myCounter, op=counterSumOp)
    print comm.rank, totcounter

在这里,我们采用了一个函数,它将两个计数器对象相加,并使用 MPI.Op.Create 从它们中创建了一个 MPI 运算符;mpi4py 将 unpickle 对象,运行此函数以成对组合这些项目,然后 pickle 部分结果并将其发送到下一个任务。

还要注意,我们使用的是(小写)allreduce,它适用于任意 python 对象,而不是(大写)Allreduce,它适用于 numpy 数组或其道德等价物(缓冲区,映射到 MPI API 的 Fortran/C 数组设计上)。

运行给出:

$ mpirun -np 2 python ./counter_reduce.py 
0 Counter({'c': 5, 'b': 3, 'd': 3, 'a': 1})
1 Counter({'c': 5, 'b': 3, 'd': 3, 'a': 1})

$ mpirun -np 4 python ./counter_reduce.py 
0 Counter({'c': 9, 'd': 9, 'b': 5, 'a': 1})
2 Counter({'c': 9, 'd': 9, 'b': 5, 'a': 1})
1 Counter({'c': 9, 'd': 9, 'b': 5, 'a': 1})
3 Counter({'c': 9, 'd': 9, 'b': 5, 'a': 1})

并且只需进行适度的更改即可使用通用字典:

#!/usr/bin/env python
from mpi4py import MPI

def addCounter(counter1, counter2, datatype):
    for item in counter2:
        if item in counter1:
            counter1[item] += counter2[item]
        else:
            counter1[item] = counter2[item]
    return counter1

if __name__=="__main__":

    comm = MPI.COMM_WORLD

    if comm.rank == 0:
        myDict = {'a':1, 'c':"Hello "}
    else:
        myDict = {'c':"World!", 'd':3}

    counterSumOp = MPI.Op.Create(addCounter, commute=True)

    totDict = comm.allreduce(myDict, op=counterSumOp)
    print comm.rank, totDict

跑步给予

$ mpirun -np 2 python dict_reduce.py 
0 {'a': 1, 'c': 'Hello World!', 'd': 3}
1 {'a': 1, 'c': 'Hello World!', 'd': 3}
于 2015-07-13T16:54:22.103 回答