0

我想实现生产者-消费者场景,其中有一个生产者线程和多个消费者线程。让我具体说一下,生产者线程需要定期(例如,5 秒)创建一组对象,而消费者线程需要消耗这些对象。

我不确定如何定期创建一组对象以及如何同步多个消费者。

提前致谢

4

3 回答 3

2

你到底有什么问题?传统上,工作队列由条件锁保护;如果多个“消费者”正在等待同一个锁,那么他们中只有一个会成功获得它。但是,不要抓取整个队列,而应该只从其中抓取一个对象,将其从队列中删除,然后在处理作业之前释放锁。

至于定期创建作业,这是您使用的任何库中的计时器类的作业。如果您正在等待输入,则 select() 和 poll() 调用具有超时值;如果你什么都不做,你可以调用 [u]sleep()。

于 2012-08-31T14:52:27.787 回答
1

我不会详细说明所有代码,但您可以为此使用互斥锁和条件变量(以及链表)。基本模式是生产者执行以下操作:

loop_forever
    wait_until_interval_has_elapsed
    lock_the_mutex
        append_an_item_to_the_list
        signal_the_condvar // can be outside the mutex
    unlock_the_mutex

每个消费者都会:

loop_forever
    lock_the_mutex
        while(list_is_empty)
            wait_the_condvar
        remove_an_item_from_the_list
    unlock_the_mutex
    process_the_item

您也可以使用互斥锁和信号量来实现:

loop_forever
    wait_until_interval_has_elapsed
    lock_the_mutex
        append_an_item_to_the_list
    unlock_the_mutex
    post_the_semaphore

loop_forever
    wait_the_semaphore
    lock_the_mutex
        remove_an_item_from_the_list
    unlock_the_mutex
    process_the_item

就目前而言,这稍微简单一些,但是一旦您决定添加一种机制来告诉消费者完成他们正在做的事情并退出,您可能最好使用条件变量,因为您可以“广播”它。信号量因易于误用而享有盛誉,这就是 C++11 没有信号量的原因。不过,Posix 确实如此,所以在 linux 上您可以选择。公平地说,几乎所有其他多线程操作系统也是如此。

在线程具有不同优先级的情况下,mutex/condvar 对也可能比信号量提供更好的行为。与互斥锁不同,信号量没有“所有者”,因此优先级继承等技术是不可能的。

使用wait_until_interval_has_elapsed计时器或睡眠功能 - 请注意,除非您在实时系统上,否则您永远无法确定您将能够在特定时间运行,只能确定您会在特定时间或之后被唤醒.

于 2012-08-31T15:08:18.683 回答
1

您将需要一种方法来告诉消费者线程有可用的新对象。这是一个示例(我没有包括#includes):

#define NUM_CONSUMERS 2

static int objects[4]; //The place for produced objects
static pthread_mutex_t cond_mut = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void* producer_func(void* arg);
void* consumer_func(void* arg);

int main(int argc, char** argv) {
  pthread_t producer;
  pthread_t consumers[NUM_CONSUMERS];
  int res;
  void* zero = 0;
  res = pthread_create(&producer, NULL, producer_func, zero); 
  for(int i = 0; i < NUM_CONSUMERS; i++) {
    res = pthread_create(&consumers[i], NULL, consumer_func, zero);
  }
}

void* producer(void* arg) {
  while(1) {
    objects[0] = 0;
    objects[1] = 1;
    objects[2] = 2;
    objects[3] = 3;
    pthread_mutex_lock(&cond_mut);
    pthread_cond_signal(&cond);
    pthread_mutex_unlock(&cond_mut);
    sleep(5);
  }
}

void* consumer(void* arg) {
  while(1) {
    pthread_mutex_lock(&cond_mut);
    pthread_cond_wait(&cond);
    pthread_mutex_unlock(&cond_mut);
    //Process objects here 
  }
}
于 2012-08-31T15:10:12.577 回答