1

对于我的项目使用,我需要在二维数组中存储一定数量(~100x100)的浮点数。在函数计算期间,我需要读取和写入数组,因为函数确实是瓶颈(消耗 98% 的时间),我真的需要它快速。

我用 numpy 和 cython 做了一些实验:

import numpy
import time
cimport numpy
cimport cython

cdef int col, row

DTYPE = numpy.int
ctypedef numpy.int_t DTYPE_t
cdef numpy.ndarray[DTYPE_t, ndim=2] matrix_c = numpy.zeros([100 + 1, 100 + 1], dtype=DTYPE)

time_ = time.time()
for l in xrange(5000):
    for col in xrange(100):
        for row in xrange(100):
            matrix_c[<unsigned int>row + 1][<unsigned int>col + 1] = matrix_c[<unsigned int>row][<unsigned int>col]
print "Numpy + cython time: {0}".format(time.time() - time_)

但我发现,尽管我做了所有尝试,使用 python 列表的版本仍然明显更快。

使用列表的代码:

matrix = []
for i in xrange(100 + 1):
    matrix.append([])
    for j in xrange(100 + 1):
        matrix[i].append(0)

time_ = time.time()
for l in xrange(5000):
    for col in xrange(100):
        for row in xrange(100):
            matrix[row + 1][col + 1] = matrix[row][col]
print "list time: {0}".format(time.time() - time_)

和结果:

list time: 0.0141758918762
Numpy + cython time: 0.484772920609

我做错了什么吗?如果没有,有什么可以帮助我改善结果的吗?

4

2 回答 2

2

这是我的代码版本。共有三个函数,分别处理整数数组、32位浮点数组和双精度浮点数组。

from numpy cimport ndarray as ar
cimport numpy as np
import numpy as np
cimport cython
import time

@cython.boundscheck(False)
@cython.wraparound(False)
def access_write_int(ar[int,ndim=2] c, int n):
    cdef int l, col, row, h=c.shape[0], w=c.shape[1]
    time_ = time.time()
    for l in range(n):
        for row in range(h-1):
            for col in range(w-1):
                c[row+1,col+1] = c[row,col]
    print "Numpy + cython time: {0}".format(time.time() - time_)

@cython.boundscheck(False)
@cython.wraparound(False)
def access_write_float(ar[np.float32_t,ndim=2] c, int n):
    cdef int l, col, row, h=c.shape[0], w=c.shape[1]
    time_ = time.time()
    for l in range(n):
        for row in range(h-1):
            for col in range(w-1):
                c[row+1,col+1] = c[row,col]
    print "Numpy + cython time: {0}".format(time.time() - time_)

@cython.boundscheck(False)
@cython.wraparound(False)
def access_write_double(ar[double,ndim=2] c, int n):
    cdef int l, col, row, h=c.shape[0], w=c.shape[1]
    time_ = time.time()
    for l in range(n):
        for row in range(h-1):
            for col in range(w-1):
                c[row+1,col+1] = c[row,col]
    print "Numpy + cython time: {0}".format(time.time() - time_)

要从 Python 调用这些函数,我运行这个

import numpy as np
from numpy.random import rand, randint

print "integers"
c = randint(0, high=20, size=(101,101))
access_write_int(c, 5000)
print "32 bit float"
c = rand(101, 101).astype(np.float32)
access_write_float(c, 5000)
print "double precision"
c = rand(101, 101)
access_write_double(c, 5000)

以下更改很重要:

  1. [i,j]通过使用表单的索引而不是访问它来避免对数组进行切片[i][j]

  2. 将变量lcolrow, 定义为整数,以便 for 循环在 C 中运行。

  3. 使用函数装饰器@cython.boundscheck(False)和“@cython.wraparound(False)”关闭程序关键部分的边界检查和环绕索引。这允许越界内存访问,因此只有在确定索引是它们应该是什么时才应该这样做。

  4. 交换两个最里面的for循环,以便根据数组在内存中的排列方式访问数组。这对于更大的数组有很大的不同。等给出的数组np.zeros np.random.rand通常是 C 连续的,因此行存储在连续的块中,并且沿着外部for循环中的行而不是内部循环改变索引会更快。如果要保持 for 循环保持原样,请考虑在对其运行函数之前对数组进行转置,以便将列改为连续块。

于 2013-08-08T15:05:23.387 回答
1

问题似乎是您访问矩阵元素的方式。

使用[i,j]而不是[i][j].

您还可以删除 cast <>,这可以防止采用错误的值,但会增加函数调用开销。

另外,我会在所有 Cython 中使用他们正在使用的文档中的示例range而不是因为。xrangerange

结果将类似于:

import numpy
import time
cimport numpy
cimport cython

cdef int col, row

INT = numpy.int
ctypedef numpy.int_t cINT
cdef numpy.ndarray[cINT, ndim=2] matrix_c = numpy.zeros([100 + 1, 100 + 1], dtype=INT)

time_ = time.time()
for l in range(5000):
    for col in range(100):
        for row in range(100):
            matrix_c[row + 1, col + 1] = matrix_c[row, col]
print "Numpy + cython time: {0}".format(time.time() - time_)

强烈推荐参考:

- 在 Cython 中使用 NumPy

于 2013-08-08T11:11:57.687 回答