我最近听说了新的 c++ 标准特性,它们是:
我无法弄清楚它们在哪些情况下适用且相互有用。
- 如果有人可以举一个例子来说明如何明智地使用它们,那将非常有帮助。
They're really aimed at quite different goals:
Barriers and latches are often used when you have a pool of worker threads that do some processing and a queue of work items that is shared between. It's not the only situation where they're used, but it is a very common one and does help illustrate the differences. Here's some example code that would set up some threads like this:
const size_t worker_count = 7; // or whatever
std::vector<std::thread> workers;
std::vector<Proc> procs(worker_count);
Queue<std::function<void(Proc&)>> queue;
for (size_t i = 0; i < worker_count; ++i) {
workers.push_back(std::thread(
[p = &procs[i], &queue]() {
while (auto fn = queue.pop_back()) {
fn(*p);
}
}
));
}
There are two types that I have assumed exist in that example:
Proc
: a type specific to your application that contains data and logic necessary to process work items. A reference to one is passed to each callback function that's run in the thread pool.Queue
: a thread-safe blocking queue. There is nothing like this in the C++ standard library (somewhat surprisingly) but there are a lot of open-source libraries containing them e.g. Folly MPMCQueue
or moodycamel::ConcurrentQueue
, or you can build a less fancy one yourself with std::mutex
, std::condition_variable
and std::deque
(there are many examples of how to do this if you Google for them).A latch is often used to wait until some work items you push onto the queue have all finished, typically so you can inspect the result.
std::vector<WorkItem> work = get_work();
std::latch latch(work.size());
for (WorkItem& work_item : work) {
queue.push_back([&work_item, &latch](Proc& proc) {
proc.do_work(work_item);
latch.count_down();
});
}
latch.wait();
// Inspect the completed work
How this works:
latch.count_down()
is called, effectively decrementing an internal counter that started at work.size()
.latch.wait()
returns and the producer thread knows that the work items have all been processed.Notes:
count_down()
method could be called zero times, one time, or multiple times on each thread, and that number could be different for different threads. For example, even if you push 7 messages onto 7 threads, it might be that all 7 items are processed onto the same one thread (rather than one for each thread) and that's fine.latch.wait()
won't be called until after all of the worker threads have already finished processing all of the work items. (This is the sort of odd condition you need to look out for when writing threaded code.) But that's OK, it's not a race condition: latch.wait()
will just immediately return in that case.queue
in this code. That's a perfectly valid strategy too, in fact if anything it's more common, but there are other situations where the latch is more useful.A barrier is often used to make all threads wait simultaneously so that the data associated with all of the threads can be operated on simultaneously.
typedef Fn std::function<void()>;
Fn completionFn = [&procs]() {
// Do something with the whole vector of Proc objects
};
auto barrier = std::make_shared<std::barrier<Fn>>(worker_count, completionFn);
auto workerFn = [barrier](Proc&) {
barrier->count_down_and_wait();
};
for (size_t i = 0; i < worker_count; ++i) {
queue.push_back(workerFn);
}
How this works:
workerFn
items off of the queue and call barrier.count_down_and_wait()
.completionFn()
while the others continue to wait.count_down_and_wait()
and be free to pop other, unrelated, work items from the queue.Notes:
workerFn
off of the queue and handle it. Once a thread has popped one off of the queue, it will wait in barrier.count_down_and_wait()
until all the other copies of workerFn
have been popped off by other threads, so there is no chance of it popping another one off.latch.wait()
). Here the producer thread doesn't wait for the barrier so we need to manage the memory in a different way.count_down_and_wait()
too, but you will obviously need to pass worker_count + 1
to the barrier's constructor. (And then you wouldn't need to use a shared pointer for the barrier.)!!! DANGER !!!
The last bullet point about other working being pushed onto the queue being "fine" is only the case if that other work doesn't also use a barrier! If you have two different producer threads putting work items with a barrier on to the same queue and those items are interleaved, then some threads will wait on one barrier and others on the other one, and neither will ever reach the required wait count - DEADLOCK. One way to avoid this is to only ever use barriers like this from a single thread, or even to only ever use one barrier in your whole program (this sounds extreme but is actually quite a common strategy, as barriers are often used for one-time initialisation on startup). Another option, if the thread queue you're using supports it, is to atomically push all work items for the barrier onto the queue at once so they're never interleaved with any other work items. (This won't work with the moodycamel
queue, which supports pushing multiple items at once but doesn't guarantee that they won't be interleved with items pushed on by other threads.)
At the point when you asked this question, the proposed experimental API didn't support completion functions. Even the current API at least allows not using them, so I thought I should show an example of how barriers can be used like that too.
auto barrier = std::make_shared<std::barrier<>>(worker_count);
auto workerMainFn = [&procs, barrier](Proc&) {
barrier->count_down_and_wait();
// Do something with the whole vector of Proc objects
barrier->count_down_and_wait();
};
auto workerOtherFn = [barrier](Proc&) {
barrier->count_down_and_wait(); // Wait for work to start
barrier->count_down_and_wait(); // Wait for work to finish
}
queue.push_back(std::move(workerMainFn));
for (size_t i = 0; i < worker_count - 1; ++i) {
queue.push_back(workerOtherFn);
}
How this works:
The key idea is to wait for the barrier twice in each thread, and do the work in between. The first waits have the same purpose as the previous example: they ensure any earlier work items in the queue are finished before starting this work. The second waits ensure that any later items in the queue don't start until this work has finished.
Notes:
The notes are mostly the same as the previous barrier example, but here are some differences:
count_down()
and then wait()
in place of count_down_and_wait()
. But using a barrier makes more sense, both because calling the combined function is a little simpler and because using a barrier communicates your intention better to future readers of the code.