您可以将您的 lambda 包装在另一个 lambda 中,该 lambda 将负责计算“进行中”任务,然后在发布之前等待是否有太多正在进行的任务。
例子:
#include <atomic>
#include <chrono>
#include <future>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#include <boost/asio.hpp>
class ThreadPool {
using asio_worker = std::unique_ptr<boost::asio::io_service::work>;
boost::asio::io_service service;
asio_worker service_worker;
std::vector<std::thread> grp;
std::atomic<int> inProgress = 0;
std::mutex mtx;
std::condition_variable busy;
public:
ThreadPool(int threads) : service(), service_worker(new asio_worker::element_type(service)) {
for (int i = 0; i < threads; ++i) {
grp.emplace_back([this] { service.run(); });
}
}
template<typename F>
void enqueue(F && f) {
std::unique_lock<std::mutex> lock(mtx);
// limit queue depth = number of threads
while (inProgress >= grp.size()) {
busy.wait(lock);
}
inProgress++;
service.post([this, f = std::forward<F>(f)]{
try {
f();
}
catch (...) {
inProgress--;
busy.notify_one();
throw;
}
inProgress--;
busy.notify_one();
});
}
~ThreadPool() {
service_worker.reset();
for (auto& t : grp)
if (t.joinable())
t.join();
service.stop();
}
};
int main() {
std::unique_ptr<ThreadPool> pool(new ThreadPool(4));
for (int i = 1; i <= 20; ++i) {
pool->enqueue([i] {
std::string s("Hello from task ");
s += std::to_string(i) + "\n";
std::cout << s;
std::this_thread::sleep_for(std::chrono::seconds(1));
});
}
std::cout << "All tasks queued.\n";
pool.reset(); // wait for all tasks to complete
std::cout << "Done.\n";
}
输出:
Hello from task 3
Hello from task 4
Hello from task 2
Hello from task 1
Hello from task 5
Hello from task 7
Hello from task 6
Hello from task 8
Hello from task 9
Hello from task 10
Hello from task 11
Hello from task 12
Hello from task 13
Hello from task 14
Hello from task 15
Hello from task 16
Hello from task 17
Hello from task 18
All tasks queued.
Hello from task 19
Hello from task 20
Done.