我想在 Python 中执行多项式微积分。里面的polynomial包裹numpy对我来说不够快。因此,我决定在 Fortran 中重写几个函数,并使用f2py它来创建可以轻松导入 Python 的共享库。目前,我正在将我的单变量和双变量多项式评估例程与他们的numpy同行进行基准测试。

在单变量例程中,我使用霍纳的方法numpy.polynomial.polynomial.polyvalnumpy我观察到,随着多项式阶数的增加,Fortran 例程比对应例程快的因素也在增加。

在双变量例程中,我两次使用霍纳的方法。首先在 y 中,然后在 x 中。不幸的是,我观察到,对于增加多项式阶数,numpy对应物会赶上并最终超过我的 Fortran 例程。由于numpy.polynomial.polynomial.polyval2d使用类似于我的方法,我认为第二个观察结果很奇怪。

我希望这个结果源于我对 Fortran 和f2py. 可能有人知道为什么单变量例程总是显得优越,而双变量例程只对低阶多项式更优越?

编辑 这是我最新更新的代码、基准脚本和性能图:


subroutine polyval(p, x, pval, nx)

    implicit none

    real(8), dimension(nx), intent(in) :: p
    real(8), intent(in) :: x
    real(8), intent(out) :: pval
    integer, intent(in) :: nx
    integer :: i

    pval = 0.0d0
    do i = nx, 1, -1
        pval = pval*x + p(i)
    end do

end subroutine polyval

subroutine polyval2(p, x, y, pval, nx, ny)

    implicit none

    real(8), dimension(nx, ny), intent(in) :: p
    real(8), intent(in) :: x, y
    real(8), intent(out) :: pval
    integer, intent(in) :: nx, ny
    real(8) :: tmp
    integer :: i, j

    pval = 0.0d0
    do j = ny, 1, -1
        tmp = 0.0d0
        do i = nx, 1, -1
            tmp = tmp*x + p(i, j)
        end do
        pval = pval*y + tmp
    end do

end subroutine polyval2

subroutine polyval3(p, x, y, z, pval, nx, ny, nz)

    implicit none

    real(8), dimension(nx, ny, nz), intent(in) :: p
    real(8), intent(in) :: x, y, z
    real(8), intent(out) :: pval
    integer, intent(in) :: nx, ny, nz
    real(8) :: tmp, tmp2
    integer :: i, j, k

    pval = 0.0d0
    do k = nz, 1, -1
        tmp2 = 0.0d0
        do j = ny, 1, -1
            tmp = 0.0d0
            do i = nx, 1, -1
                tmp = tmp*x + p(i, j, k)
            end do
            tmp2 = tmp2*y + tmp
        end do
        pval = pval*z + tmp2
    end do

end subroutine polyval3


import time
import os

import numpy as np
import matplotlib.pyplot as plt

# Compile and import Fortran module
os.system('f2py -c polynomial.f95 --opt="-O3 -ffast-math" \
--f90exec="gfortran-4.8" -m polynomial')
import polynomial

# Create random x and y value
x = np.random.rand()
y = np.random.rand()
z = np.random.rand()

# Number of repetition
repetition = 10

# Number of times to loop over a function
run = 100

# Number of data points
points = 26

# Max number of coefficients for univariate case
n_uni_min = 4
n_uni_max = 100

# Max number of coefficients for bivariate case
n_bi_min = 4
n_bi_max = 100

# Max number of coefficients for trivariate case
n_tri_min = 4
n_tri_max = 100

# Case on/off switch
case_on = [1, 1, 1]

case_1_done = 0
case_2_done = 0
case_3_done = 0


if case_on[0]:

    # Array containing the polynomial order + 1 for several univariate polynomials
    n_uni = np.array([int(x) for x in np.linspace(n_uni_min, n_uni_max, points)])

    # Initialise arrays for storing timing results
    time_uni_numpy = np.zeros(n_uni.size)
    time_uni_fortran = np.zeros(n_uni.size)

    for i in xrange(len(n_uni)):
        # Create random univariate polynomial of order n - 1
        p = np.random.rand(n_uni[i])

        # Time evaluation of polynomial using NumPy
        dt = []
        for j in xrange(repetition):
            t1 = time.time()
            for r in xrange(run): np.polynomial.polynomial.polyval(x, p)
            t2 = time.time()
            dt.append(t2 - t1)
        time_uni_numpy[i] = np.average(dt[2::])

        # Time evaluation of polynomial using Fortran
        dt = []
        for j in xrange(repetition):
            t1 = time.time()
            for r in xrange(run): polynomial.polyval(p, x)
            t2 = time.time()
            dt.append(t2 - t1)
        time_uni_fortran[i] = np.average(dt[2::])

    # Speed-up factor
    factor_uni = time_uni_numpy / time_uni_fortran

    results_uni = np.zeros([len(n_uni), 4])
    results_uni[:, 0] = n_uni
    results_uni[:, 1] = factor_uni
    results_uni[:, 2] = time_uni_numpy
    results_uni[:, 3] = time_uni_fortran
    print results_uni, '\n'

    plt.plot(n_uni, factor_uni)
    plt.title('Univariate comparison')
    plt.xlabel('# coefficients')
    plt.ylabel('Speed-up factor')
    plt.xlim(n_uni[0], n_uni[-1])
    plt.ylim(0, max(factor_uni))

case_1_done = 1


if case_on[1]:

    # Array containing the polynomial order + 1 for several bivariate polynomials
    n_bi = np.array([int(x) for x in np.linspace(n_bi_min, n_bi_max, points)])

    # Initialise arrays for storing timing results
    time_bi_numpy = np.zeros(n_bi.size)
    time_bi_fortran = np.zeros(n_bi.size)

    for i in xrange(len(n_bi)):
        # Create random bivariate polynomial of order n - 1 in x and in y
        p = np.random.rand(n_bi[i], n_bi[i])

        # Time evaluation of polynomial using NumPy
        dt = []
        for j in xrange(repetition):
            t1 = time.time()
            for r in xrange(run): np.polynomial.polynomial.polyval2d(x, y, p)
            t2 = time.time()
            dt.append(t2 - t1)
        time_bi_numpy[i] = np.average(dt[2::])

        # Time evaluation of polynomial using Fortran
        p = np.asfortranarray(p)
        dt = []
        for j in xrange(repetition):
            t1 = time.time()
            for r in xrange(run): polynomial.polyval2(p, x, y)
            t2 = time.time()
            dt.append(t2 - t1)
        time_bi_fortran[i] = np.average(dt[2::])

    # Speed-up factor
    factor_bi = time_bi_numpy / time_bi_fortran

    results_bi = np.zeros([len(n_bi), 4])
    results_bi[:, 0] = n_bi
    results_bi[:, 1] = factor_bi
    results_bi[:, 2] = time_bi_numpy
    results_bi[:, 3] = time_bi_fortran
    print results_bi, '\n'

    plt.plot(n_bi, factor_bi)
    plt.title('Bivariate comparison')
    plt.xlabel('# coefficients')
    plt.ylabel('Speed-up factor')
    plt.xlim(n_bi[0], n_bi[-1])
    plt.ylim(0, max(factor_bi))

case_2_done = 1


if case_on[2]:

    # Array containing the polynomial order + 1 for several bivariate polynomials
    n_tri = np.array([int(x) for x in np.linspace(n_tri_min, n_tri_max, points)])

    # Initialise arrays for storing timing results
    time_tri_numpy = np.zeros(n_tri.size)
    time_tri_fortran = np.zeros(n_tri.size)

    for i in xrange(len(n_tri)):
        # Create random bivariate polynomial of order n - 1 in x and in y
        p = np.random.rand(n_tri[i], n_tri[i])

        # Time evaluation of polynomial using NumPy
        dt = []
        for j in xrange(repetition):
            t1 = time.time()
            for r in xrange(run): np.polynomial.polynomial.polyval3d(x, y, z, p)
            t2 = time.time()
            dt.append(t2 - t1)
        time_tri_numpy[i] = np.average(dt[2::])

        # Time evaluation of polynomial using Fortran
        p = np.asfortranarray(p)
        dt = []
        for j in xrange(repetition):
            t1 = time.time()
            for r in xrange(run): polynomial.polyval3(p, x, y, z)
            t2 = time.time()
            dt.append(t2 - t1)
        time_tri_fortran[i] = np.average(dt[2::])

    # Speed-up factor
    factor_tri = time_tri_numpy / time_tri_fortran

    results_tri = np.zeros([len(n_tri), 4])
    results_tri[:, 0] = n_tri
    results_tri[:, 1] = factor_tri
    results_tri[:, 2] = time_tri_numpy
    results_tri[:, 3] = time_tri_fortran
    print results_tri

    plt.plot(n_bi, factor_bi)
    plt.title('Trivariate comparison')
    plt.xlabel('# coefficients')
    plt.ylabel('Speed-up factor')
    plt.xlim(n_tri[0], n_tri[-1])
    plt.ylim(0, max(factor_tri))
    print '\n'

case_3_done = 1



subroutine polyval(p, x, pval, nx)

    implicit none

    real*8, dimension(nx), intent(in) :: p
    real*8, intent(in) :: x
    real*8, intent(out) :: pval
    integer, intent(in) :: nx

    integer, parameter :: simd = 8
    real*8 :: tmp(simd), xpower(simd), maxpower
    integer :: i, j, k

    xpower(1) = x
    do i = 2, simd
        xpower(i) = xpower(i-1)*x
    end do
    maxpower = xpower(simd)

    tmp = 0.0d0
    do i = nx+1, simd+2, -simd
        do j = 1, simd
            tmp(j) = tmp(j)*maxpower + p(i-j)*xpower(simd-j+1)
        end do
    end do

    k = mod(nx-1, simd)
    if (k == 0) then
        pval = sum(tmp) + p(1)
        pval = sum(tmp) + p(k+1)
        do i = k, 1, -1
            pval = pval*x + p(i)
        end do
    end if

end subroutine polyval

编辑测试代码以验证直接上面的代码对于 x > 1 的结果是否很差

import polynomial as P
import numpy.polynomial.polynomial as PP

import numpy as np

for n in xrange(2,100):
    poly1n = np.random.rand(n)
    poly1f = np.asfortranarray(poly1n)

    x = 2

    print np.linalg.norm(P.polyval(poly1f, x) - PP.polyval(x, poly1n)), '\n'

在双变量情况下,p是一个二维数组。这意味着数组的 C 与 fortran 排序是不同的。默认情况下,numpy 函数给出 C 排序,显然 fortran 例程使用 fortran 排序。

f2py 足够聪明来处理这个问题,并自动在 C 和 fortran 格式数组之间进行转换。但是,这会导致一些开销,这是性能降低的可能原因之一。您可以通过在计时例程之外手动转换p为 fortran 类型来检查这是否是原因。numpy.asfortranarray当然,为了有意义,在您的实际用例中,您要确保输入数组是 fortran 顺序的。

f2py 有一个选项-DF2PY_REPORT_ON_ARRAY_COPY可以在任何时候复制数组时发出警告。

如果这不是原因,那么您需要考虑更深入的细节,例如您正在使用哪个 fortran 编译器,以及它正在应用什么样的优化。可能会减慢您的速度的示例包括在堆上分配数组而不是在堆栈上分配数组(对 的调用很昂贵malloc),尽管我希望这种影响对于更大的数组变得不那么重要。

最后,您应该考虑对于大的双变量拟合,Nnumpy 例程已经基本上处于最佳效率的可能性。在这种情况下,numpy 例程可能会花费大部分时间运行优化的 C 例程,相比之下,python 代码的开销可以忽略不计。在这种情况下,您不会期望您的 fortran 代码显示任何显着的加速。

我猜想,您的tmp数组变得太大,以至于它需要 L2、L3 甚至主内存访问而不是缓存。将这些循环分解并一次只处理其中的一部分(带状挖掘)可能会更好。

您的函数很短,因此通过内联 polyval 可以获得更好的结果。您还可以通过简单地反转循环来避免计算索引:

subroutine polyval2(p, x, y, pval, nx, ny)

    implicit none

    real(8), dimension(nx, ny), intent(in), target :: p
    real(8), intent(in) :: x, y
    real(8), intent(out) :: pval
    integer, intent(in) :: nx, ny
    real(8) :: tmp
    integer :: i, ii

    pval = 1.d0
    do i = ny, 1
        tmp = 1.d0
        do ii = nx, 1
            tmp = tmp*x + p(ii,i)
        end do
        pval = pval*y + tmp
    end do

end subroutine polyval2

与您发布的原始代码相比,使用此代码,大型数组的执行时间缩短了约 10%。(我用你的代码测试了一个纯 Fortran 程序 Nx=Ny=1000, gfortran -O3 -funroll-loops

我同意 haraldkl 的观点,当尺寸变得太大时,性能的急剧下降对于缓存/内存访问模式来说是非常典型的。剥离采矿有帮助,但我不鼓励自己这样做。改用编译器标志:-floop-strip-mineforgfortran和(包含在)-O3for ifort. 另外,尝试循环展开:-funroll-loopsforgfortranifort.

您可以使用 指定这些标志f2py -c --f90flags="..."

按照其他建议,p=np.asfortranarray(p)在我测试它时,在计时器之前使用确实使性能与 numpy 相当。我将双变量工作台的范围扩展到n_bi = np.array([2**i for i in xrange(1, 15)]),因此 p 矩阵将大于我的 L3 缓存大小。

为了进一步优化这一点,我认为自动编译器选项不会有太大帮助,因为内部循环具有依赖性。只有当您手动展开它时,才会ifort对最里面的循环进行矢量化。和是需要的gfortran。对于受主内存带宽限制的矩阵大小,这将比 numpy 的性能优势从 1 倍提高到 3 倍。-O3-ffast-math

更新:在将其应用于单变量代码并使用 编译后f2py --opt='-O3 -ffast-math' -c -m polynomial polynomial.f90,我得到以下 benchmark.py 的源代码和结果:

subroutine polyval(p, x, pval, nx)

implicit none

real*8, dimension(nx), intent(in) :: p
real*8, intent(in) :: x
real*8, intent(out) :: pval
integer, intent(in) :: nx

integer, parameter :: simd = 8
real*8 :: tmp(simd), vecx(simd), xfactor
integer :: i, j, k

! precompute factors
do i = 1, simd
end do
xfactor = x**simd

tmp = 0.0d0
do i = 1, nx, simd
    do k = 1, simd
        tmp(k) = tmp(k)*xfactor + p(nx-(i+k-1)+1)*vecx(simd-k+1)
    end do
end do
pval = sum(tmp)

end subroutine polyval

subroutine polyval2(p, x, y, pval, nx, ny)

implicit none

real*8, dimension(nx, ny), intent(in) :: p
real*8, intent(in) :: x, y
real*8, intent(out) :: pval
integer, intent(in) :: nx, ny

integer, parameter :: simd = 8
real*8 :: tmp(simd), vecx(simd), xfactor
integer :: i, j, k

! precompute factors
do i = 1, simd
end do
xfactor = x**simd

! horner
do i = 1, ny
    tmp = 0.0d0
    do j = 1, nx, simd
        ! inner vectorizable loop
        do k = 1, simd
            tmp(k) = tmp(k)*xfactor + p(nx-(j+k-1)+1,ny-i+1)*vecx(simd-k+1)
        end do
    end do
    pval = pval*y + sum(tmp)
end do

end subroutine polyval2

更新 2:正如所指出的,此代码不正确,至少在大小不能被simd. 它只是展示了手动帮助编译器的概念,所以不要像这样使用它。如果大小不是 2 的幂,则必须使用一个小的余数循环来处理悬空索引。做到这一点并不难,这里是单变量情况的正确程序,应该很容易将其扩展到双变量:

subroutine polyval(p, x, pval, nx)
implicit none

real*8, dimension(nx), intent(in) :: p
real*8, intent(in) :: x
real*8, intent(out) :: pval
integer, intent(in) :: nx

integer, parameter :: simd = 4
real*8 :: tmp(simd), vecx(simd), xfactor
integer :: i, j, k, nr

! precompute factors
do i = 1, simd
end do
xfactor = x**simd

! check remainder
nr = mod(nx, simd)

! horner
tmp = 0.0d0
do i = 1, nx-nr, simd
    do k = 1, simd
        tmp(k) = tmp(k)*xfactor + p(nx-(i+k-1)+1)*vecx(simd-k+1)
    end do
end do
pval = sum(tmp)

! do remainder
pval = pval * x**nr
do i = 1, nr
    pval = pval + p(i) * vecx(i)
end do
end subroutine polyval



此外,应该注意非常小的尺寸,因为时间太短而无法获得准确的性能配置文件。此外,相对时间numpy可能具有欺骗性,因为 numpy 的绝对时间可能非常糟糕。因此,以下是最大案例的时间安排:

对于 nx=2 20 的单变量,numpy 的时间为 1.21 秒,自定义 fortran 版本的时间为 1.69e-3 秒。对于 nx ny=2 20 的双变量,numpy 的时间为 8e-3 s,自定义版本的时间为 1.68e-3 s。当总 nx ny 大小相同时,单变量和双变量的时间相同这一事实非常重要,因为它支持代码在内存带宽限制附近执行的事实。

更新 3:使用较小尺寸的新 python 脚本,simd=4我得到以下性能:



更新 4:至于正确性,双精度精度内的结果是相同的,如果您为单变量示例运行此 python 代码,您可以看到:

import polynomial as P
import numpy.polynomial.polynomial as PP

import numpy as np

for n in xrange(2,100):
    poly1n = np.random.rand(n)
    poly1f = np.asfortranarray(poly1n)

    x = 2

    print "%18.14e" % P.polyval(poly1f, x)
    print "%18.14e" % PP.polyval(x, poly1n)
    print (P.polyval(poly1f, x) - PP.polyval(x, poly1n))/PP.polyval(x,poly1n), '\n'
于 2013-09-11T07:49:53.157 回答