3

This question comes from: C++11 thread doesn't work with virtual member function

As suggested in a comment, my question in previous post may not the right one to ask, so here is the original question:

I want to make a capturing system, which will query a few sources in a constant/dynamic frequency (varies by sources, say 10 times / sec), and pull data to each's queues. while the sources are not fixed, they may add/remove during run time.

and there is a monitor which pulls from queues at a constant freq and display the data.

So what is the best design pattern or structure for this problem.

I'm trying to make a list for all the sources pullers, and each puller holds a thread, and a specified pulling function (somehow the pulling function may interact with the puller, say if the source is drain, it will ask to stop the pulling process on that thread.)

4

1 回答 1

2

除非您查询源的操作是阻塞的(或者您有很多),否则您不需要为此使用线程。我们可以从一个Producer可以与同步或异步(线程)调度一起使用的开始:

template <typename OutputType>
class Producer
{
    std::list<OutputType> output;

protected:
    int poll_interval; // seconds? milliseconds?
    virtual OutputType query() = 0;

public:
    virtual ~Producer();

    int next_poll_interval() const { return poll_interval; }
    void poll() { output.push_back(this->query()); }

    std::size_t size() { return output.size(); }
    // whatever accessors you need for the queue here:
    // pop_front, swap entire list, etc.
};

现在我们可以从中派生并在每个子类型中Producer实现该方法。query您可以poll_interval在构造函数中设置并保持不变,或者在每次调用时更改它query。有你的通用生产者组件,不依赖于调度机制。

template <typename OutputType>
class ThreadDispatcher
{
    Producer<OutputType> *producer;
    bool shutdown;
    std::thread thread;

    static void loop(ThreadDispatcher *self)
    {
        Producer<OutputType> *producer = self->producer;

        while (!self->shutdown)
        {
            producer->poll();
            // some mechanism to pass the produced values back to the owner
            auto delay = // assume millis for sake of argument
                std::chrono::milliseconds(producer->next_poll_interval());
            std::this_thread::sleep_for(delay);
        }
    }

public:
    explicit ThreadDispatcher(Producer<OutputType> *p)
      : producer(p), shutdown(false), thread(loop, this)
    {
    }

    ~ThreadDispatcher()
    {
        shutdown = true;
        thread.join();
    }

    // again, the accessors you need for reading produced values go here
    // Producer::output isn't synchronised, so you can't expose it directly
    // to the calling thread
};

这是一个简单的调度程序的速写,它将在一个线程中运行你的生产者,无论你经常要求它轮询它。请注意,未显示将生成的值传递回所有者,因为我不知道您想如何访问它们。

另请注意,我没有同步对关闭标志的访问 - 它可能应该是原子的,但它可能会通过您选择对生成的值进行的任何操作来隐式同步。

有了这个组织,编写同步调度程序来在单个线程中查询多个生产者也很容易,例如从选择/轮询循环中,或者使用类似 Boost.Asio 和每个生产者的截止时间计时器。

于 2012-05-17T14:07:29.053 回答