-3

我已尝试实现本文所述的 C QuickSelect 算法(3 路快速排序(C 实现))。但是,我得到的只是性能比默认的 qsort 低 5 到 10 倍(即使是初始改组)。我试图挖掘此处提供的原始 qsort 源代码(https://github.com/lattera/glibc/blob/master/stdlib/qsort.c),但它太复杂了。有人有更简单,更好的算法吗?欢迎任何想法。谢谢,注意:我最初的问题是尝试将数组的第 K 个最小值获取到第一个 Kth 索引。所以我计划调用 quickselect K 次编辑 1:这是从上面的链接复制和改编的 Cython 代码

cdef void qswap(void* a, void* b, const size_t size) nogil:
    cdef char temp[size]# C99, use malloc otherwise
    #char serves as the type for "generic" byte arrays

    memcpy(temp, b,    size)
    memcpy(b,    a,    size)
    memcpy(a,    temp, size)

cdef void qshuffle(void* base, size_t num, size_t size) nogil: #implementation of Fisher
    cdef int i, j, tmp# create local variables to hold values for shuffle

    for i in range(num - 1, 0, -1): # for loop to shuffle
        j = c_rand() % (i + 1)#randomise j for shuffle with Fisher Yates
        qswap(base + i*size, base + j*size, size)

cdef void partition3(void* base,
                      size_t *low, size_t *high, size_t size,
                      QComparator compar) nogil:       
    # Modified median-of-three and pivot selection.                      
    cdef void *ptr = base
    cdef size_t lt = low[0]
    cdef size_t gt = high[0] # lt is the pivot
    cdef size_t i = lt + 1# (+1 !) we don't compare pivot with itself
    cdef int c = 0

    while (i <= gt):
        c = compar(ptr + i * size, ptr + lt * size)
        if (c < 0):# base[i] < base[lt] => swap(i++,lt++)
            qswap(ptr + lt * size, ptr + i * size, size)
            i += 1
            lt += 1
        elif (c > 0):#base[i] > base[gt] => swap(i, gt--)
            qswap(ptr + i * size, ptr + gt* size, size)
            gt -= 1
        else:#base[i] == base[gt]
            i += 1
    #base := [<<<<<lt=====gt>>>>>>]
    low[0] = lt                                          
    high[0] = gt 


cdef void qselectk3(void* base, size_t lo, size_t hi, 
   size_t size, size_t k, 
   QComparator compar) nogil:                                             
    cdef size_t low = lo                                          
    cdef size_t high = hi                                                      

    partition3(base, &low, &high,  size, compar)    

    if ((k - 1) < low): #k lies in the less-than-pivot partition           
        high = low - 1
        low = lo                      
    elif ((k - 1) >= low and  (k - 1) <= high): #k lies in the equals-to-pivot partition
        qswap(base, base + size*low, size)
        return                              
    else: # k > high => k lies in the greater-than-pivot partition                    
        low = high + 1
        high = hi 
    qselectk3(base, low, high, size, k, compar)

"""
 A selection algorithm to find the nth smallest elements in an unordered list. 
 these elements ARE placed at the nth positions of the input array                                                                         
"""
cdef void qselect(void* base, size_t num, size_t size,
                              size_t n,
                              QComparator compar) nogil:
    cdef int k
    qshuffle(base, num, size)
    for k in range(n):
        qselectk3(base + size*k, 0, num - k - 1, size, 1, compar)

我使用 python timeit 来获得方法 pyselect(N=50)和 pysort 的性能。像这样

def testPySelect():
    A = np.random.randint(16, size=(10000), dtype=np.int32)
    pyselect(A, 50)
timeit.timeit(testPySelect, number=1)

def testPySort():
    A = np.random.randint(16, size=(10000), dtype=np.int32)
    pysort(A)
timeit.timeit(testPySort, number=1)
4

2 回答 2

1

@chqrlie 的答案是好的和最终的答案,但为了完成这篇文章,我将发布 Cython 版本以及基准测试结果。简而言之,提出的解决方案比长向量上的 qsort 快 2 倍!

    cdef void qswap2(void *aptr, void *bptr, size_t size) nogil:
        cdef uint8_t* ac = <uint8_t*>aptr
        cdef uint8_t* bc = <uint8_t*>bptr
        cdef uint8_t t
        而(大小> 0):t = ac [0];ac[0] = bc[0];bc[0] = t; ac += 1; bc += 1; 大小 -= 1

    cdef 结构 qselect2_stack:
        uint8_t *base
        uint8_t *最后

    cdef void qselect2(void *base, size_t nmemb, size_t size,
                      size_t k,QComparator 比较)诺吉尔:
        cdef qselect2_stack 堆栈[64]
        cdef qselect2_stack *sp = &stack[0]

        cdef uint8_t *lb
        cdef uint8_t*ub
        cdef uint8_t *p
        cdef uint8_t *i
        cdef uint8_t *j
        cdef uint8_t *顶部

        如果(nmemb < 2 或大小 <= 0):
            返回

        顶部 = <uint8_t *> 基础
        如果(k < nmemb):
            顶部 += k*大小
        别的:
            顶部 += nmemb*size

        sp.base = <uint8_t *>base
        sp.last = <uint8_t *>base + (nmemb - 1) * 大小
        sp += 1

        cdef size_t 偏移量

        而(sp>堆栈):
            sp -= 1
            lb = sp.base
            ub = sp.last

            而(lb < ub 和 lb < 顶部):
                #选择中间元素作为枢轴并与第一个元素交换
                偏移量 = (ub - lb) >> 1
                p = lb + offset - offset % size
                qswap2(磅,p,尺寸)

                #分成两段
                我 = 磅 + 大小
                j = ub
                而1:
                    而(i < j 和比较(lb,i)> 0):
                        我 += 大小
                    而 (j >= i 和比较 (j, lb) > 0):
                        j -= 大小
                    如果 (i >= j):
                        休息
                    qswap2(i, j, 大小)
                    我 += 大小
                    j -= 大小

                # 将枢轴移动到它所属的位置
                qswap2(磅,j,尺寸)

                # 继续处理最小的段,最大的栈
                如果(j - 磅 <= ub - j):
                    sp.base = j + 大小
                    sp.last = ub
                    sp += 1
                    ub = j - 大小
                别的:
                    sp.base = 磅
                    sp.last = j - 大小
                    sp += 1
                    磅 = j + 大小

    cdef int int_comp(void* a, void* b) nogil:
        cdef int ai = (<int*>a)[0]
        cdef int bi = (<int*>b)[0]
        返回 (ai > bi ) - (ai < bi)

    def pyselect2(numpy.ndarray[int, ndim=1, mode="c"] na, int n):
        cdef int* a = <int*>&na[0]
        qselect2(a, len(na), sizeof(int), n, int_comp)

以下是基准测试结果(1,000 次测试):

#of 个元素 K #qsort (s) #qselect2 (s)
1,000 50 0.1261 0.0895
1,000 100 0.1261 0.0910

10,000 50 0.8113 0.4157
10,000 100 0.8113 0.4367
10,000 1,000 0.8113 0.4746

100,000 100 7.5428 3.8259
100,000 1,000 7,5428 3.8325
100,000 10,000 7,5428 4.5727

对于那些好奇的人来说,这段代码是使用神经网络进行表面重建领域的一颗明珠。再次感谢@chqrlie,您的代码在网络上是独一无二的。

于 2018-08-25T16:38:33.650 回答
0

这是一个用于您目的的快速实现:qsort_select是一个简单的实现,qsort自动修剪不必要的范围。

没有&& lb < top,它的行为就像常规一样,qsort除了更高级版本具有更好启发式的病理情况。此额外测试可防止对目标0 .. (k-1)之外的范围进行完整排序。该函数选择k最小值并对它们进行排序,数组的其余部分具有不确定顺序的剩余值。

#include <stdio.h>
#include <stdint.h>

static void exchange_bytes(uint8_t *ac, uint8_t *bc, size_t size) {
    while (size-- > 0) { uint8_t t = *ac; *ac++ = *bc; *bc++ = t; }
}

/* select and sort the k smallest elements from an array */
void qsort_select(void *base, size_t nmemb, size_t size,
                  int (*compar)(const void *a, const void *b), size_t k)
{
    struct { uint8_t *base, *last; } stack[64], *sp = stack;
    uint8_t *lb, *ub, *p, *i, *j, *top;

    if (nmemb < 2 || size <= 0)
        return;

    top = (uint8_t *)base + (k < nmemb ? k : nmemb) * size;
    sp->base = (uint8_t *)base;
    sp->last = (uint8_t *)base + (nmemb - 1) * size;
    sp++;
    while (sp > stack) {
        --sp;
        lb = sp->base;
        ub = sp->last;
        while (lb < ub && lb < top) {
            /* select middle element as pivot and exchange with 1st element */
            size_t offset = (ub - lb) >> 1;
            p = lb + offset - offset % size;
            exchange_bytes(lb, p, size);

            /* partition into two segments */
            for (i = lb + size, j = ub;; i += size, j -= size) {
                while (i < j && compar(lb, i) > 0)
                    i += size;
                while (j >= i && compar(j, lb) > 0)
                    j -= size;
                if (i >= j)
                    break;
                exchange_bytes(i, j, size);
            }
            /* move pivot where it belongs */
            exchange_bytes(lb, j, size);

            /* keep processing smallest segment, and stack largest */
            if (j - lb <= ub - j) {
                sp->base = j + size;
                sp->last = ub;
                sp++;
                ub = j - size;
            } else {
                sp->base = lb;
                sp->last = j - size;
                sp++;
                lb = j + size;
            }
        }
    }
}

int int_cmp(const void *a, const void *b) {
    int aa = *(const int *)a;
    int bb = *(const int *)b;
    return (aa > bb) - (aa < bb);
}

#define ARRAY_SIZE  50000

int array[ARRAY_SIZE];

int main(void) {
    int i;
    for (i = 0; i < ARRAY_SIZE; i++) {
        array[i] = ARRAY_SIZE - i;
    }
    qsort_select(array, ARRAY_SIZE, sizeof(*array), int_cmp, 50);
    for (i = 0; i < 50; i++) {
        printf("%d%c", array[i], i + 1 == 50 ? '\n' : ',');
    }
    return 0;
}
于 2018-08-25T13:55:02.030 回答