6

我正在用 C++ 编写一个程序。我注意到它获得了许多线程,其目的是每隔一段时间做一些事情,其中​​有 3 或 4 个。我决定通过编写一个调度程序服务进行重构,使用这些线程的其他地方可以订阅该服务,这应该可以将我在任何时候运行的额外事件线程的数量减少到只有一个。

我还没有任何使用它的代码;在我开始写它之前,我想知道它是否可能,并获得一些关于我的设计的反馈。我想要完成的简要描述是这样的:

添加事件

  1. 调用者提供事件和时间表
  2. 时间表提供事件的下一次发生
  3. (event, schedule) 对被添加到事件队列中
  4. 中断睡眠事件线程(即唤醒它)

事件线程主循环

  1. 尝试获取事件队列中的下一个事件
  2. 如果没有待处理的事件,直接跳到 4
  3. 获取下一个事件应该发生的时间
  4. 休眠直到下一个事件(如果没有等待事件,则永远休眠)
  5. 如果睡眠因任何原因被中断,则循环回 1
  6. 如果睡眠成功完成,则执行当前事件
  7. 更新队列(删除事件,如果是重复事件则重新插入)
  8. 跳回 1

我做了一些研究,知道有可能中断一个睡眠线程,我相信只要防止同时访问事件队列,就不应该有任何危险的行为。我想唤醒一个线程是可能的,java 的 Thread 的 sleep() 调用在某些情况下会抛出一个 InterruptedException ,除非它不依赖于操作系统的底层 sleep 调用,否则它必须以某种方式成为可能。

问题

任何人都可以评论我的方法吗?这是一个我最好不要重新发明的轮子吗?具体来说,您如何中断睡眠线程以便在下一条指令处恢复执行,并且是否可以从中断的线程中检测到这一点?

关于提升的说明

我敢打赌,你可以编写一个带有 boost 的调度程序,但是由于缺乏更好的短语,它可以在一台机器上编译和运行,这是一堆废话。我之前已经编译过 boost 程序,每个拉入 boost 的文件通常需要 30 秒以上的时间来编译。如果我能避免这个恼人的发展障碍,我非常愿意。

附录 - 工作代码 [根据 caf 的建议修改]

这是我生成的有效代码。它已经过初步测试,但可以正确处理具有不同延迟的单个和重复事件。

这是事件线程的主体:

void Scheduler::RunEventLoop()
{
    QueueLock();                   // lock around queue access
    while (threadrunning)
    {
        SleepUntilNextEvent();     // wait for something to happen

        while (!eventqueue.empty() && e.Due())
        {                          // while pending due events exist
            Event e = eventqueue.top();
            eventqueue.pop();

            QueueUnlock();         // unlock
            e.DoEvent();           // perform the event
            QueueLock();           // lock around queue access

            e.Next();              // decrement repeat counter
                                   // reschedule event if necessary
            if (e.ShouldReschedule()) eventqueue.push(e);
        }
    }
    QueueUnlock();                 // unlock
    return;                        // if threadrunning is set to false, exit
}

这是睡眠功能:

void Scheduler::SleepUntilNextEvent()
{
    bool empty = eventqueue.empty();  // check if empty

    if (empty)
    {
        pthread_cond_wait(&eventclock, &queuelock); // wait forever if empty
    }
    else
    {
        timespec t =                  // get absolute time of wakeup
            Bottime::GetMillisAsTimespec(eventqueue.top().Countdown() + 
                                         Bottime::GetCurrentTimeMillis());
        pthread_cond_timedwait(&eventclock, &queuelock, &t); // sleep until event
    }
}

最后,添加事件:

void Scheduler::AddEvent(Event e)
{
    QueueLock();
    eventqueue.push(e);
    QueueUnlock();
    NotifyEventThread();
}

相关变量声明:

bool threadrunning;
priority_queue<Event, vector<Event>, greater<Event> > eventqueue;
pthread_mutex_t queuelock; // QueueLock and QueueUnlock operate on this
pthread_cond_t eventclock;

为了处理泛型事件的问题,每个事件都Event包含一个指向抽象类型对象的指针,其action子类覆盖action::DoEvent。这个方法是从内部调用的Event::DoEventactions由他们的事件“拥有”,即如果不再需要重新安排事件,它们会被自动删除。

4

4 回答 4

10

您正在寻找的是pthread_cond_t对象pthread_cond_timedwaitpthread_cond_wait功能。您可以创建条件变量isThereAnyTaskToDo并在事件线程中等待它。添加新事件时,您只需使用pthread_cond_signal().

于 2012-08-19T07:56:06.387 回答
3

在 *NIX 平台和 Windows 上都有多种可能性。您的计时器线程应该在事件/条件变量对象上使用某种定时等待来等待。在 POSIX 平台上,您可以使用pthread_cond_timedwait(). 在 Windows 上,您可以选择计算必要的时间增量并用于WaitForSingleObject()事件句柄,或者您可以将事件对象与CreateTimerQueueTimer()或结合使用CreateWaitableTimer()。Boost 也有一些同步原语,您可以使用它们通过类似 POSIX 的原语来实现这一点,但可移植。

更新:

POSIX 也有一些计时器功能,请参阅create_timer()

于 2012-08-19T08:01:28.090 回答
3

我同意Gregwilx -pthread_cond_timedwait()可用于实现您所追求的行为。我只是想补充一点,您可以简化事件线程主循环:

  1. 尝试获取事件队列中的下一个事件
  2. 如果没有待处理的事件,直接跳到 4
  3. 获取下一个事件应该发生的时间
  4. 等待条件变量,pthread_cond_timedwait()直到下一个事件(或pthread_cond_wait()如果没有预定事件)
  5. 尝试获取事件队列中的下一个事件
  6. 如果还没有过期的事件,返回4
  7. 更新队列(删除事件,如果是重复事件则重新插入)
  8. 跳回 5

因此,您不必关心自己为什么醒来 - 每当您醒来时,您都会检查当前时间并运行任何已过期的事件,然后返回等待。在大多数情况下,当添加新事件时,您当然会发现没有任何事件过期 - 您只需重新计算等待时间。

您可能希望将队列实现为优先级队列,以便下一个到期事件始终位于最前面。

于 2012-08-19T09:19:52.777 回答
1

您当前的解决方案包含竞争条件 - 例如,这里:

QueueLock();                      // lock around queue access
bool empty = eventqueue.empty();  // check if empty
QueueUnlock();                    // unlock

pthread_mutex_lock(&eventmutex);  // lock event mutex (for condition)
if (empty)
{
    pthread_cond_wait(&eventclock, &eventmutex); // wait forever if empty
}

考虑一下如果队列最初是空的,但另一个线程与之竞争并在其间推送一个新值会发生什么QueueUnlock()-pthread_mutex_lock(&eventmutex)新事件的唤醒将被错过。另请注意,在SleepUntilNextEvent()您访问eventqueue.top()时无需持有队列锁。

传递给的互斥锁pthread_cond_wait()应该是保护与信号相关的共享状态的互斥锁。在这种情况下,“共享状态”是队列本身,因此您可以通过仅使用一个保护队列的互斥体来解决这些问题:

void Scheduler::RunEventLoop()
{

    pthread_mutex_lock(&queuemutex);
    while (threadrunning)
    {
        while (!eventqueue.empty() && e.Due())
        {                          // while pending due events exist
            Event e = eventqueue.top();
            eventqueue.pop();

            pthread_mutex_unlock(&queuemutex);
            e.DoEvent();           // perform the event
            e.Next();              // decrement repeat counter
            pthread_mutex_lock(&queuemutex);
                                   // reschedule event if necessary
            if (e.ShouldReschedule()) eventqueue.push(e);
        }

        SleepUntilNextEvent();     // wait for something to happen
    }
    pthread_mutex_unlock(&queuemutex);

    return;                        // if threadrunning is set to false, exit
}

/* Note: Called with queuemutex held */
void Scheduler::SleepUntilNextEvent()
{
    if (eventqueue.empty())
    {
        pthread_cond_wait(&eventclock, &queuemutex); // wait forever if empty
    }
    else
    {
        timespec t =                  // get absolute time of wakeup
            Bottime::GetMillisAsTimespec(eventqueue.top().Countdown() + 
                                         Bottime::GetCurrentTimeMillis());
        pthread_cond_timedwait(&eventclock, &queuemutex, &t); // sleep until event
    }
}

请注意,pthread_cond_wait()pthread_cond_timedwait()在它们等待时释放互斥锁(互斥锁被释放并且等待相对于正在发出信号的互斥锁以原子方式开始),因此调度程序在睡眠时不会持有互斥锁。

于 2012-08-22T03:15:15.083 回答