1

我用 Python 编写了以下 NumPy 代码:

def inbox_(points, polygon):
    """ Finding points in a region """

    ll = np.amin(polygon, axis=0)                        # lower limit
    ur = np.amax(polygon, axis=0)                        # upper limit

    in_idx = np.all(np.logical_and(ll <= points, points < ur), axis=1)        # points in the range [boolean]

    return in_idx


def operation_(r, gap, ends_ind):
    """ calculation formula which is applied on the points specified by inbox_ function """

    r_active = np.take(r, ends_ind)                       # taking values from "r" based on indices and shape (paired_values) of "ends_ind"
    r_sub = np.subtract.reduce(r_active, axis=1)          # subtracting each paired "r" determined by "ends_ind" [this line will be used in the final return formula]
    r_add = np.add.reduce(r_active, axis=1)               # adding each paired "r" determined by "ends_ind" [this line will be used in the final return formula]
    paired_cent_dis = np.sum((r_add, gap), axis=0)        # distance of the each two paired points

    return (np.power(gap, 2) * (np.power(paired_cent_dis, 2) + 5 * paired_cent_dis * r_add - 7 * np.power(r_sub, 2))) / (3 * paired_cent_dis)      # Formula



def elapses(r, pos, gap, ends_ind, elem_vert, contact_poss):
    if len(gap) > 0:
        elaps = np.empty([len(elem_vert), ], dtype=object)
        operate_ = operation_(r, gap, ends_ind)

        #elbav = np.empty([len(elem_vert), ], dtype=object)
        #con_num = 0

        for i, j in enumerate(elem_vert):                        # loop for each section (cell or region) of a mesh
            in_bool = inbox_(contact_poss, j)                    # getting boolean array for points within that section
            elaps[i] = np.sum(operate_[in_bool])                 # performing some calculations on that points and get the sum of them for each section
            operate_ = operate_[np.invert(in_bool)]              # slicing the arrays by deleting the points on which the calculations were performed to speed-up the code in next loops                        
            contact_poss = contact_poss[np.invert(in_bool)]      # as above

            #con_num += sum(inbox_(contact_poss, j))
            #inba_bool = inbox_(pos, j)
            #elbav[i] = 4 * np.pi * np.sum(np.power(r[inba_bool], 3)) / 3
            #pos = pos[np.invert(inba_bool)]
            #r = r[np.invert(inba_bool)]

        return elaps


r = np.load('a.npy')
pos = np.load('b.npy')
gap = np.load('c.npy')
ends_ind = np.load('d.npy')
elem_vert = np.load('e.npy')
contact_poss = np.load('f.npy')

elapses(r, pos, gap, ends_ind, elem_vert, contact_poss)

# a --------r-------> parameter corresponding to each coordinate (point); here radius                                  (23605,)     <class 'numpy.ndarray'> <class 'numpy.float64'>
# b -------pos------> coordinates of the points                                                                        (23605, 3)   <class 'numpy.ndarray'> <class 'numpy.ndarray'> <class 'numpy.float64'>
# c -------gap------> if we consider points as spheres by that radii [r], it is maximum length for spheres' over-lap   (103832,)    <class 'numpy.ndarray'> <class 'numpy.float64'>
# d ----ends_ind----> indices for each over-laped spheres                                                              (103832, 2)  <class 'numpy.ndarray'> <class 'numpy.ndarray'> <class 'numpy.int64'>
# e ---elem_vert----> vertices of the mesh's sections or cells                                                         (2000, 8, 3) <class 'numpy.ndarray'> <class 'numpy.ndarray'> <class 'numpy.ndarray'> <class 'numpy.float64'>
# f --contact_poss--> a coordinate between the paired spheres                                                          (103832, 3)  <class 'numpy.ndarray'> <class 'numpy.ndarray'> <class 'numpy.float64'>

此代码将经常从另一个具有大数据输入的代码中调用。因此,加速此代码是必不可少的。我尝试使用JAXNumbajit库中的装饰器来加速代码,但我无法正确使用它来使代码更好。我已经在 Colab 上测试了代码(对于 3 个循环数为 20、250 和 2000 的数据集)的速度,结果是:

11  (ms), 47   (ms), 6.62 (s)       (per loop) <-- without the commented code lines in the code
137 (ms), 1.66 (s) , 4    (m)       (per loop) <-- with activating the commented code lines in the code  

这段代码所做的是在一个范围内找到一些坐标,然后对它们进行一些计算。
对于任何可以显着加速此代码的答案,我将不胜感激(我相信它可以)。此外,我将感谢任何有关通过更改(替换)使用的 NumPy 方法和……或为数学运算编写方法来加速代码的经验丰富的建议。

笔记:

  • 建议的答案必须可由 python 版本 2 执行(适用于版本 2 和 3 非常好)
  • 代码中注释的代码行对于主要目的来说是不必要的,只是为了进一步评估而编写的。任何用建议的答案处理这些行的建议都值得赞赏(不需要)。

测试数据集:
小数据集:https
://drive.google.com/file/d/1CswjyoqS8ogLmLQa_oNTOj221chDcbK8/view?usp=sharing 中等数据集:https ://drive.google.com/file/d/14RJ0Ackx88NzQWloops5FagzuNQYDSrh/ view?usp=sharing
大数据集:https ://drive.google.com/file/d/1dJnXpb3HiAGcRC9PPTwui9joNcij4E_E/view?usp=sharing

4

1 回答 1

3

首先,算法可以改进得更高效。事实上,一个多边形可以直接分配给每个点。这就像按多边形对点进行分类。分类完成后,您可以通过 key 执行一个/多个归约,其中 key 是多边形 ID。

这个新算法包括:

  • 计算多边形的所有边界框;
  • 按多边形对点进行分类;
  • 通过键执行归约(其中键是多边形 ID)。

这种方法比遍历每个多边形的所有点并过滤属性数组(例如operate_contact_poss)要有效得多。实际上,过滤是一项昂贵的操作,因为它需要完全读取目标数组(可能不适合 CPU 缓存)然后再写回。更不用说这个操作需要分配/删除一个临时数组,如果它没有就地执行,并且该操作不能从大多数 x86/x86-64 平台上的SIMD 指令中受益(因为它需要新的 AVX-512指令系统)。并行化也更难,因为过滤步骤太快,线程无法使用,但步骤需要按顺序完成。

关于算法的实现,Numba可以用来加速很多整体的计算。使用 Numba 的主要好处是大大减少了Numpy 在当前实现中创建的昂贵临时数组的数量。请注意,您可以为 Numba指定函数类型,以便在定义函数时编译函数。断言可用于使代码更健壮并帮助编译器了解给定维度的大小,从而生成更快的代码(Numba 的 JIT 编译器可以展开循环)。三元运算符可以帮助 JIT 编译器生成更快的无分支程序。

请注意,可以使用多个线程轻松并行化分类。但是,需要非常小心常量传播,因为一些关键常量(如工作数组和断言的形状)往往不会传播到线程执行的代码中,而传播对于优化热循环(例如。矢量化,展开)。另请注意,在具有许多内核(从 10 毫秒到 0.1 毫秒)的机器上创建许多线程可能会很昂贵。因此,仅在大输入数据上使用并行实现通常会更好。

这是生成的实现(使用 Python2 和 Python3):

@nb.njit('float64[::1](float64[::1], float64[::1], int64[:,::1])')
def operation_(r, gap, ends_ind):
    """ calculation formula which is applied on the points specified by findMatchingPolygons_ function """

    nPoints = ends_ind.shape[0]
    assert ends_ind.shape[1] == 2
    assert gap.size == nPoints

    formula = np.empty(nPoints, dtype=np.float64)

    for i in range(nPoints):
        ind0, ind1 = ends_ind[i]
        r0, r1 = r[ind0], r[ind1]
        r_sub = r0 - r1
        r_add = r0 + r1
        cur_gap = gap[i]
        paired_cent_dis = r_add + cur_gap
        formula[i] = (cur_gap**2 * (paired_cent_dis**2 + 5 * paired_cent_dis * r_add - 7 * r_sub**2)) / (3 * paired_cent_dis)

    return formula


# Use `parallel=True` for a parallel implementation
@nb.njit('int32[::1](float64[:,::1], float64[:,:,::1])')
def findMatchingPolygons_(points, polygons):
    """ Attribute to all point a region """

    nPolygons = polygons.shape[0]
    nPolygonPoints = polygons.shape[1]
    nPoints = points.shape[0]
    assert points.shape[1] == 3
    assert polygons.shape[2] == 3

    # Compute the bounding boxes of all polygons

    ll = np.empty((nPolygons, 3), dtype=np.float64)
    ur = np.empty((nPolygons, 3), dtype=np.float64)

    for i in range(nPolygons):
        ll_x, ll_y, ll_z = polygons[i, 0]
        ur_x, ur_y, ur_z = polygons[i, 0]

        for j in range(1, nPolygonPoints):
            x, y, z = polygons[i, j]
            ll_x = x if x<ll_x else ll_x
            ll_y = y if y<ll_y else ll_y
            ll_z = z if z<ll_z else ll_z
            ur_x = x if x>ur_x else ur_x
            ur_y = y if y>ur_y else ur_y
            ur_z = z if z>ur_z else ur_z

        ll[i] = ll_x, ll_y, ll_z
        ur[i] = ur_x, ur_y, ur_z

    # Find for each point its corresponding polygon

    pointPolygonId = np.empty(nPoints, dtype=np.int32)

    # Use `nb.prange(nPoints)` for a parallel implementation
    for i in range(nPoints):
        x, y, z = points[i, 0], points[i, 1], points[i, 2]
        pointPolygonId[i] = -1

        for j in range(polygons.shape[0]):
            if ll[j, 0] <= x < ur[j, 0] and ll[j, 1] <= y < ur[j, 1] and ll[j, 2] <= z < ur[j, 2]:
                pointPolygonId[i] = j
                break

    return pointPolygonId


@nb.njit('float64[::1](float64[:,:,::1], float64[:,::1], float64[::1])')
def computeSections_(elem_vert, contact_poss, operate_):
    nPolygons = elem_vert.shape[0]
    elaps = np.zeros(nPolygons, dtype=np.float64)

    pointPolygonId = findMatchingPolygons_(contact_poss, elem_vert)

    for i, polygonId in enumerate(pointPolygonId):
        if polygonId >= 0:
            elaps[polygonId] += operate_[i]

    return elaps


def elapses(r, pos, gap, ends_ind, elem_vert, contact_poss):
    if len(gap) > 0:
        operate_ = operation_(r, gap, ends_ind)
        return computeSections_(elem_vert, contact_poss, operate_)


r = np.load('a.npy')
pos = np.load('b.npy')
gap = np.load('c.npy')
ends_ind = np.load('d.npy')
elem_vert = np.load('e.npy')
contact_poss = np.load('f.npy')

elapses(r, pos, gap, ends_ind, elem_vert, contact_poss)

以下是旧 2 核机器 (i7-3520M) 上的结果:

Small dataset:
 - Original version:                5.53 ms
 - Proposed version (sequential):   0.22 ms  (x25)
 - Proposed version (parallel):     0.20 ms  (x27)

Medium dataset:
 - Original version:               53.40 ms
 - Proposed version (sequential):   1.24 ms  (x43)
 - Proposed version (parallel):     0.62 ms  (x86)

Big dataset:
 - Original version:               5742 ms
 - Proposed version (sequential):   144 ms   (x40)
 - Proposed version (parallel):      67 ms   (x86)

因此,提议的实现比原来的实现快 86 倍。

于 2021-12-27T01:33:12.490 回答