6

这个想法是能够在消费者/生产者问题上用 boost::asio 和线程池替换多线程代码。当前,每个消费者线程都在等待boost::condition_variable- 当生产者向队列中添加某些内容时,它会调用notify_one/notify_all来通知所有消费者。现在,当您(可能)拥有 1k+ 消费者时会发生什么?线程不会扩展!

我决定使用boost::asio,但后来我发现它没有条件变量。然后async_condition_variable诞生了:

class async_condition_variable
{
private:
    boost::asio::io_service& service_;
    typedef boost::function<void ()> async_handler;
    std::queue<async_handler> waiters_;

public:
    async_condition_variable(boost::asio::io_service& service) : service_(service)
    {
    }

    void async_wait(async_handler handler)
    {
        waiters_.push(handler);
    }

    void notify_one()
    {
        service_.post(waiters_.front());
        waiters_.pop();
    }

    void notify_all()
    {
        while (!waiters_.empty()) {
            notify_one();
        }
    }
};

基本上,每个消费者都会调用async_condition_variable::wait(...). 然后,生产者最终会调用async_condition_variable::notify_one()or async_condition_variable::notify_all()。每个消费者的句柄都会被调用,并且会根据条件采取行动或async_condition_variable::wait(...)再次调用。这是可行的还是我在这里疯了?鉴于这将在线程池上运行,应该执行哪种锁定(互斥锁)?

PS:是的,这更像是一个 RFC(征求意见)而不是一个问题 :)。

4

3 回答 3

3

列出事件发生时需要做的事情。具有向该列表添加某些内容的功能以及从该列表中删除某些内容的功能。然后,当事件发生时,让一个线程池处理现在需要完成的作业列表。您不需要专门等待事件的线程。

于 2012-07-10T00:26:17.200 回答
1

Boost::asio 可能有点难以理解。至少,我很难做到。

您不需要让线程等待任何事情。当他们没有任何工作要做时,他们会自己做。看起来像您想要做的示例已将每个项目的工作发布到 io_service。

以下代码的灵感来自此链接。它实际上让我大开眼界,你可以如何使用它来做很多事情。

我确信这并不完美,但我认为它给出了总体思路。我希望这有帮助。

代码

#include <iostream>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
class ServerProcessor
{
protected:
    void handleWork1(WorkObject1* work)
    {
        //The code to do task 1 goes in here
    }
    void handleWork2(WorkObject2* work)
    {
        //The code to do task 2 goes in here
    }

    boost::thread_group worker_threads_;

    boost::asio::io_service io_service_;
    //This is used to keep io_service from running out of work and exiting to soon.
    boost::shared_ptr<boost::asio::io_service::work> work_;


public:
    void start(int numberOfThreads)
    {
        boost::shared_ptr<boost::asio::io_service::work> myWork(new boost::asio::io_service::work(io_service_));
        work_=myWork;

        for (int x=0; x < numberOfThreads; ++x)
            worker_threads_.create_thread( boost::bind( &ServerProcessor::threadAction, this ) );

    }

    void doWork1(WorkObject1* work)
    {
        io_service_.post(boost::bind(&ServerProcessor::handleWork1, this, work));
    }

    void doWork2(WorkObject2* work)
    {
        io_service_.post(boost::bind(&ServerProcessor::handleWork2, this, work));
    }


    void threadAction()
    {
        io_service_.run();
    }

    void stop()
    {
        work_.reset();
        io_service_.stop();
        worker_threads_.join_all();
    }

};

int main()
{
    ServerProcessor s;

    std::string input;
    std::cout<<"Press f to stop"<<std::endl;

    s.start(8);

    std::cin>>input;

    s.stop();

    return 0;
}
于 2012-11-19T22:56:13.370 回答
0

使用 boost::signals2 怎么样?

它是 boost::signals 的线程安全衍生产品,可让您的客户端订阅要发出的信号的回调。

然后,当信号在 io_service 调度的作业中异步发出时,所有注册的回调将被执行(在发出信号的同一线程上)。

于 2016-10-13T04:04:02.187 回答