看起来您正在尝试实现生产者和消费者, += 线程创建工作(要减少的数字)并且消费者将它们带走。
与其让消费者处于这样的微不足道的自旋循环中,不如看看条件变量。
std::queue<Job*> queue;
pthread_mutex mutex;
pthread_cond cond;
void AddJob(Job* job) {
pthread_mutex_lock(&mutex);
queue.push_back(job);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}
void* QueueWorker(void* /*threadInfo*/) {
Job* job = NULL;
for (;;) {
pthread_mutex_lock(&mutex);
while ( queue.empty() ) {
// unlock the mutex until the cond is signal()d or broadcast() to.
// if this call succeeds, we will have the mutex locked again on the other side.
pthread_cond_wait(&cond, &mutex);
}
// take the first task and then release the lock.
job = queue.pop();
pthread_mutex_unlock(&mutex);
if ( job != NULL )
job->Execute();
}
return NULL;
}
这可以扩展到多个消费者。
顺便说一句,虽然熟悉 pthreads 实现可能很有用,但您可能应该查看可用的线程包装器之一。C++11 引入了 std::thread 和 std::mutex,许多人都对 boost 发誓,但我个人发现 OpenSceneGraph 团队的“OpenThreads”库是最容易使用和最优雅的库之一。
编辑:这是一个完整的工作实现,尽管有一些人为的机制来结束运行。
#include <queue>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
static int jobNo = 0;
class Job {
public:
Job() : m_i(++jobNo) { printf("Created job %d.\n", m_i); }
int m_i;
void Execute() { printf("Job %d executing.\n", m_i); usleep(500 * 1000); }
};
std::queue<Job*> queue;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
void AddJob(Job* job) {
pthread_mutex_lock(&mutex);
queue.push(job);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}
void* QueueWorker(void* /*threadInfo*/) {
Job* job = NULL;
for (;;) {
pthread_mutex_lock(&mutex);
while ( queue.empty() ) {
// unlock the mutex until the cond is signal()d or broadcast() to.
// if this call succeeds, we will have the mutex locked again on the other side.
pthread_cond_wait(&cond, &mutex);
}
// take the first task and then release the lock.
job = queue.front();
queue.pop();
pthread_mutex_unlock(&mutex);
if ( job == NULL ) {
// in this demonstration, NULL ends the run, so forward to any other threads.
AddJob(NULL);
break;
}
job->Execute();
delete job;
}
return NULL;
}
int main(int argc, const char* argv[]) {
pthread_t worker1, worker2;
pthread_create(&worker1, NULL, &QueueWorker, NULL);
pthread_create(&worker2, NULL, &QueueWorker, NULL);
srand(time(NULL));
// queue 5 jobs with delays.
for ( size_t i = 0; i < 5; ++i ) {
long delay = (rand() % 800) * 1000;
printf("Producer sleeping %fs\n", (float)delay / (1000*1000));
usleep(delay);
Job* job = new Job();
AddJob(job);
}
// 5 more without delays.
for ( size_t i = 0; i < 5; ++i ) {
AddJob(new Job);
}
// null to end the run.
AddJob(NULL);
printf("Done with jobs.\n");
pthread_join(worker1, NULL);
pthread_join(worker2, NULL);
return 0;
}