7

我想实现一种方法来安排稍后执行的任务。该接口类似于 JavaScript 的setTimeout(function, milliseconds).

在我的应用程序中,某些资源由线程拥有。为了避免竞争条件,它们必须始终从同一个线程访问。如果其他线程想要访问资源,它们必须将任务对象分派给资源线程。

所以我需要解决的两个问题是:

  1. 将任务分派给线程
  2. 延迟调用

第一个问题通过使用无锁队列快速解决,该队列在消费端具有资源线程。(我使用 TBB 的 concurrent_bounded_queue。)然而,第二个问题对我来说并不那么明显。我可以想到两种策略:

  1. 为每个任务启动一个新线程。该线程将休眠所需的延迟,然后将任务分派到并发队列。
  2. 仅启动一个线程,该线程运行一个循环,该循环迭代计划任务并在它们的等待时间到期时调用它们。

我对这两种方法都进行了试验,我倾向于第一种,因为它简单可靠,而第二种更容易出现细微的错误。第一种方法将此委托给 OS 线程调度程序。

但是,第一个解决方案确实创建了很多短期线程,而我通常会听到重用线程的建议。

4

1 回答 1

3

手动实施将如下所示。

struct myrunnable {
  uint64_t id_;
  uint64_t stamp_;
  std::function<void()> runnable_;
  uint64_t id() { return id_; }
  uint64_t stamp() { return stamp_; }
  void execute() { if (runnable_) runnable_(); }
};

typedef std::shared_ptr<myrunnable> task_t;
// timestamp_cmp_t - a comparator by timestamp + incrementing task id
typedef tbb::concurrent_blocking_queue<task_t> queue_t;
typedef std::priority_queue<task, timestamp_cmp_t> schedule_t;

uint64_t now(); // a wrapper around gettimeofday(), write yourself

queue_t queue; // inbound concurrent blocking queue not bound in size
schedule_t schedule; // priority queue, a scheduler
// queue_t sink; // optional sink concurrent queue if you don't
                 // want to execute tasks in the scheduler thread context

// now() - a wrapper around gettimeofday(), write yourself
for(;;) { // "termination mark" comments below - exit points
  while (!schedule.empty() && schedule.top().stamp() <= now()) {
    task_t task = schedule.pop();
    task .execute();
    // alternatively sink.push(task) to offload scheduler thread
  }

  if (schedule.empty()) {
    task_t task = queue.pop(); // block on the input queue
    if (!task) return; // scheduler termination mark, empty task
    schedule.push(task);
  } else {
    // Here we are driven by our latency/cpu balance requirements
    // in this example we are ultra low latency and are just spinning CPU
    // and on Linux such thread may need extra tuning to perform consistently.
    // To pace it down one can use TBB's sleep_for() or select() system call

    while (schedule.top().stamp() > now()) {
      task_t task;
      if (queue.try_pop(task)) {
        if (!task) return; // scheduler termination mark, empty task
        schedule.push(task);
      }
    }
  }
}
于 2013-07-07T09:51:30.333 回答