我编写了一个用于保存整数的同步队列,但遇到了一个我始终无法理解的奇怪竞态条件。
请不要发布解决方案,我知道如何修复代码并使其工作,我想知道竞态条件是什么以及为什么它没有按预期工作。请帮助我了解出了什么问题以及原因。
首先是代码的重要部分:
这段代码假设应用程序放入的元素数量永远不会超过缓冲区的容量,因此不检查当前缓冲区大小:
/*
 * Put `value` into the queue from code that may run concurrently with other
 * callers. A cell holding 0 is treated as empty, so a `value` of 0 is ignored.
 * Spins until a compare-and-swap from 0 to `value` succeeds on the cell at the
 * current write offset, then increments the write offset.
 */
static inline void int_queue_put_sync(struct int_queue_s * const __restrict int_queue, const long int value ) {
if (value) { // 0 values are not allowed to be put in
size_t write_offset; // holds a current copy of the array index where to put the element
for (;;) {
// retrieve an up-to-date copy of write_offset and apply the power-of-two modulus
write_offset = int_queue->write_offset & int_queue->modulus;
// if that cell currently holds 0 (thus is empty)
if (!int_queue->int_container[write_offset])
// attempt to compare-and-swap the new value in
if (__sync_bool_compare_and_swap(&(int_queue->int_container[write_offset]), (long int)0, value))
// if successful then this thread was the first to do this: terminate the loop, else try again
break;
}
// increment write offset, signaling other threads where the next free cell is
int_queue->write_offset++;
// doing a synchronised increment here does not fix the race condition
}
}
这里似乎存在一个罕见的竞态条件,表现为 write_offset 偶尔没有被递增。
已在 OS X(gcc 4.2,Intel Core i5 四核)和 Linux(RedHat 2.6.32,Intel(R) Xeon(R),Intel C Compiler 12)上测试,两者都会出现该竞态条件。
带有测试用例的完整源代码:
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <stdint.h>
// #include "int_queue.h"
#include <stddef.h>
#include <string.h>
#include <unistd.h>
#include <sys/mman.h>
#ifndef INT_QUEUE_H
#define INT_QUEUE_H
#ifndef MAP_ANONYMOUS
#define MAP_ANONYMOUS MAP_ANON
#endif
/*
 * Ring buffer of long ints stored inline after the header.
 * A cell holding 0 is treated as empty by the put/get functions.
 */
struct int_queue_s {
    size_t size;                       /* number of values requested at creation */
    size_t modulus;                    /* number of cells minus one; used as an index mask */
    volatile size_t read_offset;       /* running read counter; masked with modulus to get a cell index */
    volatile size_t write_offset;      /* running write counter; masked with modulus to get a cell index */
    volatile long int int_container[]; /* C99 flexible array member (was the GNU [0] extension) */
};
/*
 * Unsynchronised put: stores `value` in the cell selected by the masked write
 * offset and then advances the write offset. A `value` of 0 is ignored,
 * because 0 marks an empty cell. No check is made that the cell is free.
 */
static inline void int_queue_put(struct int_queue_s * const __restrict int_queue, const long int value ) {
    if (!value) {
        return;
    }

    const size_t slot = int_queue->write_offset & int_queue->modulus;
    int_queue->int_container[slot] = value;
    int_queue->write_offset++;
}
/*
 * Put `value` into the queue from code that may run concurrently with other
 * callers. A `value` of 0 is ignored, because 0 marks an empty cell.
 *
 * NOTE(review): claiming the cell (the compare-and-swap) and advancing
 * write_offset (a plain ++ on a volatile field) are two separate steps.
 * Between them the cell is already non-zero while write_offset still refers
 * to it, and the increment itself is an ordinary read-modify-write.
 */
static inline void int_queue_put_sync(struct int_queue_s * const __restrict int_queue, const long int value ) {
if (value) {
// local copy of the masked cell index used for this attempt
size_t write_offset;
// retry until this thread's compare-and-swap wins a cell
for (;;) {
// re-read the shared write offset on every attempt and mask it to a cell index
write_offset = int_queue->write_offset & int_queue->modulus;
// only attempt the swap when the cell currently reads as empty (0)
if (!int_queue->int_container[write_offset])
// atomically replace 0 with value; fails if the cell changed in the meantime
if (__sync_bool_compare_and_swap(&(int_queue->int_container[write_offset]), (long int)0, value))
break;
}
// advance the shared write offset after the cell has been filled
int_queue->write_offset++;
}
}
/*
 * Unsynchronised get: returns 0 when the read and write offsets are equal.
 * Otherwise takes the value from the cell at the masked read offset, resets
 * that cell to 0, advances the read offset and returns the value.
 */
static inline long int int_queue_get(struct int_queue_s * const __restrict int_queue) {
    const size_t slot = int_queue->read_offset & int_queue->modulus;

    if (int_queue->write_offset == int_queue->read_offset) {
        return 0;
    }

    const long int taken = int_queue->int_container[slot];
    int_queue->int_container[slot] = 0;
    int_queue->read_offset++;
    return taken;
}
/*
 * Get a value from the queue from code that may run concurrently with other
 * callers. Returns 0 when the read and write offsets are observed equal;
 * otherwise spins until a compare-and-swap from the observed non-zero value
 * to 0 succeeds on the cell at the current read offset, then increments the
 * read offset and returns the value taken.
 *
 * NOTE(review): as in int_queue_put_sync, emptying the cell and advancing
 * read_offset (a plain ++ on a volatile field) are separate steps.
 */
static inline long int int_queue_get_sync(struct int_queue_s * const __restrict int_queue) {
// local copy of the read offset (unmasked first, then masked to a cell index)
size_t read_offset;
// value observed in the cell; used as the expected value of the compare-and-swap
long int volatile value;
for (;;) {
// re-read the shared read offset on every attempt
read_offset = int_queue->read_offset;
// equal offsets are treated as "nothing to read"
if (int_queue->write_offset == read_offset)
return 0;
// mask to a cell index
read_offset &= int_queue->modulus;
value = int_queue->int_container[read_offset];
// a cell reading 0 is empty: retry without attempting the swap
if (value)
// atomically replace the observed value with 0; fails if the cell changed in the meantime
if (__sync_bool_compare_and_swap(&(int_queue->int_container[read_offset]), (long int)value, (long int)0))
break;
}
// advance the shared read offset after the cell has been emptied
int_queue->read_offset++;
return value;
}
/*
 * Allocate a queue with mmap and pre-fill it with the values 1..num_values.
 *
 * The number of cells is twice the highest power of two that is <= num_values + 1,
 * so it always exceeds num_values. The mapping size is rounded up to a whole
 * number of pages.
 *
 * Returns the new queue, or NULL if the page size cannot be determined, the
 * requested size would overflow size_t, or mmap fails.
 */
static inline struct int_queue_s * int_queue_create(size_t num_values) {
    struct int_queue_s * int_queue;
    const size_t cell_size = sizeof(int_queue->int_container[0]);

    /* num_values + 1 below must not wrap around to 0 */
    if (num_values == SIZE_MAX)
        return NULL;

    /* Reduce num_values + 1 to its highest set bit by repeatedly clearing the lowest one */
    size_t capacity;
    size_t temp = num_values + 1;
    do {
        capacity = temp;
        temp &= temp - 1;
    } while (temp);

    /* Doubling must not shift the only set bit out of the type */
    if (capacity > SIZE_MAX / 2)
        return NULL;
    capacity <<= 1;

    /* Query the page size once; sysconf reports failure with -1 */
    const long page_size = sysconf(_SC_PAGE_SIZE);
    if (page_size <= 0)
        return NULL;
    const size_t page = (size_t)page_size;

    /* Header + cells + page rounding must fit in size_t */
    if (page > SIZE_MAX - sizeof(*int_queue))
        return NULL;
    if (capacity > (SIZE_MAX - sizeof(*int_queue) - page) / cell_size)
        return NULL;

    size_t int_queue_mem = sizeof(*int_queue) + (cell_size * capacity);
    const size_t partial = int_queue_mem % page;
    if (partial)
        int_queue_mem += page - partial;

    int_queue = mmap(NULL, int_queue_mem, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
    if (int_queue == MAP_FAILED)
        return NULL;

    int_queue->modulus = capacity - 1;
    int_queue->read_offset = 0;
    int_queue->write_offset = 0;
    int_queue->size = num_values;
    /* Every cell starts out as 0, i.e. empty */
    memset((void*)int_queue->int_container, 0, cell_size * capacity);

    /* Pre-fill with 1..num_values; 0 is skipped because it marks an empty cell */
    for (size_t next = 1; next <= num_values; next++) {
        int_queue_put(int_queue, (long int)next);
    }
    return int_queue;
}
#endif
/*
 * Worker body for the stress test. Repeats 10000000 times: spin on
 * int_queue_get_sync until it yields a non-zero value, log it, sleep for a
 * random 0..0xFFF microseconds, then hand the value back with
 * int_queue_put_sync. Always returns NULL.
 */
void * test_int_queue_thread(struct int_queue_s * int_queue) {
    for (size_t i = 0; i < 10000000; i++) {
        long int value;
        int waited = 0; /* number of failed attempts before a value was obtained */

        for (;;) {
            value = int_queue_get_sync(int_queue);
            if (value) {
                break;
            }
            waited++;
        }

        if (waited > 0) {
            printf("waited %d cycles to get a new value\n", waited);
        }
        printf("thread %p got value %ld, i = %zu\n", (void *)pthread_self(), value, i);

        const int timesleep = rand() & 0xFFF;
        usleep(timesleep);

        int_queue_put_sync(int_queue, value);
        printf("thread %p put value %ld back, i = %zu\n", (void *)pthread_self(), value, i);
    }
    return NULL;
}
/* Adapter giving the worker the exact start-routine signature pthread_create expects. */
static void * test_int_queue_thread_entry(void * arg) {
    return test_int_queue_thread(arg);
}

/*
 * Test driver: creates a queue holding two values, exercises the
 * unsynchronised get/put pair single-threaded, then runs the worker on
 * several threads and waits for them.
 *
 * Returns 0 on success, -1 if the queue or a thread could not be created.
 */
int main(int argc, char ** argv) {
    enum { WARMUP_ROUNDS = 100, NUM_THREADS = 4 };

    (void)argc;
    (void)argv;

    struct int_queue_s * int_queue = int_queue_create(2);
    if (!int_queue) {
        fprintf(stderr, "error initializing int_queue\n");
        return -1;
    }
    srand(0);

    /* Single-threaded sanity pass: take one value and put it straight back */
    for (size_t i = 0; i < WARMUP_ROUNDS; i++) {
        const long int value = int_queue_get(int_queue);
        if (!value) {
            printf("error getting value\n");
        }
        else {
            printf("got value %ld\n", value);
        }
        int_queue_put(int_queue, value);
        printf("put value %ld back successfully\n", value);
    }

    pthread_t threads[NUM_THREADS];
    size_t started = 0;
    int status = 0;

    while (started < NUM_THREADS) {
        const int rc = pthread_create(&threads[started], NULL, test_int_queue_thread_entry, int_queue);
        if (rc != 0) {
            fprintf(stderr, "error creating thread %zu: %s\n", started, strerror(rc));
            status = -1;
            break;
        }
        started++;
    }

    /* Join only the threads that were actually created */
    for (size_t i = 0; i < started; i++) {
        pthread_join(threads[i], NULL);
    }
    return status;
}