0

我正在尝试模拟一个脉动阵列结构——所有这些都是我从这些幻灯片中学到的:http : //web.cecs.pdx.edu/~mperkows/temp/May22/0020.Matrix-multiplication-systolic.pdf——用于 Python 环境中的矩阵乘法。脉动阵列的一个组成部分是 PE 之间的数据流与发生在任何一个节点上的任何乘法或加法并行。我很难推测如何在 Python 中准确地实现这样的并发过程。具体来说,我希望更好地理解一种计算方法,以级联方式将要相乘的矩阵元素馈送到收缩阵列中,同时允许这些元素以并发方式通过阵列传播。

我已经开始在 python 中编写一些代码到多个两个 3 x 3 数组,但最终,我想模拟任何大小的脉动数组以处理任何大小的 a 和 b 矩阵。

from threading import Thread
from collections import deque

vals_deque = deque(maxlen=9*2)#will hold the interconnections between nodes of the systolicarray
dump=deque(maxlen=9) # will be the output of the SystolicArray
prev_size = 0

def setupSystolicArray():
    global SystolicArray
    SystolicArray = [NodeSystolic(i,j) for i in range(3), for i in range(3)]

def spreadInputs(a,b):
    #needs some way to initially propagate the elements of a and b through the top and leftmost parts of the systolic array

    new = map(lambda x: x.start() , SystolicArray) #start all the nodes of the systolic array, they are waiting for an input

    #even if i found a way to put these inputs into the array at the start, I am not sure how to coordinate future inputs into the array in the cascading fashion described in the slides
    while(len(dump) != 9):
        if(len(vals_deque) != prev_size):

            vals = vals_deque[-1]
            row = vals['t'][0]
            col = vals['l'][0]
            a= vals['t'][1]
            b = vals['l'][1]
            # these if elif statements track whether the outputs are at the edge of the systolic array and can be removed
            if(row >= 3):
                dump.append(a)
            elif(col >= 3):
                dump.append(b)
            else:
                #something is wrong with the logic here
                SystolicArray[t][l-1].update(a,b)
                SystolicArray[t-1][l].update(a,b)   

class NodeSystolic:
    def __init__(self,row, col):
        self.row = row
        self.col = col
        self.currval = 0
        self.up = False
        self.ain = 0#coming in from the top
        self.bin = 0#coming in from the side

    def start(self):
        Thread(target=self.continuous, args = ()).start()

    def continuous(self):
        while True:
            if(self.up = True):
                self.currval = self.ain*self.bin
                self.up = False
                self.passon(self.ain, self.bin)
            else:
                pass

    def update(self, left, top):
        self.up = True
        self.ain = top
        self.bin = left

    def passon(self, t,l):
        #this will passon the inputs this node has received onto the next nodes
        vals_deque.append([{'t': [self.row+ 1, self.ain], 'l': [self.col + 1, self.bin]}])

    def returnValue(self):
        return self.currval


def main():
    a = np.array([
    [1,2,3],
    [4,5,6],
    [7,8,9],
    ])

    b = np.array([
    [1,2,3],
    [4,5,6],
    [7,8,9]
    ])

    setupSystolicArray()
    spreadInputs(a,b)

上面的代码无法运行,仍然有很多错误。我希望有人能给我指点如何改进代码,或者是否有更简单的方法来模拟具有 Python 中的异步属性的脉动数组的并行过程,所以对于非常大的脉动数组大小,我会的不必担心创建太多线程(节点)。

4

1 回答 1

1

考虑在 Python 中模拟一个脉动数组是很有趣的,但我认为按照您在上面勾勒出的路线进行此操作存在一些重大困难。

最重要的是,由Global Interpreter Lock引起的 Python 真正并行性范围有限的问题。这意味着对于计算受限的任务,您不会获得任何显着的并行性,并且它的线程可能最适合处理 I/O 受限的任务,例如 Web 请求或文件系统访问。最接近的 Python 可能是通过多处理模块来实现这一点,但这需要为每个节点单独处理。

其次,即使您要在脉动数组中的数值运算中获得并行性,您也需要一些锁定机制来允许不同的线程交换数据(或消息),而不会在它们尝试读取时破坏彼此的内存和同时写入数据。

至于您示例中的数据结构,我认为您最好让 systolic 数组中的每个节点都引用其上游节点,而不是知道它位于 NxM 网格中的特定位置。我认为脉动阵列没有任何理由需要是矩形网格,并且任何有向无环图(DAG)仍然具有高效分布式计算的潜力。

总的来说,我预计在 Python 中进行这种模拟的计算开销相对于 Scala 或 C++ 等低级语言所能实现的计算开销是巨大的。即便如此,除非脉动阵列中的每个节点都在进行大量计算(即远远超过几个乘加),否则节点之间交换消息的开销将是巨大的。因此,我认为您的模拟主要是为了了解数据流和阵列的高级行为,而不是接近定制 DSP(数字信号处理)硬件所能提供的功能。如果是这种情况,那么我很想不使用线程并使用一个集中的消息队列,所有节点都向该消息队列提交由全局消息分发机制传递的消息。

于 2018-02-03T08:28:24.543 回答