0

The problem is that when I run the code below on a single core, it sometimes runs correctly and sometimes crashes with a segmentation fault. The problem will probably occur more frequently on a multi-core machine. I need to know where this non-determinism is introduced in my program and how I can resolve it. Thanks.

// Size of the worker pool: the Evaluator constructor creates this many threads,
// and main() posts this many fork tokens per evaluation round.
int numThreads = 4;

// Forward declaration: struct E stores an Evaluator* before the class is defined.
class Evaluator;

// Start-up argument for one worker thread; Evaluator::func reads both fields.
struct E {
    Evaluator* evaluator;  // object whose go() the thread runs
    int id;                // worker index passed to go(), printed in its output
};

class Evaluator {
public:
    pthread_t * threads;
    sem_t* fork_sync;
    sem_t* join_sync;
    int tin;
    pthread_mutex_t tin_mut;

    double * d;
    int sz;
    int cursor;
    pthread_mutex_t c_mut;

    Evaluator(sem_t* fs, sem_t* js) {
        fork_sync = fs;
        join_sync = js;
        threads = new pthread_t[numThreads];
        tin = 0;
        pthread_mutex_init(&tin_mut,NULL);
        for(int i=0 ;i<numThreads; i++) {
            E arg;
            arg.evaluator = this;
            arg.id = i;
            pthread_create(&threads[i],NULL,(void* (*) (void*) )func,(void*)&arg);
        }


        //dummy init
        sz = 20;
        d = new double[sz];
        for(int i=0; i<sz ; i++) d[i] = .5 + i;
        cursor = 0;
        pthread_mutex_init(&c_mut,NULL);
    }

    static void func(E* e) {        
        Evaluator* eval = e -> evaluator;
        eval -> go(e -> id);
    }

    void reset() {
        cursor = 0;
    }

    void go(int id) {
        while(1) {
            sem_wait(fork_sync);

            pthread_mutex_lock(&tin_mut);
            ++tin;
            pthread_mutex_unlock(&tin_mut);

            while(1) {
                int idx;
                pthread_mutex_lock(&c_mut);
                idx = cursor++;
                pthread_mutex_unlock(&c_mut);
                if(idx >= sz ) break;
                // do the evaluation
                cout << "evaluating  index " << idx << " using thread " << id << endl;
            }

            int remain;
            pthread_mutex_lock(&tin_mut);
            remain = --tin;
            pthread_mutex_unlock(&tin_mut);
            if(remain == 0) sem_post(join_sync);
        }
    }


};

// Runs three fork/join evaluation rounds on an Evaluator.
// Each round rewinds the evaluator, posts the start semaphore once per worker
// thread, and then blocks on the completion semaphore once.
int main(int argc, char *argv[]) {

    // Both semaphores are process-private and start at zero.
    sem_t start_gate;
    sem_t done_gate;
    sem_init(&start_gate, 0, 0);
    sem_init(&done_gate, 0, 0);

    Evaluator e(&start_gate, &done_gate);

    //evaluating `rounds` times
    const int rounds = 3;
    int round = 0;
    while (round < rounds) {
        std::cout << "---------- evaluation number :" << round << std::endl;
        e.reset();

        // One token per worker thread.
        int released = 0;
        while (released < numThreads) {
            sem_post(&start_gate);
            ++released;
        }

        sem_wait(&done_gate);
        std::cout << std::endl;
        ++round;
    }

    return 0;
}
4

5 回答 5

2

arg is on the stack. You are taking its address and passing this address to another thread. Race condition (value on the stack can be overwritten before the newly created thread reads it).

// Problematic code from the question: `arg` is a local object, yet its address
// is what the newly created thread receives as its argument.
E arg;
arg.evaluator = this;
arg.id = i;
pthread_create(&threads[i],NULL,(void* (*) (void*) )func,(void*)&arg);

Solution:

// Heap-allocate the argument so it is not tied to the creating function's
// stack frame; the pointer itself (not the address of a local) is passed on.
E* arg = new E();
arg->evaluator = this;
arg->id = i;
pthread_create(&threads[i],NULL,(void* (*) (void*) )func,(void*)arg);

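And don't forget to delete `e` in `func`. Because `go()` loops forever, copy the fields you need out of `e` and delete it before calling `go()`.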

于 2009-12-22T08:15:14.997 回答
1

Navid, could you please provide an example that runs out of the box next time?

It doesn't hurt that much to add the following lines at the top of your example:

#include <pthread.h>
#include <semaphore.h>
#include <iostream>

using namespace std;

// compile with: g++ -g -pthread main.cpp -o main -lrt -lpthread

When I start the program in the debugger then indeed it crashes sometimes on the sem_wait() line (and sometimes it does not crash!)

void go(int id) {
    while(1) {
        sem_wait(fork_sync); //  <--- seems to crash here
        ...
于 2009-12-22T07:53:35.830 回答
1

The address of your object gets corrupted. This is caused by allocating the `arg` element on the stack. When the threads start, it may or may not still contain valid values.
This supports Vokuhila-Oliba's answer, as the `sem_wait(fork_sync)` call is the first time the thread tries to access the object's memory.
Edit
The code works for me with the following changes (20 tests without a crash)

 
for(int i=0 ;i<numThreads; i++) {
                            E* arg = new E;
                            arg->evaluator = this;
                            arg->id = i;
                            pthread_create(&threads[i],NULL,func,arg);
                        }

static void* func(void* e) {
        Evaluator* eval = reinterpret_cast<E*>(e) -> evaluator;
        eval -> go(reinterpret_cast<E*>(e) -> id);
        delete(e);
        return NULL;
    }
于 2009-12-22T10:08:23.423 回答
1

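Here is a quick fix for the second bug: the join semaphore could be posted before every worker had finished. With the change below, each worker posts the join semaphore and main waits for it once per worker, which ensures that all worker threads finish before the next iteration is launched.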

--- a/misc/so/sem_wait/q.cpp
+++ b/misc/so/sem_wait/q.cpp
@@ -83,7 +83,7 @@ public:
             pthread_mutex_lock(&tin_mut);
             remain = --tin;
             pthread_mutex_unlock(&tin_mut);
-            if(remain == 0) sem_post(join_sync);
+            sem_post(join_sync);
         }
     }

@@ -107,9 +107,11 @@ int main(int argc, char *argv[]) {
         cout << "---------- evaluation number :" << i << endl;
         e.reset();
         for(int j=0; j<numThreads; j++) sem_post(&fork_sync);
-        sem_wait(&join_sync);
+        for(int j=0; j<numThreads; j++) sem_wait(&join_sync);
         cout << endl;
     }

+    cout << "exit" << endl;
+
     return 0;
 }
于 2009-12-22T10:48:38.750 回答
0

Have you been able to reproduce the problem in debugger?

If you compiled with optimizations enabled, it might be worth trying again with compiler optimizations off (-O0), although the problem is most likely in your code.

于 2009-12-22T07:13:17.523 回答