8

我有一个从套接字读取并生成数据的线程。每次操作后,线程都会检查一个std::atomic_bool标志以查看它是否必须提前退出。

为了取消操作,我将取消标志设置为true,然后调用join()工作线程对象。

线程和取消函数的代码如下所示:

std::thread work_thread;
std::atomic_bool cancel_requested{false};

void thread_func()
{
   while(! cancel_requested.load(std::memory_order_relaxed))
      process_next_element();

}

void cancel()
{
    cancel_requested.store(true, std::memory_order_relaxed);
    work_thread.join();
}

std::memory_order_relaxed使用原子变量的内存顺序是否正确?

4

2 回答 2

6

只要cancel_requestedflag 和其他任何东西之间没有依赖关系,你就应该是安全的。

显示的代码看起来不错,假设cancel_requested仅用于加速关闭,但也有一个有序关闭的规定,例如队列中的哨兵条目(当然队列本身是同步的)。

这意味着您的代码实际上如下所示:

std::thread work_thread;
std::atomic_bool cancel_requested{false};
std::mutex work_queue_mutex;
std::condition_variable work_queue_filled_cond;
std::queue work_queue;

void thread_func()
{
    while(! cancel_requested.load(std::memory_order_relaxed))
    {
        std::unique_lock<std::mutex> lock(work_queue_mutex);
        work_queue_filled_cond.wait(lock, []{ return !work_queue.empty(); });
        auto element = work_queue.front();
        work_queue.pop();
        lock.unlock();
        if (element == exit_sentinel)
            break;
        process_next_element(element);
    }
}

void cancel()
{
    std::unique_lock<std::mutex> lock(work_queue_mutex);
    work_queue.push_back(exit_sentinel);
    work_queue_filled_cond.notify_one();
    lock.unlock();
    cancel_requested.store(true, std::memory_order_relaxed);
    work_thread.join();
}

如果我们到了那么远,那么cancel_requested还不如成为一个常规变量,代码甚至变得更简单。

std::thread work_thread;
bool cancel_requested = false;
std::mutex work_queue_mutex;
std::condition_variable work_queue_filled_cond;
std::queue work_queue;

void thread_func()
{
    while(true)
    {
        std::unique_lock<std::mutex> lock(work_queue_mutex);
        work_queue_filled_cond.wait(lock, []{ return cancel_requested || !work_queue.empty(); });
        if (cancel_requested)
            break;
        auto element = work_queue.front();
        work_queue.pop();
        lock.unlock();
        process_next_element(element);
    }
}

void cancel()
{
    std::unique_lock<std::mutex> lock(work_queue_mutex);
    cancel_requested = true;
    work_queue_filled_cond.notify_one();
    lock.unlock();
    work_thread.join();
}

memory_order_relaxed通常很难推理,因为它模糊了顺序执行代码的一般概念。因此,正如赫布在他的原子武器演讲中所解释的那样,它的用处非常非常有限。

Notestd::thread::join()本身充当两个线程之间的内存屏障。

于 2018-12-06T17:18:03.813 回答
4

这段代码是否正确取决于很多事情。最重要的是,这取决于您所说的“正确”到底是什么意思。据我所知,您显示的代码位不会调用未定义的行为(假设您的work_thread并且cancel_requested实际上并未按照您上面的代码段建议的顺序进行初始化,因为您将让线程可能读取原子的未初始化值)。如果您需要做的就是更改该标志的值并让线程最终在某个点看到新值,而与其他可能发生的事情无关,那么std::memory_order_relaxed就足够了。

但是,我看到您的工作线程调用了一个process_next_element()函数。这表明工作线程通过某种机制接收要处理的元素。处理完所有元素后,我看不到线程退出的任何方式。process_next_element()当没有可用的下一个元素时怎么办?它会立即返回吗?在这种情况下,您会忙于等待更多输入或取消,这将起作用,但可能并不理想。或者确实process_next_element()内部调用一些函数,该函数会阻塞直到元素可用!?如果是这种情况,那么取消线程必须首先设置取消标志,然后执行所需的任何操作以确保您的线程调用的下一个元素可能会阻塞返回。在这种情况下,线程在阻塞调用返回后永远看不到取消标志可能很重要。否则,您可能会返回调用,返回循环,仍然读取旧的取消标志,然后process_next_element()再次调用。如果process_next_element()保证只是再次返回,那么你很好。如果不是这种情况,那么您就会陷入僵局。所以我相信它在技术上取决于具体做什么process_next_element()可以想象一种实现process_next_element()您可能需要的不仅仅是放松的记忆顺序。但是,如果您已经有了获取要处理的新元素的机制,为什么还要使用单独的取消标志呢?您可以通过相同的机制简单地处理取消,例如,通过让它返回具有特殊值的下一个元素或根本不返回任何元素来表示处理的取消并导致线程返回而不是依赖单独的标志......

于 2018-12-06T15:49:27.170 回答