13

我实现了以下后台处理线程,其中JobsQueue<T>

static void WorkThread()
{
    while (working)
    {
        var job;

        lock (Jobs)
        {
            if (Jobs.Count > 0)
                job = Jobs.Dequeue();
        }

        if (job == null)
        {
            Thread.Sleep(1);
        }
        else
        {
            // [snip]: Process job.
        }
    }
}

这在输入作业和实际开始运行之间产生了明显的延迟(一次输入批量作业,并且每个作业只是[相对]小。)延迟并不是什么大问题,但我开始思考这个问题,并进行了以下更改:

static ManualResetEvent _workerWait = new ManualResetEvent(false);
// ...
    if (job == null)
    {
        lock (_workerWait)
        {
            _workerWait.Reset();
        }
        _workerWait.WaitOne();
    }

添加作业的线程现在在添加作业后锁定_workerWait并调用_workerWait.Set()。该解决方案(似乎)立即开始处理作业,并且延迟完全消失了。

我的问题部分是“为什么会发生这种情况?”,当然Thread.Sleep(int)可以睡得比您指定的更长,部分是“如何ManualResetEvent达到这种性能水平?”。

编辑:由于有人询问了排队项目的功能,这里是,以及目前的完整系统。

public void RunTriggers(string data)
{
    lock (this.SyncRoot)
    {
        this.Triggers.Sort((a, b) => { return a.Priority - b.Priority; });

        foreach (Trigger trigger in this.Triggers)
        {
            lock (Jobs)
            {
                Jobs.Enqueue(new TriggerData(this, trigger, data));
                _workerWait.Set();
            }
        }
    }
}

static private ManualResetEvent _workerWait = new ManualResetEvent(false);
static void WorkThread()
{
    while (working)
    {
        TriggerData job = null;

        lock (Jobs)
        {
            if (Jobs.Count > 0)
                job = Jobs.Dequeue();

            if (job == null)
            {
                _workerWait.Reset();
            }
        }

        if (job == null)
            _workerWait.WaitOne();
        else
        {
            try
            {
                foreach (Match m in job.Trigger.Regex.Matches(job.Data))
                    job.Trigger.Value.Action(job.World, m);
            }
            catch (Exception ex)
            {
                job.World.SendLineToClient("\r\n\x1B[32m -- {0} in trigger ({1}): {2}\x1B[m",
                    ex.GetType().ToString(), job.Trigger.Name, ex.Message);
            }
        }
    }
}
4

2 回答 2

18

这些事件是专门为此类事情设计的 OS/Kernel 提供的内核原语。内核提供了一个边界,您可以在该边界上保证对同步很重要的原子操作(一些原子性也可以在用户空间中通过硬件支持完成)。

简而言之,当线程等待某个事件时,它会被放入该事件的等待列表中并标记为不可运行。当事件发出信号时,内核唤醒等待列表中的那些并将它们标记为可运行,它们可以继续运行。当事件发出信号时,线程可以立即唤醒,而不是长时间休眠并时不时地重新检查条件,这自然是一个巨大的好处。

即使一毫秒也是一个非常长的时间,你可以在那段时间内处理数千个事件。此外,时间分辨率传统上是 10 毫秒,因此睡眠时间少于 10 毫秒通常只会导致 10 毫秒的睡眠。通过事件,可以立即唤醒并调度线程

于 2009-07-12T15:58:48.530 回答
10

首先锁定_workerWait是没有意义的,Event 是一个系统(内核)对象,用于在线程之间发出信号(并且在 Win32 API 中大量用于异步操作)。因此,多个线程在没有额外同步的情况下设置或重置它是非常安全的。

至于您的主要问题,还需要查看将事物放入队列的逻辑,以及有关每个作业完成多少工作的一些信息(工作线程是花费更多时间处理工作还是等待工作)。

可能最好的解决方案是使用对象实例来锁定并使用Monitor.PulseMonitor.Wait作为条件变量。

编辑:从代码入队的角度来看,答案#1116297似乎是正确的:1 毫秒的延迟太长了,无法等待,因为许多工作项的处理速度非常快。

具有唤醒工作线程的机制的方法是正确的(因为没有具有阻塞出队操作的 .NET 并发队列)。然而,与使用事件相比,条件变量会更有效一些(因为在非竞争情况下,它不需要内核转换):

object sync = new Object();
var queue = new Queue<TriggerData>();

public void EnqueueTriggers(IEnumerable<TriggerData> triggers) {
  lock (sync) {
    foreach (var t in triggers) {
      queue.Enqueue(t);
    }
    Monitor.Pulse(sync);  // Use PulseAll if there are multiple worker threads
  }
}

void WorkerThread() {
  while (!exit) {
    TriggerData job = DequeueTrigger();
    // Do work
  }
}

private TriggerData DequeueTrigger() {
  lock (sync) {
    if (queue.Count > 0) {
      return queue.Dequeue();
    }
    while (queue.Count == 0) {
      Monitor.Wait(sync);
    }
    return queue.Dequeue();
  }
}

Monitor.Wait 将释放参数上的锁,等到Pulse()或被PulseAll()调用来反对锁,然后重新进入锁并返回。需要重新检查等待条件,因为其他线程可能已经从队列中读取了该项目。

于 2009-07-12T15:44:36.793 回答