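Problem: I have to increment the variables x1 and x2, each in its own thread, and the next increment of either variable must not start until the previous increments of both variables have completed.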

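Proposed solution: Initialize 4 semaphores and start separate threads, each incrementing one variable. Two semaphores tell the threads to start incrementing, and two tell the main thread that an increment has finished. The main thread waits for a post from both child threads, indicating that both increments are done, and then signals both child threads to allow the next increment.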

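This currently works fine for me. However, can someone suggest a better solution, or point out problems in this one? Any help would be greatly appreciated. Thanks in advance.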

Solution code:

#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>

//Threads
pthread_t pth1,pth2;

//Values to calculate
int x1 = 0, x2 = 0;

sem_t c1,c2,c3,c4;

void *threadfunc1(void *parm)
{
    for (;;) {
        x1++;
        sem_post(&c1);
        sem_wait(&c3);
    }
    return NULL ;
}

void *threadfunc2(void *parm)
{
    for (;;) {
        x2++;
        sem_post(&c2);
        sem_wait(&c4);
    }
    return NULL ;
}

int main () {
    sem_init(&c1, 0, 0);
    sem_init(&c2, 0, 0);
    sem_init(&c3, 0, 0);
    sem_init(&c4, 0, 0);
    pthread_create(&pth1, NULL, threadfunc1, "foo");
    pthread_create(&pth2, NULL, threadfunc2, "foo");
    sem_wait(&c1);
    sem_wait(&c2);
    sem_post(&c3);
    sem_post(&c4);
    int loop = 0;
    while (loop < 8) {
        // iterated as a step
        loop++;
        printf("Initial   : x1 = %d, x2 = %d\n", x1, x2);
        sem_wait(&c1);
        sem_wait(&c2);
        printf("Final   : x1 = %d, x2 = %d\n", x1, x2);
        sem_post(&c3);
        sem_post(&c4);
    }
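    sem_wait(&c1);
    sem_wait(&c2);
    printf("Result   : x1 = %d, x2 = %d\n", x1, x2);
    // Both workers are now blocked in sem_wait (a cancellation point).
    // Cancel and join them before destroying the semaphores they wait on.
    pthread_cancel(pth1);
    pthread_cancel(pth2);
    pthread_join(pth1, NULL);
    pthread_join(pth2, NULL);
    sem_destroy(&c1);
    sem_destroy(&c2);
    sem_destroy(&c3);
    sem_destroy(&c4);
    return 0;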
}

2 Answers

Instead of having a bunch of threads to do x1 things, pausing them, then having a bunch of threads do x2 things, consider a threadpool. A threadpool is a bunch of threads which sit idle until you have work for them to do, then they unpause and do the work.

An advantage of this system is that it uses condition variables and mutexes rather than semaphores. On many systems, mutexes are faster than semaphores (because they are more limited).

#include <pthread.h>
#include <vector>

// a task is an abstract class describing "something that can be done" which
// can be put in a work queue
class Task
{
    public:
        virtual ~Task() { }
        virtual void run() = 0;
};

// this could be made more Object Oriented if desired... this is just an example.
// a work queue shared by the main thread and all workers
struct WorkQueue
{
    std::vector<Task*>  queue; // you must hold the mutex to access the queue
    int                 running; // tasks taken off the queue but not yet finished (guarded by the mutex)
    bool                finished; // if this is set to true, threadpoolRun starts exiting
    pthread_mutex_t     mutex;
    pthread_cond_t      hasWork; // this condition is signaled if there may be more to do
    pthread_cond_t      doneWithWork; // this condition is signaled if all queued work may be complete
};

// pthread_create requires a start routine of type void* (*)(void*)
void* threadpoolRun(void* queuePtr)
{
    // the argument to threadpoolRun is always a WorkQueue*
    // (static_cast, because dynamic_cast cannot be applied to a void*)
    WorkQueue& workQueue = *static_cast<WorkQueue*>(queuePtr);
    pthread_mutex_lock(&workQueue.mutex);

    // precondition: every time we start this while loop, we have to have the
    // mutex.
    while (!workQueue.finished) {
        // try to get work.  If there is none, we wait until someone signals hasWork
        if (workQueue.queue.empty()) {
            // empty.  Wait until another thread signals that there may be work
            // but before we do, signal the main thread that the queue may be empty
            pthread_cond_broadcast(&workQueue.doneWithWork);
            pthread_cond_wait(&workQueue.hasWork, &workQueue.mutex);
        } else {
            // there is work to be done.  Grab the task, release the mutex (so that
            // other threads can get things from the work queue), and start working!
            Task* myTask = workQueue.queue.back();
            workQueue.queue.pop_back(); // no one else should start this task
            workQueue.running++; // an empty queue alone does not mean the work is finished
            pthread_mutex_unlock(&workQueue.mutex);

            // now that other threads can look at the queue, take our time
            // and complete the task.
            myTask->run();

            // re-acquire the mutex, so that we have it at the top of the while
            // loop (where we need it to check workQueue.finished)
            pthread_mutex_lock(&workQueue.mutex);
            workQueue.running--;
        }
    }

    // release the mutex on the way out so the other workers can exit too
    pthread_mutex_unlock(&workQueue.mutex);
    return NULL;
}

// Now we can define a bunch of tasks to do your particular problem
class Task_x1a
: public Task
{
    public:
        Task_x1a(int* inData)
        : mData(inData)
        { }

        virtual void run()
        {
            // do some calculations on mData
        }
    private:
        int*  mData;
};

class Task_x1b
: public Task
{ ... }

class Task_x1c
: public Task
{ ... }

class Task_x1d
: public Task
{ ... }

class Task_x2a
: public Task
{ ... }

class Task_x2b
: public Task
{ ... }

class Task_x2c
: public Task
{ ... }

class Task_x2d
: public Task
{ ... }

int main()
{
    // bet you thought you'd never get here!
    static const int numberOfWorkers = 4; // this tends to be either the number of CPUs
                                          // or CPUs * 2
    WorkQueue workQueue; // create the workQueue shared by all threads
    workQueue.running = 0;
    workQueue.finished = false;
    pthread_mutex_init(&workQueue.mutex, NULL);
    pthread_cond_init(&workQueue.hasWork, NULL);
    pthread_cond_init(&workQueue.doneWithWork, NULL);
    pthread_t workers[numberOfWorkers];
    int data[10] = { 0 };

    for (int i = 0; i < numberOfWorkers; i++)
        pthread_create(&workers[i], NULL, &threadpoolRun, &workQueue);

    // now all of the workers are sitting idle, ready to do work
    // give them the X1 tasks to do
    {
        Task_x1a    x1a(data);
        Task_x1b    x1b(data);
        Task_x1c    x1c(data);
        Task_x1d    x1d(data);

        pthread_mutex_lock(&workQueue.mutex);
        workQueue.queue.push_back(&x1a); // the queue stores pointers to the tasks
        workQueue.queue.push_back(&x1b);
        workQueue.queue.push_back(&x1c);
        workQueue.queue.push_back(&x1d);

        // now that we've queued up a bunch of work, we have to signal the
        // workers that the work is available
        pthread_cond_broadcast(&workQueue.hasWork);

        // and now we wait until the workers finish: the queue must be empty
        // and no task may still be running, because the tasks live on our stack
        while (!workQueue.queue.empty() || workQueue.running > 0)
            pthread_cond_wait(&workQueue.doneWithWork, &workQueue.mutex);
        pthread_mutex_unlock(&workQueue.mutex);
    }
    {
        Task_x2a    x2a(data);
        Task_x2b    x2b(data);
        Task_x2c    x2c(data);
        Task_x2d    x2d(data);

        pthread_mutex_lock(&workQueue.mutex);
        workQueue.queue.push_back(&x2a);
        workQueue.queue.push_back(&x2b);
        workQueue.queue.push_back(&x2c);
        workQueue.queue.push_back(&x2d);

        // now that we've queued up a bunch of work, we have to signal the
        // workers that the work is available
        pthread_cond_broadcast(&workQueue.hasWork);

        // and now we wait until the workers finish
        while (!workQueue.queue.empty() || workQueue.running > 0)
            pthread_cond_wait(&workQueue.doneWithWork, &workQueue.mutex);
        pthread_mutex_unlock(&workQueue.mutex);
    }

    // at the end of all of the work, we want to signal the workers that they should
    // stop.  We do so by setting workQueue.finished to true, then signalling them
    pthread_mutex_lock(&workQueue.mutex);
    workQueue.finished = true;
    pthread_cond_broadcast(&workQueue.hasWork);
    pthread_mutex_unlock(&workQueue.mutex);

    // wait for every worker to exit before destroying what they use
    for (int i = 0; i < numberOfWorkers; i++)
        pthread_join(workers[i], NULL);

    pthread_mutex_destroy(&workQueue.mutex);
    pthread_cond_destroy(&workQueue.hasWork);
    pthread_cond_destroy(&workQueue.doneWithWork);
    return data[0];
}
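
To make the mutex and condition variable point concrete for the original two-counter problem, here is a minimal sketch of the lockstep requirement without semaphores. It is not the asker's code or a definitive implementation: `StepBarrier`, `barrierInit`, `barrierWait`, `Worker`, `incrementInLockstep` and `runLockstep` are hypothetical names made up for this example. Each step uses two barrier waits: the first means "both increments are done", the second means "the main thread has read both values, go on".

```cpp
#include <pthread.h>
#include <utility>
#include <vector>

// Hypothetical reusable barrier built from one mutex and one condition variable.
struct StepBarrier {
    pthread_mutex_t mutex;
    pthread_cond_t  allArrived;
    int      parties;  // number of threads that must arrive
    int      waiting;  // threads that have arrived in the current round
    unsigned round;    // incremented each time the barrier opens
};

void barrierInit(StepBarrier& b, int parties)
{
    pthread_mutex_init(&b.mutex, NULL);
    pthread_cond_init(&b.allArrived, NULL);
    b.parties = parties;
    b.waiting = 0;
    b.round = 0;
}

void barrierWait(StepBarrier& b)
{
    pthread_mutex_lock(&b.mutex);
    unsigned myRound = b.round;
    if (++b.waiting == b.parties) {
        // last thread to arrive: open the barrier for everyone
        b.waiting = 0;
        ++b.round;
        pthread_cond_broadcast(&b.allArrived);
    } else {
        // the loop guards against spurious wakeups
        while (b.round == myRound)
            pthread_cond_wait(&b.allArrived, &b.mutex);
    }
    pthread_mutex_unlock(&b.mutex);
}

struct Worker {
    StepBarrier* barrier;
    int*         counter;
    int          steps;
};

static void* incrementInLockstep(void* arg)
{
    Worker& w = *static_cast<Worker*>(arg);
    for (int i = 0; i < w.steps; ++i) {
        ++*w.counter;
        barrierWait(*w.barrier);  // this increment is done; wait for the others
        barrierWait(*w.barrier);  // wait until main has read both counters
    }
    return NULL;
}

// Runs `steps` lockstep increments and returns the (x1, x2) pairs that the
// main thread observed after each step.
std::vector<std::pair<int, int> > runLockstep(int steps)
{
    int x1 = 0, x2 = 0;
    StepBarrier barrier;
    barrierInit(barrier, 3);  // two workers plus the main thread
    Worker w1 = { &barrier, &x1, steps };
    Worker w2 = { &barrier, &x2, steps };
    pthread_t t1, t2;
    pthread_create(&t1, NULL, incrementInLockstep, &w1);
    pthread_create(&t2, NULL, incrementInLockstep, &w2);

    std::vector<std::pair<int, int> > seen;
    for (int i = 0; i < steps; ++i) {
        barrierWait(barrier);  // both increments of this step are complete
        seen.push_back(std::make_pair(x1, x2));
        barrierWait(barrier);  // release both workers for the next step
    }
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    pthread_cond_destroy(&barrier.allArrived);
    pthread_mutex_destroy(&barrier.mutex);
    return seen;
}
```

Calling `runLockstep(8)` from `main` and printing each pair would mirror the eight "Final" lines of the question. Many systems also ship a ready-made `pthread_barrier_t`, which may be preferable to a hand-rolled barrier where it is available.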

Major notes:

  • If you have more tasks than CPUs, making extra threads is just more bookkeeping for the CPU. The threadpool accepts any number of tasks, and then works on them with the most efficient number of CPUs possible.
  • If there is way more work than CPUs (like 4 CPUs and 1000 tasks), then this system is very very efficient. Mutex lock/unlock is the cheapest thread synchronization you will get short of a lockfree queue (which is probably way more work than it is worth). If you have a bunch of tasks, it will just grab them one at a time.
  • If your tasks are terribly tiny (like your increment example above), you can easily modify the threadPool to grab multiple Tasks at once, then work them serially.
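
The last note, grabbing several tiny tasks per lock, could look roughly like the sketch below. It is only an illustration under simplifying assumptions, not a drop-in change to the code above: `IncrementTask`, `BatchQueue`, `takeBatch`, `batchWorker` and `runInBatches` are hypothetical names, the queue is filled before the workers start, and workers simply exit once the queue is drained instead of waiting on a condition variable.

```cpp
#include <pthread.h>
#include <cstddef>
#include <vector>

// Hypothetical tiny task: increments the one slot it owns.
struct IncrementTask {
    int* target;
    void run() { ++*target; }
};

struct BatchQueue {
    std::vector<IncrementTask> tasks;  // guarded by mutex
    pthread_mutex_t mutex;
    std::size_t batchSize;             // tasks grabbed per lock (should be >= 1)
};

// Move up to batchSize tasks from the shared queue into a private vector.
// The mutex is held only while moving tasks, not while they run.
static std::vector<IncrementTask> takeBatch(BatchQueue& q)
{
    std::vector<IncrementTask> batch;
    pthread_mutex_lock(&q.mutex);
    while (!q.tasks.empty() && batch.size() < q.batchSize) {
        batch.push_back(q.tasks.back());
        q.tasks.pop_back();
    }
    pthread_mutex_unlock(&q.mutex);
    return batch;
}

static void* batchWorker(void* arg)
{
    BatchQueue& q = *static_cast<BatchQueue*>(arg);
    for (;;) {
        std::vector<IncrementTask> batch = takeBatch(q);
        if (batch.empty())
            return NULL;       // queue drained: this sketch just exits
        for (std::size_t i = 0; i < batch.size(); ++i)
            batch[i].run();    // work the whole batch serially, without the lock
    }
}

// Queues one task per slot, drains the queue with numWorkers threads, and
// returns the slots. Each task touches a distinct slot, so the tasks do not
// race with each other.
std::vector<int> runInBatches(std::size_t numTasks, int numWorkers,
                              std::size_t batchSize)
{
    std::vector<int> slots(numTasks, 0);
    BatchQueue q;
    q.batchSize = batchSize;
    pthread_mutex_init(&q.mutex, NULL);
    for (std::size_t i = 0; i < numTasks; ++i) {
        IncrementTask t = { &slots[i] };
        q.tasks.push_back(t);
    }

    std::vector<pthread_t> workers(numWorkers);
    for (int i = 0; i < numWorkers; ++i)
        pthread_create(&workers[i], NULL, batchWorker, &q);
    for (int i = 0; i < numWorkers; ++i)
        pthread_join(workers[i], NULL);

    pthread_mutex_destroy(&q.mutex);
    return slots;
}
```

With 1000 tasks and a batch size of 16, each worker takes the lock roughly once per 16 tasks instead of once per task. The trade-off is coarser load balancing, since a slow batch cannot be shared with an idle worker.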
answered 2013-09-12T07:34:07.393

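The problem with your program is that you are synchronizing the threads to run in lockstep with each other. In every iteration, each thread increments a counter and then calls two synchronization primitives, so more than half of the time in the loop body is spent on synchronization.

In your program the counters really have nothing to do with each other, so they should run independently. Each thread could then spend its iterations doing actual computation rather than mostly synchronizing.

For the output requirement, you can let each thread store every intermediate result in an array that the main thread can read. The main thread waits for each thread to finish completely, and can then read from each array to produce the output.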

void *threadfunc1(void *parm)
{
    int *output = static_cast<int *>(parm);
    for (int i = 0; i < 10; ++i) {
        x1++;
        output[i] = x1;
    }
    return NULL ;
}

void *threadfunc2(void *parm)
{
    int *output = static_cast<int *>(parm);
    for (int i = 0; i < 10; ++i) {
        x2++;
        output[i] = x2;
    }
    return NULL ;
}

int main () {
    int out1[10];
    int out2[10];
    pthread_create(&pth1, NULL, threadfunc1, out1);
    pthread_create(&pth2, NULL, threadfunc2, out2);
    // after both joins, every element of out1 and out2 has been written
    pthread_join(pth1, NULL);
    pthread_join(pth2, NULL);
    int loop = 0;
    while (loop < 9) {
        // iterated as a step; print before advancing so that out1[0] is not skipped
        printf("Final   : x1 = %d, x2 = %d\n", out1[loop], out2[loop]);
        loop++;
    }
    printf("Result   : x1 = %d, x2 = %d\n", out1[9], out2[9]);
    return 0;
}
answered 2013-09-03T06:42:48.523