1

我已经编写了一个用于保存整数的同步队列,并且面临着一个我似乎无法理解的奇怪的竞争条件。

不要发布解决方案,我知道如何修复代码并使其工作,我想知道竞态条件是什么以及为什么它没有按预期工作。请帮助我了解出了什么问题以及原因。

首先是代码的重要部分:

这假设应用程序永远不会超过缓冲区可以容纳的内容,因此不检查当前缓冲区大小

static inline void int_queue_put_sync(struct int_queue_s * const __restrict int_queue, const long int value ) {
    if (value) { // 0 values are not allowed to be put in
        size_t write_offset; // holds a current copy of the array index where to put the element
        for (;;) {
            // retrieve up to date write_offset copy and apply power-of-two modulus
            write_offset = int_queue->write_offset & int_queue->modulus; 
            // if that cell currently holds 0 (thus is empty)
            if (!int_queue->int_container[write_offset])
                // Appetmt to compare and swap the new value in
                if (__sync_bool_compare_and_swap(&(int_queue->int_container[write_offset]), (long int)0, value))
                    // if successful then this thread was the first do do this, terminate the loop, else try again
                    break;
        }

        // increment write offset signaling other threads where the next free cell is
        int_queue->write_offset++;
        // doing a synchronised increment here does not fix the race condition
    }
}

这似乎有一个罕见的竞争条件,似乎不会增加write_offset. 在 RedHat 2.6.32 Intel(R) Xeon(R) 上的 OS X gcc 4.2、Intel Core i5 四核和 Linux Intel C Compiler 12 上测试。两者都会产生竞争条件。

带有测试用例的完整源代码:

#include <pthread.h>

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <stdint.h>

// #include "int_queue.h"
#include <stddef.h>
#include <string.h>
#include <unistd.h>
#include <sys/mman.h>

#ifndef INT_QUEUE_H
#define INT_QUEUE_H

#ifndef MAP_ANONYMOUS
#define MAP_ANONYMOUS MAP_ANON
#endif

struct int_queue_s {
    size_t size;
    size_t modulus;
    volatile size_t read_offset;
    volatile size_t write_offset;
    volatile long int int_container[0];
};

static inline void int_queue_put(struct int_queue_s * const __restrict int_queue, const long int value ) {
    if (value) {
        int_queue->int_container[int_queue->write_offset & int_queue->modulus] = value;
        int_queue->write_offset++;
    }
}

static inline void int_queue_put_sync(struct int_queue_s * const __restrict int_queue, const long int value ) {
    if (value) {
        size_t write_offset;
        for (;;) {
            write_offset = int_queue->write_offset & int_queue->modulus;
            if (!int_queue->int_container[write_offset])
                if (__sync_bool_compare_and_swap(&(int_queue->int_container[write_offset]), (long int)0, value))
                    break;
        }

        int_queue->write_offset++;
    }
}

static inline long int int_queue_get(struct int_queue_s * const __restrict int_queue) {
    size_t read_offset = int_queue->read_offset & int_queue->modulus;
    if (int_queue->write_offset != int_queue->read_offset) {
        const long int value = int_queue->int_container[read_offset];
        int_queue->int_container[read_offset] = 0;
        int_queue->read_offset++;
        return value;
    } else
        return 0;
}

static inline long int int_queue_get_sync(struct int_queue_s * const __restrict int_queue) {
    size_t read_offset;
    long int volatile value;
    for (;;) {

        read_offset = int_queue->read_offset;
        if (int_queue->write_offset == read_offset)
            return 0;
        read_offset &= int_queue->modulus;
        value = int_queue->int_container[read_offset];
        if (value)
            if (__sync_bool_compare_and_swap(&(int_queue->int_container[read_offset]), (long int)value, (long int)0))
                break;
    }
    int_queue->read_offset++;
    return value;
}

static inline struct int_queue_s * int_queue_create(size_t num_values) {

    struct int_queue_s * int_queue;
    size_t modulus;
    size_t temp = num_values + 1;
    do {
        modulus = temp;
        temp--;
        temp &= modulus;
    } while (temp);
    modulus <<= 1;

    size_t int_queue_mem = sizeof(*int_queue) + ( sizeof(int_queue->int_container[0]) * modulus);

    if (int_queue_mem % sysconf(_SC_PAGE_SIZE)) int_queue_mem += sysconf(_SC_PAGE_SIZE) - (int_queue_mem % sysconf(_SC_PAGE_SIZE));

    int_queue = mmap(NULL, int_queue_mem, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE , -1, 0);
    if (int_queue == MAP_FAILED)
        return NULL;

    int_queue->modulus = modulus-1;
    int_queue->read_offset = 0;
    int_queue->write_offset = 0;
    int_queue->size = num_values;

    memset((void*)int_queue->int_container, 0, sizeof(int_queue->int_container[0]) * modulus);

    size_t i;
    for (i = 0; i < num_values; ) {
        int_queue_put(int_queue, ++i );
    }

    return int_queue;
}


#endif


void * test_int_queue_thread(struct int_queue_s * int_queue) {
    long int value;

    size_t i;

    for (i = 0; i < 10000000; i++) {


        int waited = -1;
        do {
            value = int_queue_get_sync(int_queue);
            waited++;
        } while (!value);

        if (waited > 0) {
            printf("waited %d cycles to get a new value\n", waited);
            // continue;
        }

        // else {
        printf("thread %p got value %ld, i = %zu\n", (void *)pthread_self(), value, i);
        // }

        int timesleep = rand();
        timesleep &= 0xFFF;

        usleep(timesleep);

        int_queue_put_sync(int_queue, value);

        printf("thread %p put value %ld back, i = %zu\n", (void *)pthread_self(), value, i);
    }

    return NULL;
}


int main(int argc, char ** argv) {
    struct int_queue_s * int_queue = int_queue_create(2);

    if (!int_queue) {
        fprintf(stderr, "error initializing int_queue\n");
        return -1;
    }

    srand(0);

    long int value[100];

    size_t i;

    for (i = 0; i < 100; i++) {
        value[0] = int_queue_get(int_queue);

        if (!value[0]) {
            printf("error getting value\n");
        }
        else {
            printf("got value %ld\n", value[0]);
        }

        int_queue_put(int_queue, value[0]);

        printf("put value %ld back successfully\n", value[0]);
    }

    pthread_t threads[100];
    for (i = 0; i < 4; i++) {
        pthread_create(threads + i, NULL, (void * (*)(void *))test_int_queue_thread, int_queue);
    } 

    for (i = 0; i < 4; i++) {
        pthread_join(threads[i], NULL);
    } 


    return 0;
}
4

3 回答 3

5

有趣的问题。这是一个疯狂的猜测。:-)

看来您需要在 read_offset 和 write_offset 之间进行一些同步。

例如,这是一个可能相关或无关的种族。在您的比较和交换和 write_offset 增量之间,您可能有一个读者进来并将值设置回零。

Writer-1: get write_offset=0
Writer-2: get write_offset=0
Writer-1: compare-and-swap at offset=0
Writer-1: Set write_offset=1
Reader-1: compare-and-swap at offset=0 (sets it back to zero)
Writer-2: compare-and-swap at offset=0 again even though write_offset=1
Writer-2: Set write_offset=2
于 2012-11-26T19:20:56.200 回答
0

我相信这int_queue->write_offset++;就是问题所在:如果两个线程同时执行这条指令,它们都会从内存中加载相同的值,将其递增,然后将相同的结果存储回来(这样变量只会增加一)。

于 2012-11-26T18:52:44.097 回答
-1

我的意见是

int_queue->write_offset++;

write_offset = int_queue->write_offset & int_queue->modulus; 

不是线程安全的

于 2012-11-26T19:15:24.467 回答