
I'm using boost::asio::io_service as a basic thread pool. Some threads get added to the io_service, the main thread starts posting handlers, the worker threads start running the handlers, and everything finishes. So far, so good; I get a nice speedup over single-threaded code.
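
Roughly, the setup looks like this (a simplified sketch; the thread and handler counts are just placeholders):

#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <thread>
#include <vector>

int main() {
  boost::asio::io_service io_service;
  // work keeps io_service::run() from returning while handlers are still being posted
  auto work = std::make_unique<boost::asio::io_service::work>(io_service);

  std::vector<std::thread> workers;
  for (int i = 0; i < 4; ++i)
    workers.emplace_back([&] { io_service.run(); });

  for (int i = 0; i < 8; ++i)
    io_service.post([i] { std::cout << "handler " << i << "\n"; });

  work.reset();                       // let run() return once the queue drains
  for (auto& t : workers) t.join();
}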

However, the main thread has millions of things to post, and it just keeps on posting them, much faster than the worker threads can process them. I don't hit RAM limits, but it still seems silly to enqueue that many things at once. What I'd like is a fixed size for the handler queue, with post() blocking if the queue is full.

I don't see any option for this in the Boost ASIO documentation. Is this possible?


4 Answers


I'm using a semaphore to fix the size of the handler queue. The following code illustrates this solution:

void Schedule(boost::function<void()> function)
{
    // Blocks while the queue is full; the semaphore is initialized with
    // the maximum number of handlers allowed in the queue.
    semaphore.wait();
    io_service.post(boost::bind(&TaskWrapper, function));
}

void TaskWrapper(boost::function<void()> &function)
{
    function();
    // Frees a slot so a blocked Schedule() call can post its handler.
    semaphore.post();
}
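
The semaphore type itself isn't shown above; here is a minimal sketch of a counting semaphore that would work in its place (an assumption on my part, built from std::mutex and std::condition_variable and therefore requiring C++11, initialized with the maximum queue depth):

#include <condition_variable>
#include <cstddef>
#include <mutex>

class Semaphore
{
public:
    explicit Semaphore(std::size_t initial) : count(initial) {}

    void wait()
    {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [this] { return count > 0; });
        --count;
    }

    void post()
    {
        { std::lock_guard<std::mutex> lock(mtx); ++count; }
        cv.notify_one();
    }

private:
    std::mutex mtx;
    std::condition_variable cv;
    std::size_t count;
};

Semaphore semaphore(100);   // e.g. allow at most 100 queued handlers
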
Answered 2012-08-19T00:17:35.687

You can wrap your lambda in another lambda that takes care of counting the "in progress" tasks, and then waits before posting if there are too many tasks already in progress.

Example:

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <future>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#include <boost/asio.hpp>

class ThreadPool {
  using asio_worker = std::unique_ptr<boost::asio::io_service::work>;
  boost::asio::io_service service;
  asio_worker service_worker;
  std::vector<std::thread> grp;
  std::atomic<int> inProgress{0};
  std::mutex mtx;
  std::condition_variable busy;
public:
  ThreadPool(int threads) : service(), service_worker(new asio_worker::element_type(service)) {
    for (int i = 0; i < threads; ++i) {
      grp.emplace_back([this] { service.run(); });
    }
  }

  template<typename F>
  void enqueue(F && f) {
    std::unique_lock<std::mutex> lock(mtx);
    // limit queue depth = number of threads
    while (inProgress >= grp.size()) {
      busy.wait(lock);
    }
    inProgress++;
    service.post([this, f = std::forward<F>(f)]{
      try {
        f();
      }
      catch (...) {
        // decrement under the mutex so a waiting enqueue() cannot miss the wakeup
        { std::lock_guard<std::mutex> lk(mtx); inProgress--; }
        busy.notify_one();
        throw;
      }
      { std::lock_guard<std::mutex> lk(mtx); inProgress--; }
      busy.notify_one();
    });
  }

  ~ThreadPool() {
    service_worker.reset();
    for (auto& t : grp)
      if (t.joinable())
        t.join();
    service.stop();
  }
};

int main() {
  std::unique_ptr<ThreadPool> pool(new ThreadPool(4));
  for (int i = 1; i <= 20; ++i) {
    pool->enqueue([i] {
      std::string s("Hello from task ");
      s += std::to_string(i) + "\n";
      std::cout << s;
      std::this_thread::sleep_for(std::chrono::seconds(1));
    });
  }
  std::cout << "All tasks queued.\n";
  pool.reset(); // wait for all tasks to complete
  std::cout << "Done.\n";
}

Output:

Hello from task 3
Hello from task 4
Hello from task 2
Hello from task 1
Hello from task 5
Hello from task 7
Hello from task 6
Hello from task 8
Hello from task 9
Hello from task 10
Hello from task 11
Hello from task 12
Hello from task 13
Hello from task 14
Hello from task 15
Hello from task 16
Hello from task 17
Hello from task 18
All tasks queued.
Hello from task 19
Hello from task 20
Done.
Answered 2016-08-04T15:42:28.290

Could you use a strand object to queue the events, and put a delay in your main? Is your program dropping out after all the work is posted? If so, you can use the work object, which will give you more control over when your io_service stops.

You could always have main check the state of the threads and have it wait until one becomes free, or something like that.

//links

http://www.boost.org/doc/libs/1_40_0/doc/html/boost_asio/reference/io_service__strand.html

http://www.boost.org/doc/libs/1_40_0/doc/html/boost_asio/reference/io_service.html

//example from the second link
boost::asio::io_service io_service;
boost::asio::io_service::work work(io_service);
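
Here's a minimal sketch of my own (only the two lines above come from the linked docs) combining both suggestions: the work object keeps run() from returning early, and a strand serializes the handlers posted through it.

#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <thread>

int main()
{
    boost::asio::io_service io_service;
    // work keeps io_service::run() busy even when no handlers are queued yet
    auto work = std::make_unique<boost::asio::io_service::work>(io_service);
    // handlers posted through the same strand never run concurrently with each other
    boost::asio::io_service::strand strand(io_service);

    std::thread worker([&] { io_service.run(); });

    for (int i = 0; i < 3; ++i)
        strand.post([i] { std::cout << "strand handler " << i << "\n"; });

    work.reset();    // allow run() to return once the queued handlers finish
    worker.join();
}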

hope this helps.

Answered 2012-07-27T10:25:26.420

Maybe try lowering the priority of the main thread, so that once the worker threads get busy they starve the main thread and the system throttles itself.

Answered 2016-08-04T15:50:19.997