0

我有一个包含任务队列的 TaskManager 类。每次下一个任务被弹出并执行。

class TaskManager
{
    TaskQueue m_queue;

    svc_tasks()
    {
         while (!m_queue.empty())
         {
             Task* task = m_queue.pop();
             task->execute();
         }
    }
};

在任务中有某些点我想暂停至少 SLEEP_TIME_MS 毫秒。在此暂停期间,我想开始执行下一个任务。当暂停结束时,我想再次将任务放入队列中。

class Task
{
    int m_phase = -1;

    execute()
    {
        m_phase++;

        switch(m_phase)
        {
         case 0:
             ...
             do_pause(SLEEP_TIME_MS);
             return;
         case 1:
             ...
             break;
        }
    }
};

std (C++ 17) 或 boost 中是否有一个调度程序,我可以使用它在 SLEEP_TIME_MS 通过时调用处理程序函数?

谢谢你的任何建议

4

1 回答 1

1

你可以使用boost::asio::high_resolution_timer它的async_wait方法。

每次当您想安排将任务推入队列的操作时,您必须:

  1. 创造high_resolution_timer
  2. 指定到期时间(SLEEP_TIME_MS)的调用expires_after,即调用处理程序的时间。在您的情况下,在此处理程序中,您将任务推送到队列中。
  3. async_wait与您的处理程序通话

如果我们假设该execute方法返回bool指示任务是否完成(所有阶段都已执行),它可能会被重写为这样的:

     while (!m_queue.empty()) // this condition should be changed
     {
         Task* task = m_queue.pop();
         bool finished = task->execute();
         if (!finished)
            scheduler works here - start async_wait with handler
     }

如果我理解正确,您想在SLEEP_TIME_MS过期时将任务推入队列,因此当队列为空时您不能中断循环,因为您必须等到待处理的任务完成。你可以介绍stopflag。并按需中断循环。


下面我放了一段代码,它按照你描述的方式工作(我希望):

struct Scheduler {
    Scheduler(boost::asio::io_context& io)
    : io(io) {}

    boost::asio::io_context& io;

    template<class F>
    void schedule (F&& handler) {
        auto timer = std::make_shared<boost::asio::high_resolution_timer>(io);
        timer->expires_after(std::chrono::milliseconds(5000)); // SLEEP_TIME_MS 
        timer->async_wait(
            [timer,handler](const boost::system::error_code& ec) {
                handler();
            });
    }
};

struct Task  {
    int phase = -1;

    bool execute() {
        ++phase;
        std::cout << "phase: " << phase << std::endl;
        if (phase == 0) {
            return false;
        }
        else {

        }
        return true;
    }
};

struct TaskManager {
    Scheduler s;
    std::queue<std::shared_ptr<Task>> tasks;
    std::mutex tasksMtx;
    std::atomic<bool> stop{false};

    TaskManager(boost::asio::io_context& io) : s(io) {
        for (int i = 0; i < 5; ++i)
            tasks.push(std::make_shared<Task>());
    }

    void run() {
        while (true) {
            if (stop)
                break;

            {
                std::lock_guard<std::mutex> lock{tasksMtx};
                if (tasks.empty())
                    continue;
            }

            std::shared_ptr<Task> currTask = tasks.front();
            tasks.pop();

            bool finished = currTask->execute();
            if (!finished)
                s.schedule( [this, currTask](){ insertTaskToVector(std::move(currTask)); } );
        }
    }

    template<class T>
    void insertTaskToVector(T&& t) {
        std::lock_guard<std::mutex> lock{tasksMtx};
        tasks.push(std::forward<T>(t));
    }
};

int main() {
    boost::asio::io_context io;
    boost::asio::io_context::work work{io};
    std::thread th([&io](){ io.run();});
    TaskManager tm(io);
    tm.run();
于 2019-09-04T18:09:53.710 回答