我对多线程编程相当陌生,所以请原谅我可能不精确的问题。这是我的问题:
我有一个函数处理数据并生成许多相同类型的对象。这是在几个嵌套循环中迭代完成的,因此只进行所有迭代,将这些对象保存在某个容器中,然后在该容器上处理接口代码以执行后续步骤是可行的。但是,我必须创建数百万个这样的对象,这会破坏内存使用。这些限制主要是由于我无法控制的外部因素。
只生成一定数量的数据是理想的,但打破循环并稍后在同一点重新启动也是不切实际的。我的想法是在一个单独的线程中进行处理,该线程将在 n 次迭代后暂停,并在所有 n 对象完全处理后恢复,然后恢复,进行 n 次下一次迭代,依此类推,直到所有迭代完成。等待线程完成所有 n 次迭代是很重要的,这样两个线程就不会真正并行运行。
这就是我的问题开始的地方:如何在这里正确地进行互斥锁?我的方法产生 boost::lock_errors。这是一些代码来显示我想要做什么:
boost::recursive_mutex bla;
boost::condition_variable_any v1;
boost::condition_variable_any v2;
boost::recursive_mutex::scoped_lock lock(bla);
int got_processed = 0;
const int n = 10;
void ProcessNIterations() {
got_processed = 0;
// have some mutex or whatever unlocked here so that the worker thread can
// start or resume.
// my idea: have some sort of mutex lock that unlocks here and a condition
// variable v1 that is notified while the thread is waiting for that.
lock.unlock();
v1.notify_one();
// while the thread is working to do the iterations this function should wait
// because there is no use to proceed until the n iterations are done
// my idea: have another condition v2 variable that we wait for here and lock
// afterwards so the thread is blocked/paused
while (got_processed < n) {
v2.wait(lock);
}
}
void WorkerThread() {
int counter = 0;
// wait for something to start
// my idea: acquire a mutex lock here that was locked elsewhere before and
// wait for ProcessNIterations() to unlock it so this can start
boost::recursive_mutex::scoped_lock internal_lock(bla);
for (;;) {
for (;;) {
// here do the iterations
counter++;
std::cout << "iteration #" << counter << std::endl;
got_processed++;
if (counter >= n) {
// we've done n iterations; pause here
// my idea: unlock the mutex, notify v2
internal_lock.unlock();
v2.notify_one();
while (got_processed > 0) {
// when ProcessNIterations() is called again, resume here
// my idea: wait for v1 reacquiring the mutex again
v1.wait(internal_lock);
}
counter = 0;
}
}
}
}
int main(int argc, char *argv[]) {
boost::thread mythread(WorkerThread);
ProcessNIterations();
ProcessNIterations();
while (true) {}
}
上面的代码在执行 10 次迭代后失败,v2.wait(lock);
并显示以下消息:
terminate called after throwing an instance of 'boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::lock_error> >'
what(): boost::lock_error
我该如何正确地做到这一点?如果这是要走的路,我该如何避免 lock_errors?
编辑:我使用这里讨论的并发队列解决了它。这个队列也有一个最大大小,之后 apush
将简单地等待,直到至少一个元素被pop
编辑。因此,生产者工作者可以简单地继续填充这个队列,其余代码可以pop
根据需要进入。不需要在队列外进行互斥锁。队列在这里:
template<typename Data>
class concurrent_queue
{
private:
std::queue<Data> the_queue;
mutable boost::mutex the_mutex;
boost::condition_variable the_condition_variable;
boost::condition_variable the_condition_variable_popped;
int max_size_;
public:
concurrent_queue(int max_size=-1) : max_size_(max_size) {}
void push(const Data& data) {
boost::mutex::scoped_lock lock(the_mutex);
while (max_size_ > 0 && the_queue.size() >= max_size_) {
the_condition_variable_popped.wait(lock);
}
the_queue.push(data);
lock.unlock();
the_condition_variable.notify_one();
}
bool empty() const {
boost::mutex::scoped_lock lock(the_mutex);
return the_queue.empty();
}
bool wait_and_pop(Data& popped_value) {
boost::mutex::scoped_lock lock(the_mutex);
bool locked = true;
if (the_queue.empty()) {
locked = the_condition_variable.timed_wait(lock, boost::posix_time::seconds(1));
}
if (locked && !the_queue.empty()) {
popped_value=the_queue.front();
the_queue.pop();
the_condition_variable_popped.notify_one();
return true;
} else {
return false;
}
}
int size() {
boost::mutex::scoped_lock lock(the_mutex);
return the_queue.size();
}
};