12

在我当前的项目中,我需要以一种稍微不寻常的方式“卷积”两个三维数组:

假设我们有两个三维数组 A 和 B,维度为 dimA 和 dimB(每个轴都相同)。现在我们要创建第三个数组 C,每个轴的维度为 dimA+dimB。

C 的条目计算如下:

c_{x1+x2,y1+y2,z1+z2} += a_{x1,y1,z1} * b_{x2,y2,z2}

我当前的版本很简单:

dimA = A.shape[0]
dimB = B.shape[0]
dimC = dimA+dimB

C = np.zeros((dimC,dimC,dimC))
for x1 in range(dimA):
    for x2 in range(dimB):
        for y1 in range(dimA):
            for y2 in range(dimB):
                for z1 in range(dimA):
                    for z2 in range(dimB):
                        x = x1+x2
                        y = y1+y2
                        z = z1+z2
                        C[x,y,z] += A[x1,y1,z1] * B[x2,y2,z2] 

不幸的是,这个版本真的很慢而且不可用。

我的第二个版本是:

C = scipy.signal.fftconvolve(A,B,mode="full")

但这仅计算元素max(dimA,dimB)

有没有人有更好的主意?

4

3 回答 3

5

您是否尝试过使用Numba?它是一个包,可让您包装使用 JIT 编译器通常很慢的 Python 代码。我使用 Numba 快速解决了您的问题,并获得了显着的加速。使用 IPython 的魔法timeit魔法函数,该custom_convolution函数耗时约 18 秒,而 Numba 的优化函数耗时 10.4 毫秒。那是超过 1700 的加速

以下是 Numba 的实现方式。

import numpy as np
from numba import jit, double

s = 15
array_a = np.random.rand(s ** 3).reshape(s, s, s)
array_b = np.random.rand(s ** 3).reshape(s, s, s)

# Original code
def custom_convolution(A, B):

    dimA = A.shape[0]
    dimB = B.shape[0]
    dimC = dimA + dimB

    C = np.zeros((dimC, dimC, dimC))
    for x1 in range(dimA):
        for x2 in range(dimB):
            for y1 in range(dimA):
                for y2 in range(dimB):
                    for z1 in range(dimA):
                        for z2 in range(dimB):
                            x = x1 + x2
                            y = y1 + y2
                            z = z1 + z2
                            C[x, y, z] += A[x1, y1, z1] * B[x2, y2, z2]
    return C

# Numba'ing the function with the JIT compiler
fast_convolution = jit(double[:, :, :](double[:, :, :],
                        double[:, :, :]))(custom_convolution)

如果您计算两个函数的结果之间的残差,您将得到零。这意味着 JIT 实现可以正常工作。

slow_result = custom_convolution(array_a, array_b) 
fast_result = fast_convolution(array_a, array_b)

print np.max(np.abs(slow_result - fast_result))

我得到的输出是0.0.

您可以将 Numba 安装到当前的 Python 设置中,也可以使用 continuum.io 的AnacondaCE包快速尝试。

最后但同样重要的是,Numba 的功能比该scipy.signal.fftconvolve功能快几倍。

注意:我使用的是 Anaconda 而不是 AnacondaCE。对于 Numba 的性能,两个包之间存在一些差异,但我认为差异不会太大。

于 2013-02-25T12:39:54.537 回答
4

一般规则是对作业使用正确的算法,除非卷积核与数据相比很短,否则它是基于 FFT 的卷积(短大致意味着小于 log2(n),其中 n 是数据的长度) .

在您的情况下,由于您要对两个大小相等的数据集进行卷积,因此您可能需要考虑基于 FFT 的卷积。

显然,scipy.signal.fftconvolve在这方面存在缺陷。使用更快的 FFT 算法,您可以通过滚动自己的卷积例程来做得更好(fftconvolve 将变换大小强制为 2 的幂没有帮助,否则可能会被猴子修补)。

以下代码使用pyfftw,我对FFTW的包装器并创建了一个自定义卷积类CustomFFTConvolution

class CustomFFTConvolution(object):

    def __init__(self, A, B, threads=1):

        shape = (np.array(A.shape) + np.array(B.shape))-1

        if np.iscomplexobj(A) and np.iscomplexobj(B):
            self.fft_A_obj = pyfftw.builders.fftn(
                    A, s=shape, threads=threads)
            self.fft_B_obj = pyfftw.builders.fftn(
                    B, s=shape, threads=threads)
            self.ifft_obj = pyfftw.builders.ifftn(
                    self.fft_A_obj.get_output_array(), s=shape,
                    threads=threads)

        else:
            self.fft_A_obj = pyfftw.builders.rfftn(
                    A, s=shape, threads=threads)
            self.fft_B_obj = pyfftw.builders.rfftn(
                    B, s=shape, threads=threads)
            self.ifft_obj = pyfftw.builders.irfftn(
                    self.fft_A_obj.get_output_array(), s=shape,
                    threads=threads)

    def __call__(self, A, B):

        fft_padded_A = self.fft_A_obj(A)
        fft_padded_B = self.fft_B_obj(B)

        return self.ifft_obj(fft_padded_A * fft_padded_B)

这用作:

custom_fft_conv = CustomFFTConvolution(A, B)
C = custom_fft_conv(A, B) # This can contain different values to during construction

threads构造类时带有可选参数。创建类的目的是利用 FFTW 提前规划变换的能力。

下面的完整演示代码只是扩展了@Kelsey对时间等的回答。

与 numba 解决方案和 vanilla fftconvolve 解决方案相比,加速效果显着。对于 n = 33,它比两者都快 40-45 倍。

from timeit import Timer
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import fftconvolve
from numba import jit, double
import pyfftw

# Original code
def custom_convolution(A, B):

    dimA = A.shape[0]
    dimB = B.shape[0]
    dimC = dimA + dimB

    C = np.zeros((dimC, dimC, dimC))
    for x1 in range(dimA):
        for x2 in range(dimB):
            for y1 in range(dimA):
                for y2 in range(dimB):
                    for z1 in range(dimA):
                        for z2 in range(dimB):
                            x = x1 + x2
                            y = y1 + y2
                            z = z1 + z2
                            C[x, y, z] += A[x1, y1, z1] * B[x2, y2, z2]
    return C

# Numba'ing the function with the JIT compiler
numba_convolution = jit(double[:, :, :](double[:, :, :],
                        double[:, :, :]))(custom_convolution)

def fft_convolution(A, B):
    return fftconvolve(A, B, mode='full')

class CustomFFTConvolution(object):

    def __init__(self, A, B, threads=1):

        shape = (np.array(A.shape) + np.array(B.shape))-1

        if np.iscomplexobj(A) and np.iscomplexobj(B):
            self.fft_A_obj = pyfftw.builders.fftn(
                    A, s=shape, threads=threads)
            self.fft_B_obj = pyfftw.builders.fftn(
                    B, s=shape, threads=threads)
            self.ifft_obj = pyfftw.builders.ifftn(
                    self.fft_A_obj.get_output_array(), s=shape,
                    threads=threads)

        else:
            self.fft_A_obj = pyfftw.builders.rfftn(
                    A, s=shape, threads=threads)
            self.fft_B_obj = pyfftw.builders.rfftn(
                    B, s=shape, threads=threads)
            self.ifft_obj = pyfftw.builders.irfftn(
                    self.fft_A_obj.get_output_array(), s=shape,
                    threads=threads)

    def __call__(self, A, B):

        fft_padded_A = self.fft_A_obj(A)
        fft_padded_B = self.fft_B_obj(B)

        return self.ifft_obj(fft_padded_A * fft_padded_B)

def run_test():
    reps = 10
    nt, ft, cft, cft2 = [], [], [], []
    x = range(2, 34)

    for N in x:
        print N
        A = np.random.rand(N, N, N)
        B = np.random.rand(N, N, N)

        custom_fft_conv = CustomFFTConvolution(A, B)
        custom_fft_conv_nthreads = CustomFFTConvolution(A, B, threads=2)

        C1 = numba_convolution(A, B)
        C2 = fft_convolution(A, B)
        C3 = custom_fft_conv(A, B)
        C4 = custom_fft_conv_nthreads(A, B)

        assert np.allclose(C1[:-1, :-1, :-1], C2)
        assert np.allclose(C1[:-1, :-1, :-1], C3)
        assert np.allclose(C1[:-1, :-1, :-1], C4)

        t = Timer(lambda: numba_convolution(A, B))
        nt.append(t.timeit(number=reps))
        t = Timer(lambda: fft_convolution(A, B))
        ft.append(t.timeit(number=reps))
        t = Timer(lambda: custom_fft_conv(A, B))
        cft.append(t.timeit(number=reps))
        t = Timer(lambda: custom_fft_conv_nthreads(A, B))
        cft2.append(t.timeit(number=reps))

    plt.plot(x, ft, label='scipy.signal.fftconvolve')
    plt.plot(x, nt, label='custom numba convolve')
    plt.plot(x, cft, label='custom pyfftw convolve')
    plt.plot(x, cft2, label='custom pyfftw convolve with threading')        
    plt.legend()
    plt.show()

if __name__ == '__main__':
    run_test()

编辑: 最近的 scipy 在不总是填充到 2 长度的幂方面做得更好,因此输出更接近 pyFFTW 案例。

于 2013-03-01T19:45:33.853 回答
3

上面的 Numba 方法是一个巧妙的技巧,但只会对相对较小的 N 有利。O(N^6)算法每次都会杀死你,不管它的实现速度有多快。在我的测试中,该fftconvolve方法在 N=20 附近越过越快,而 N=32 的速度是 10 倍。省略上面的定义custom_convolution

from timeit import Timer
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import fftconvolve
from numba import jit, double

# Numba'ing the function with the JIT compiler
numba_convolution = jit(double[:, :, :](double[:, :, :],
                        double[:, :, :]))(custom_convolution)

def fft_convolution(A, B):
    return fftconvolve(A, B, mode='full')

if __name__ == '__main__':
    reps = 3
    nt, ft = [], []
    x = range(2, 34)
    for N in x:
        print N
        A = np.random.rand(N, N, N)
        B = np.random.rand(N, N, N)
        C1 = numba_convolution(A, B)
        C2 = fft_convolution(A, B)
        assert np.allclose(C1[:-1, :-1, :-1], C2)
        t = Timer(lambda: numba_convolution(A, B))
        nt.append(t.timeit(number=reps))
        t = Timer(lambda: fft_convolution(A, B))
        ft.append(t.timeit(number=reps))
    plt.plot(x, ft, x, nt)
    plt.show()

它也非常依赖于 N,因为对于 2 的幂,FFT 会快得多。对于 N=17 到 N=32,FFT 版本的时间基本上是恒定的,并且在 N=33 时仍然更快,其中它又开始迅速发散。

您可以尝试在 Numba 中包装 FFT 实现,但不能直接使用 scipy 版本执行此操作。

(抱歉创建一个新答案,但我没有直接回复的积分。)

于 2013-02-25T20:06:54.637 回答