1

我希望有可能让线程(消费者)在另一个线程(生产者)制作某些东西时表达兴趣。但并非总是如此。

基本上我想做一个一次性的消费者。理想情况下,生产者会愉快地处理其业务,直到一个(或多个)消费者表示他们想要某些东西,在这种情况下,生产者会将一些数据推送到变量中并表示它已经这样做了。消费者将等到变量被填充。

还必须是这样,一次性消费者可以决定它等待的时间太长并放弃等待(a la pthread_cond_timedwait

我一直在阅读许多关于同步线程的不同方法的文章和 SO 问题。目前我倾向于使用条件变量方法。

我想知道这是否是一个好方法(作为线程编程的新手,我可能有很多错误),或者在这种情况下(ab)使用信号量是否会更好?还是完全不同的东西?如果可用,只是对指针变量的原子分配?我目前看不到这些如何安全工作,可能是因为我试图保持安全,这个应用程序应该运行几个月,而不会锁定。我可以不使用生产者中的互斥锁吗?即:只是发出一个条件变量的信号?

我当前的代码如下所示:

consumer {
   pthread_mutex_lock(m);

   pred = true; /* signal interest */

   while (pred) {
       /* wait a bit and hopefully get an answer before timing out */
       pthread_cond_timedwait(c, m, t);

       /* it is possible that the producer never produces anything, in which
          case the pred will stay true, we must "designal" interest here,
          unfortunately the also means that a spurious wake could make us miss
          a good answer, no? How to combat this? */
       pred = false;
   }

   /* if we got here that means either an answer is available or we timed out */
   //... (do things with answer if not timed out, otherwise assign default answer)

   pthread_mutex_unlock(m);
}

/* this thread is always producing, but it doesn't always have listeners */
producer {
   pthread_mutex_lock(m);

   /* if we have a listener */
   if (pred) {
      buffer = "work!";

      pred = false;

      pthread_cond_signal(c);
   }

   pthread_mutex_unlock(m);
}

注意:我使用的是现代 linux,如有必要,可以使用特定于平台的功能 注意 2:我使用了看似全局的变量 m、c 和 t。但这些对于每个消费者来说都是不同的。

高层回顾

我希望一个线程能够注册一个事件,等待它指定的时间然后继续。理想情况下,应该可以同时注册多个线程,并且所有线程都应该获得相同的事件(在时间跨度内出现的所有事件)。

4

1 回答 1

1

你想要的是类似于std::futurec++ 中的 a (doc)。消费者请求生产者使用特定功能执行的任务。该函数创建一个名为future(或promise)的结构,其中包含一个互斥锁、一个与任务关联的条件变量以及一个用于结果的 void 指针,并将其返回给调用者。它还将该结构、任务 ID 和参数(如果有)放入由生产者处理的工作队列中。

struct future_s {
    pthread_mutex_t m;
    pthread_cond_t c;
    int flag;
    void *result;
};

// basic task outline
struct task_s {
    struct future_s result;
    int taskid;
};

// specific "mytask" task
struct mytask_s {
    struct future_s result;
    int taskid;
    int p1;
    float p2;
};

future_s *do_mytask(int p1, float p2){
     // allocate task data
     struct  mytask_s * t = alloc_task(sizeof(struct mytask_s));
     t->p1 = p1;
     t->p2 = p2;
     t->taskid = MYTASK_ID;
     task_queue_add(t);
    return (struct future_s *)t;
}

然后生产者将任务拉出队列,处理它,一旦终止,将结果放入未来并触发变量。

消费者可能会等待未来或做其他事情。

对于可取消的期货,在结构中包含一个标志以指示任务已取消。那么未来是:

  • 已交付,消费者是所有者,必须解除分配
  • 取消,生产者仍然是所有者并处置它。

因此,生产者必须在触发条件变量之前检查未来是否被取消。

对于“共享”的未来,旗帜变成了许多订阅者。如果数字大于零,则必须交付订单。拥有结果的消费者由所有消费者决定(先到先得?结果是否传递给所有消费者?)。

对未来结构的任何访问都必须是互斥的(这与条件变量配合得很好)。

关于队列,它们可以使用链表或数组来实现(对于容量有限的版本)。由于创建期货的函数可能会被同时调用,因此必须使用锁来保护它们,这通常使用互斥锁来实现。

于 2013-04-19T19:35:39.403 回答