1

我对多线程编程相当陌生,所以请原谅我可能不精确的问题。这是我的问题:

我有一个函数处理数据并生成许多相同类型的对象。这是在几个嵌套循环中迭代完成的,因此只进行所有迭代,将这些对象保存在某个容器中,然后在该容器上处理接口代码以执行后续步骤是可行的。但是,我必须创建数百万个这样的对象,这会破坏内存使用。这些限制主要是由于我无法控制的外部因素。

只生成一定数量的数据是理想的,但打破循环并稍后在同一点重新启动也是不切实际的。我的想法是在一个单独的线程中进行处理,该线程将在 n 次迭代后暂停,并在所有 n 对象完全处理后恢复,然后恢复,进行 n 次下一次迭代,依此类推,直到所有迭代完成。等待线程完成所有 n 次迭代是很重要的,这样两个线程就不会真正并行运行。

这就是我的问题开始的地方:如何在这里正确地进行互斥锁?我的方法产生 boost::lock_errors。这是一些代码来显示我想要做什么:

boost::recursive_mutex bla;
boost::condition_variable_any v1;
boost::condition_variable_any v2;
boost::recursive_mutex::scoped_lock lock(bla);
int got_processed = 0;
const int n = 10;

void ProcessNIterations() {
  got_processed = 0;
  // have some mutex or whatever unlocked here so that the worker thread can 
  // start or resume. 
  // my idea: have some sort of mutex lock that unlocks here and a condition
  // variable v1 that is notified while the thread is waiting for that.
  lock.unlock();
  v1.notify_one();

  // while the thread is working to do the iterations this function should wait
  // because there is no use to proceed until the n iterations are done
  // my idea: have another condition v2 variable that we wait for here and lock
  // afterwards so the thread is blocked/paused
  while (got_processed < n) {
    v2.wait(lock);
  }
}

void WorkerThread() {
  int counter = 0;
  // wait for something to start
  // my idea: acquire a mutex lock here that was locked elsewhere before and 
  // wait for ProcessNIterations() to unlock it so this can start
  boost::recursive_mutex::scoped_lock internal_lock(bla);

  for (;;) {
    for (;;) {
      // here do the iterations
      counter++;
      std::cout << "iteration #" << counter << std::endl;
      got_processed++;

      if (counter >= n) {
        // we've done n iterations; pause here
        // my idea: unlock the mutex, notify v2
        internal_lock.unlock();
        v2.notify_one();

        while (got_processed > 0) {
          // when ProcessNIterations() is called again, resume here
          // my idea: wait for v1 reacquiring the mutex again
          v1.wait(internal_lock);
        }
        counter = 0;
      }
    }
  }
}

int main(int argc, char *argv[]) {
  boost::thread mythread(WorkerThread);

  ProcessNIterations();
  ProcessNIterations();

  while (true) {}
}

上面的代码在执行 10 次迭代后失败,v2.wait(lock);并显示以下消息:

terminate called after throwing an instance of 'boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::lock_error> >'
  what():  boost::lock_error

我该如何正确地做到这一点?如果这是要走的路,我该如何避免 lock_errors?

编辑:我使用这里讨论的并发队列解决了它。这个队列也有一个最大大小,之后 apush将简单地等待,直到至少一个元素被pop编辑。因此,生产者工作者可以简单地继续填充这个队列,其余代码可以pop根据需要进入。不需要在队列外进行互斥锁。队列在这里:

template<typename Data>
class concurrent_queue
{
private:
  std::queue<Data> the_queue;
  mutable boost::mutex the_mutex;
  boost::condition_variable the_condition_variable;
  boost::condition_variable the_condition_variable_popped;
  int max_size_;
public:
  concurrent_queue(int max_size=-1) : max_size_(max_size) {}

  void push(const Data& data) {
    boost::mutex::scoped_lock lock(the_mutex);

    while (max_size_ > 0 && the_queue.size() >= max_size_) {
      the_condition_variable_popped.wait(lock);
    }

    the_queue.push(data);
    lock.unlock();
    the_condition_variable.notify_one();
  }

  bool empty() const {
    boost::mutex::scoped_lock lock(the_mutex);
    return the_queue.empty();
  }

  bool wait_and_pop(Data& popped_value) {
    boost::mutex::scoped_lock lock(the_mutex);
    bool locked = true;
    if (the_queue.empty()) {
      locked = the_condition_variable.timed_wait(lock, boost::posix_time::seconds(1));
    }

    if (locked && !the_queue.empty()) {
      popped_value=the_queue.front();
      the_queue.pop();
      the_condition_variable_popped.notify_one();
      return true;
    } else {
      return false;
    }
  }

  int size() {
    boost::mutex::scoped_lock lock(the_mutex);
    return the_queue.size();
  }
};
4

2 回答 2

1

这可以使用条件变量来实现。一旦你执行了 N 次迭代,就在条件变量上调用 wait(),当对象在另一个线程中处理时,在条件变量上调用 signal() 以解除阻塞在条件变量上的另一个线程。

于 2012-06-14T20:02:10.827 回答
0

您可能需要某种有限容量的队列列表或堆栈以及条件变量。当队列已满时,生产者线程等待条件变量,并且任何时候消费者线程从队列中删除一个元素,它都会向条件变量发出信号。这将允许生产者醒来并再次填充队列。如果您真的想一次处理 N 个元素,那么只有当队列中有 N 个元素的容量时才让工作人员发出信号,而不是每次他们从队列中拉出一个项目时发出信号。

于 2012-06-14T20:06:24.843 回答