3

我有一个由 2 个线程同时使用的类:一个线程将结果(一个接一个)添加到results任务中,第二个线程处理results已经存在的那些。

// all members are copy-able
struct task {
    command cmd;
    vector<result> results;
};

class generator {
    public:
        generator(executor* e); // store the ptr
        void run();
        ...
};

class executor {
    public:
        void run();
        void add_result(int command_id, result r);
        task& find_task(int command_id);
            ...
    private:
        vector<task> tasks_;
        condition_variable_any update_condition_;
};

发射

// In main, we have instances of generator and executor,
// we launch 2 threads and wait for them.
std::thread gen_th( std::bind( &generator::run, gen_instance_) );
std::thread exe_th( std::bind( &executor::run,  exe_instance_) );

生成器线程

void generator::run() {
    while(is_running) {
        sleep_for_random_seconds(); 
        executor_->add_result( SOME_ID, new_result() );
    }
}

执行线程

void executor::add_result( int command_id, result r ) {
    std::unique_lock<std::recursive_mutex> l(mutex_);
    task& t = this->find_task(command_id);
    t.results.push_back(r);
    update_condition_.notify_all();
}

void executor::run() { 
  while(is_running) {
     update_condition_.wait(...);
     task& t = this->find_task(SOME_ID);        
     for(result r: t.results) {
        // no live updates are visible here
     }
   }
}
  1. 生成器线程每隔几秒添加一个结果。
  2. 执行线程是一个executor本身。它通过run等待更新的方法运行,当更新发生时,它对结果起作用。

有几点需要注意:

  1. 任务向量可能很大;结果永远不会被处理;
  2. executor中的for-each循环获取它正在处理的任务,然后迭代结果,检查其中哪些是新的并处理它们。一旦处理,它们将被标记并且不会再次被处理。此处理可能需要一些时间。

Executor Thread在添加另一个结果之前没有完成 for 循环时,就会出现问题 - 结果对象在 for 循环中不可见。由于Executor Thread正在工作,它不会注意到更新条件更新,不会刷新向量等。当它完成时(在 的一个已经不是实际的视图上工作tasks_)它会再次挂在update_condition_刚刚触发的 .. .

我需要让代码知道,它应该在完成后再次运行循环,或者对循环中可见的任务进行更改for-each。这个问题的最佳解决方案是什么?

4

2 回答 2

1

您只需要在阻止 CV之前检查您的向量是否为空。像这样的东西:

while (running) {
    std::unique_lock<std::mutex> lock(mutex);
    while (tasks_.empty()) // <-- this is important
        update_condition_.wait(lock);
    // handle tasks_
}

如果您的架构允许(即,如果您在处理任务时不需要持有锁),您可能还希望在处理任务之前尽快解锁互斥锁,以便生产者可以推送更多任务而不会阻塞。也许tasks_用一个临时向量交换你的向量,然后解锁互斥锁,然后才开始处理临时向量中的任务:

while (running) {
    std::unique_lock<std::mutex> lock(mutex);
    while (tasks_.empty())
        update_condition_.wait(lock);
    std::vector<task> localTasks;
    localTasks.swap(tasks_);
    lock.unlock(); // <-- release the lock early
    // handle localTasks
}

编辑:啊,现在我意识到这并不适合您的情况,因为您的消息不是直接在,tasks_而是在tasks_.results. 虽然你明白我的一般想法,但使用它需要在你的代码中进行结构更改(例如,扁平化你的任务/结果,并且总是有一个与单个结果相关联的 cmd)。

于 2013-03-17T12:26:14.697 回答
0

我在相同的情况下采取以下方式

std::vector< ... > temp;
mutex.lock();
temp.swap( results );
mutex.unlock();
for(result r: temp ){
    ...
}

会有一点开销,但总的来说,整个代码更易读,如果计算量很大,那么复制的时间就会变为零(对不起,英语 - 它不是我原生的)))

于 2013-03-17T12:09:42.207 回答