3

我正在尝试实现双音排序算法。

Parallel Bitonic Sort Algorithm for processor Pk (for k := 0 : : : P  1)
d:= log P /* cube dimension */
sort(local  datak) /* sequential sort */
/* Bitonic Sort follows */
for i:=1 to d do
    window-id = Most Signicant (d-i) bits of Pk
    for j:=(i-1) down to 0 do
        if((window-id is even AND jth bit of Pk = 0) OR 
                   (window-id is odd AND jth bit of Pk = 1))
        then call CompareLow(j)
        else call CompareHigh(j)
        endif
    endfor
endfor

资料来源:http ://www.cs.rutgers.edu/~venugopa/parallel_summer2012/mpi_bitonic.html#expl

不幸的是,CompareHigh 和 CompareLow 的描述充其量是不稳定的。

据我了解,CompareHigh 将从调用进程及其伙伴进程中获取数据,将两者合并,排序,并将上半部分存储在调用进程的数据中。CompareLow 也会这样做,并取下半部分。

我已经验证我的实现选择了正确的合作伙伴并在每个进程的每次迭代期间调用了正确的 CompareHigh/Low 方法,但我的输出仍然只是部分排序。我假设我的 CompareHigh/Low 实现不正确。

这是我当前输出的示例:

[0] [17 24 30 37]
[1] [ 92 114 147 212]
[2] [ 12  89  92 102]
[3] [172 185 202 248]
[4] [ 30  51 111 148]
[5] [148 149 158 172]
[6] [ 17  24  59 149]
[7] [160 230 247 250]

这是我的 CompareHigh、CompareLow 和合并函数:

def CompareHigh(self, j):
    partner = self.getPartner(self.rank, j)
    print "[%d] initiating HIGH with %d" % (self.rank, partner)
    new_data = np.empty(self.data.shape, dtype='i')

    self.comm.Send(self.data, dest = partner, tag=55)
    self.comm.Recv(new_data, source = partner, tag=55)

    assert(self.data.shape == new_data.shape)
    self.data = np.split(self.merge(data, new_data), 2)[1]

def CompareLow(self, j):
    partner = self.getPartner(self.rank, j)
    print "[%d] initiating LOW with %d" % (self.rank, partner)
    new_data = np.empty(self.data.shape, dtype='i')

    self.comm.Recv(new_data, source = partner, tag=55)
    self.comm.Send(self.data, dest = partner, tag=55)

    assert(self.data.shape == new_data.shape)
    self.data = np.split(self.merge(data, new_data), 2)[0]

def merge(self, a, b):
    merged = []
    i = 0
    j = 0
    while i < a.shape[0] and j < b.shape[0]:
        if a[i] < b[j]:
            merged.append(a[i])
            i += 1
        else:
            merged.append(b[j])
            j += 1
    while i < a.shape[0]:
        merged.append(a[i])
        i += 1

    while j < a.shape[0]:
        merged.append(b[j])
        j += 1

    return np.array(merged)


def getPartner(self, rank, j):
    # Partner process is process with j_th bit of rank flipped
    j_mask = 1 << j
    partner = rank ^ j_mask
    return partner

最后,这里是实际的算法循环:

# Generating map of bit_j for each process.
bit_j = [0 for i in range(d)]
for i in range(d):
    bit_j[i] = (rank >> i) & 1    

bs = BitonicSorter(data)
for i in range(1, d+1):

    window_id = rank >> i
    for j in reversed(range(0, i)):
        if rank == 0: print "[%d] iteration %d, %d" %(rank, i, j) 
        comm.Barrier()
        if (window_id%2 == 0 and bit_j[j] == 0) \
                    or (window_id%2 == 1 and bit_j[j] == 1):
            bs.CompareLow(j)
        else:
            bs.CompareHigh(j)
        if rank == 0: print ""
        comm.Barrier()

if rank != 0:
    comm.Send(bs.data, dest = 0, tag=55)
    comm.Barrier()

else:
    dataset[0] = bs.data
    for i in range(1, size) :
        comm.Recv(dataset[i], source = i, tag=55)
    comm.Barrier()
    for i, datai in enumerate(dataset):
        print "[%d]\t%s" % (i, str(datai))
    dataset = np.array(dataset).reshape(data_size)
4

1 回答 1

1

好烦我:

self.data = np.split(self.merge(data, new_data), 2)

是有问题的线路。我不确定变量数据绑定到什么,但这就是问题所在。

于 2012-12-02T21:42:34.630 回答