-1

我正在做一个项目,我需要从文件中加载和处理千兆字节的序列。由于我正在处理大量数据,因此我无法将其保存在 RAM 上。所以我使用一个线程从这个文件加载数据并保存在一个队列中,一个线程一旦检测到队列上有东西,就将其卸载到某个临时文件中。

我在做这件事时遇到了一些麻烦。看起来有一个赛车条件。有时它有效,有时它返回分段错误。

我用这个错误写了一个最小的代码示例。

这是我的队列代码

//####################
// STRUCTS 
//####################
struct queue_item{
    char *seq;
    struct queue_item *prox;//Next element
};
typedef struct queue_item QueueItem;

struct Queue{
    QueueItem *first;//First element on queue
    QueueItem *end;//Last element on queue
    int size;//Queue size
};
typedef struct Queue Queue;

//####################

Queue* create_queue(){
    Queue *f;
    f = (Queue*)malloc(sizeof(Queue));
    f->size = 0;
    return f;
}

QueueItem* new_queue_item(char *seq){
    QueueItem* new;
    int n;

    n = strlen(seq);
    new = (QueueItem*)malloc(sizeof(QueueItem));
    new->seq = (char*)malloc((n+1)*sizeof(char));

    strcpy(new->seq,seq);

    return new;
}

void enqueue(Queue *f,char *seq){
    QueueItem* new;
    new = new_queue_item(seq);

    switch(f->size){
        case 0:
            f->first = new;
        break;
        case 1:
            f->end = new;
            f->first->prox = f->end;
        break;
        default:
            f->end->prox = new;
            f->end = new;
        break;
    }

    f->size = f->size + 1;
    return;
}

char* dequeue(Queue *f){
    QueueItem *hold;
    char *seq;

    if(f->size > 0){
        hold = f->first;
        seq = f->first->seq;
        f->first = f->first->prox;
        free(hold);
        f->size--;
    }

    return seq;
}

int queue_size(Queue *f){
    return f->size;
}

void seq_to_file(char *seq,FILE *f){
    if(seq != NULL){
        fputs(seq,f);
        free(seq);
    }
    return;
}

这是我的主要代码

Queue *f;
int i;
int end_flag;
char *seq;

f = create_queue();
end_flag = 0;

#pragma omp parallel shared(f) shared(end_flag)
{
    #pragma omp sections
    {
        #pragma omp section
        {
            FILE *tmp;
            tmp = fopen("tmp","w");
            while(!end_flag){
                if(queue_size(f) > 0)
                    seq_to_file(dequeue(f),tmp);
            }

            fclose(tmp);    
        }
        #pragma omp section
        {
            seq = (char*)malloc(21*sizeof(char));
            strcpy(seq,"ABCDEFGHIJKLMNOPQRST");

            for(i=0;i < NSEQS;i++)
                enqueue(f,seq);
            end_flag = 1;
        }
    }
}       

我检测到的一些错误:

1 - new_queue_item() 行上的 malloc 错误:new->seq = (char*)malloc((n+1)*sizeof(char));

* 检测到 glibc * /home/pedro/Dropbox/Programação/C/Queue/fila_teste:双重释放或损坏(输出):0x00000000006f3bd0 * 检测到 glibc/home/pedro/Dropbox/Programação/C/Queue/fila_teste:malloc():内存损坏(快速):0x00000000006f3b70 *

2 - new_queu_item() 行上的 malloc 错误:new = (QueueItem*)malloc(sizeof(QueueItem));

3 - seq_to_file() 行上的免费错误:free(seq);

* 检测到 glibc * /home/pedro/Dropbox/Programação/C/Queue/fila_teste:双重释放或损坏(输出):0x0000000000cdd3f0 *

检查 gdb 我有: (gdb) print *f $16 = {first = 0x0, end = 0x611180, size = 426}

这第三个错误让我认为这确实是一种竞争情况。

我试图用“end_flag”模拟一个信号量,但认为这还不够。另外,不要认为“关键”和“原子”子句在这里会有所帮助,因为它们只保护对代码区域的访问,而不是内存。

知道如何解决这个问题吗?

4

1 回答 1

1

如果您不打算重用此代码,则可以使用以下代码:

#pragma omp critical(queueLock)

在上面的示例中,该指令将充当“命名”互斥锁“queueLock”。在您的情况下,您必须在 enqueue、dequeue 和 queue_size 函数中使用它,因为它们都使用共享数据。

如果您打算重用此代码,您应该了解 OpenMP 锁 ( omp_lock_t)。

于 2012-09-21T22:16:55.987 回答