一般规则是对作业使用正确的算法,除非卷积核与数据相比很短,否则它是基于 FFT 的卷积(短大致意味着小于 log2(n),其中 n 是数据的长度) .
在您的情况下,由于您要对两个大小相等的数据集进行卷积,因此您可能需要考虑基于 FFT 的卷积。
显然,scipy.signal.fftconvolve
在这方面存在缺陷。使用更快的 FFT 算法,您可以通过滚动自己的卷积例程来做得更好(fftconvolve 将变换大小强制为 2 的幂没有帮助,否则可能会被猴子修补)。
以下代码使用pyfftw,我对FFTW的包装器并创建了一个自定义卷积类CustomFFTConvolution
:
class CustomFFTConvolution(object):
def __init__(self, A, B, threads=1):
shape = (np.array(A.shape) + np.array(B.shape))-1
if np.iscomplexobj(A) and np.iscomplexobj(B):
self.fft_A_obj = pyfftw.builders.fftn(
A, s=shape, threads=threads)
self.fft_B_obj = pyfftw.builders.fftn(
B, s=shape, threads=threads)
self.ifft_obj = pyfftw.builders.ifftn(
self.fft_A_obj.get_output_array(), s=shape,
threads=threads)
else:
self.fft_A_obj = pyfftw.builders.rfftn(
A, s=shape, threads=threads)
self.fft_B_obj = pyfftw.builders.rfftn(
B, s=shape, threads=threads)
self.ifft_obj = pyfftw.builders.irfftn(
self.fft_A_obj.get_output_array(), s=shape,
threads=threads)
def __call__(self, A, B):
fft_padded_A = self.fft_A_obj(A)
fft_padded_B = self.fft_B_obj(B)
return self.ifft_obj(fft_padded_A * fft_padded_B)
这用作:
custom_fft_conv = CustomFFTConvolution(A, B)
C = custom_fft_conv(A, B) # This can contain different values to during construction
threads
构造类时带有可选参数。创建类的目的是利用 FFTW 提前规划变换的能力。
下面的完整演示代码只是扩展了@Kelsey对时间等的回答。
与 numba 解决方案和 vanilla fftconvolve 解决方案相比,加速效果显着。对于 n = 33,它比两者都快 40-45 倍。
from timeit import Timer
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import fftconvolve
from numba import jit, double
import pyfftw
# Original code
def custom_convolution(A, B):
dimA = A.shape[0]
dimB = B.shape[0]
dimC = dimA + dimB
C = np.zeros((dimC, dimC, dimC))
for x1 in range(dimA):
for x2 in range(dimB):
for y1 in range(dimA):
for y2 in range(dimB):
for z1 in range(dimA):
for z2 in range(dimB):
x = x1 + x2
y = y1 + y2
z = z1 + z2
C[x, y, z] += A[x1, y1, z1] * B[x2, y2, z2]
return C
# Numba'ing the function with the JIT compiler
numba_convolution = jit(double[:, :, :](double[:, :, :],
double[:, :, :]))(custom_convolution)
def fft_convolution(A, B):
return fftconvolve(A, B, mode='full')
class CustomFFTConvolution(object):
def __init__(self, A, B, threads=1):
shape = (np.array(A.shape) + np.array(B.shape))-1
if np.iscomplexobj(A) and np.iscomplexobj(B):
self.fft_A_obj = pyfftw.builders.fftn(
A, s=shape, threads=threads)
self.fft_B_obj = pyfftw.builders.fftn(
B, s=shape, threads=threads)
self.ifft_obj = pyfftw.builders.ifftn(
self.fft_A_obj.get_output_array(), s=shape,
threads=threads)
else:
self.fft_A_obj = pyfftw.builders.rfftn(
A, s=shape, threads=threads)
self.fft_B_obj = pyfftw.builders.rfftn(
B, s=shape, threads=threads)
self.ifft_obj = pyfftw.builders.irfftn(
self.fft_A_obj.get_output_array(), s=shape,
threads=threads)
def __call__(self, A, B):
fft_padded_A = self.fft_A_obj(A)
fft_padded_B = self.fft_B_obj(B)
return self.ifft_obj(fft_padded_A * fft_padded_B)
def run_test():
reps = 10
nt, ft, cft, cft2 = [], [], [], []
x = range(2, 34)
for N in x:
print N
A = np.random.rand(N, N, N)
B = np.random.rand(N, N, N)
custom_fft_conv = CustomFFTConvolution(A, B)
custom_fft_conv_nthreads = CustomFFTConvolution(A, B, threads=2)
C1 = numba_convolution(A, B)
C2 = fft_convolution(A, B)
C3 = custom_fft_conv(A, B)
C4 = custom_fft_conv_nthreads(A, B)
assert np.allclose(C1[:-1, :-1, :-1], C2)
assert np.allclose(C1[:-1, :-1, :-1], C3)
assert np.allclose(C1[:-1, :-1, :-1], C4)
t = Timer(lambda: numba_convolution(A, B))
nt.append(t.timeit(number=reps))
t = Timer(lambda: fft_convolution(A, B))
ft.append(t.timeit(number=reps))
t = Timer(lambda: custom_fft_conv(A, B))
cft.append(t.timeit(number=reps))
t = Timer(lambda: custom_fft_conv_nthreads(A, B))
cft2.append(t.timeit(number=reps))
plt.plot(x, ft, label='scipy.signal.fftconvolve')
plt.plot(x, nt, label='custom numba convolve')
plt.plot(x, cft, label='custom pyfftw convolve')
plt.plot(x, cft2, label='custom pyfftw convolve with threading')
plt.legend()
plt.show()
if __name__ == '__main__':
run_test()
编辑: 最近的 scipy 在不总是填充到 2 长度的幂方面做得更好,因此输出更接近 pyFFTW 案例。