9

是否有 .Net 类可以做什么ManualResetEvent.PulseAll()(如果存在)?

我需要原子地释放一组等待同一信号的线程。(对于我的预期用途,我并不担心“线程踩踏”。)

您不能使用 aManualResetEvent来执行此操作。例如,如果您这样做:

ManualResetEventSlim signal = new ManualResetEventSlim();
// ...
signal.Set();
signal.Reset();

然后根本不会释放等待信号的线程。

如果你在and调用Thread.Sleep(5)之间放置一个,那么一些但不是所有的等待线程都被释放。将睡眠时间增加到 10 毫秒可以释放所有线程。(这是用 20 个线程测试的。)Set()Reset()

显然,通过添加Thread.Sleep()来完成这项工作是不可接受的。

然而,这很容易做到,Monitor.PulseAll()我已经写了一个小类来做到这一点。(我编写一个类来执行此操作的原因是我们发现使用 Monitor 的逻辑虽然相当简单,但并不明显,因此值得拥有这样一个类来简化使用。)

我的问题很简单:.Net 中是否已经有一个类可以做到这一点?

作为参考,这是我的“ ManualResetEvent.PulseAll()”等价物的基本版本:

public sealed class Signaller
{
    public void PulseAll()
    {
        lock (_lock)
        {
            Monitor.PulseAll(_lock);
        }
    }

    public void Wait()
    {
        Wait(Timeout.Infinite);
    }

    public bool Wait(int timeoutMilliseconds)
    {
        lock (_lock)
        {
            return Monitor.Wait(_lock, timeoutMilliseconds);
        }
    }

    private readonly object _lock = new object();
}

这是一个示例程序,它演示了如果您在 Set() 和 Reset() 之间不休眠,则不会释放等待线程:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    public static class Program
    {
        private static void Main(string[] args)
        {
            _startCounter = new CountdownEvent(NUM_THREADS);

            for (int i = 0; i < NUM_THREADS; ++i)
            {
                int id = i;
                Task.Factory.StartNew(() => test(id));
            }

            Console.WriteLine("Waiting for " + NUM_THREADS + " threads to start");
            _startCounter.Wait(); // Wait for all threads to have started.
            Thread.Sleep(100);
            Console.WriteLine("Threads all started. Setting signal now.");
            _signal.Set();
            // Thread.Sleep(5); // With no sleep at all, NO threads receive the signal. Try commenting this line out.
            _signal.Reset();
            Thread.Sleep(1000);
            Console.WriteLine("\n{0}/{1} threads received the signal.\n\n", _signalledCount, NUM_THREADS);
            Console.WriteLine("Press any key to exit.");
            Console.ReadKey();
        }

        private static void test(int id)
        {
            _startCounter.Signal(); // Used so main thread knows when all threads have started.
            _signal.Wait();
            Interlocked.Increment(ref _signalledCount);
            Console.WriteLine("Task " + id + " received the signal.");
        }

        private const int NUM_THREADS = 20;

        private static readonly ManualResetEventSlim _signal = new ManualResetEventSlim();
        private static CountdownEvent _startCounter;
        private static int _signalledCount;
    }
}
4

2 回答 2

3

您可以使用Barrier对象。它允许运行未指定数量的任务,然后等待所有其他任务达到该点。

如果您不知道哪些任务将从哪些代​​码块开始作为特定的工作单元工作,您可以以类似于 Go 中的 WaitGroup 的方式使用它。

于 2013-06-05T22:40:55.930 回答
1

版本 1
最大清晰度:在每个周期ManualResetEvent开始时急切地安装一个新的。PulseAll

public class PulseEvent
{
    public PulseEvent()
    {
        mre = new ManualResetEvent(false);
    }

    ManualResetEvent mre;

    public void PulseAll() => Interlocked.Exchange(ref mre, new ManualResetEvent(false)).Set();

    public bool Wait(int ms) => Volatile.Read(ref mre).WaitOne(ms);

    public void Wait() => Wait(Timeout.Infinite);
};

版本 2此版本避免为在没有服务员的情况下完成
的任何周期创建内部事件。PulseAll每个周期的第一个等待者会进入一个乐观的无锁竞赛,以创建和原子地安装单个共享事件。

public class PulseEvent
{
    ManualResetEvent mre;

    public void PulseAll() => Interlocked.Exchange(ref mre, null)?.Set();

    public bool Wait(int ms)
    {
        ManualResetEvent tmp =
           mre ??
           Interlocked.CompareExchange(ref mre, tmp = new ManualResetEvent(false), null) ??
           tmp;
        return tmp.WaitOne(ms);
    }

    public void Wait() => Wait(Timeout.Infinite);
};

版本 3
此版本通过分配两个持久ManualResetEvent对象并在它们之间翻转来消除每个周期的分配。与上述示例相比,这稍微改变了语义,如下所示:

  • 首先,回收相同的两个锁意味着您的PulseAll周期必须足够长,以允许所有服务员清除前一个锁。否则,当您快速连续调用两次时,任何由上一次PulseAll调用假定释放但操作系统尚未有机会安排的等待线程可能最终会在新周期中被重新阻塞,因为出色地。我提到这主要是作为一个理论上的考虑,因为这是一个没有实际意义的问题,除非你在亚微秒脉冲周期上阻塞极端数量的线程。您可以决定此条件是否与您的情况相关。如果是这样,或者如果您不确定或谨慎,您可以随时使用没有此限制的版本 1版本 2 。 PulseAll

  • 在这个版本中,“可以说”不同(但请参阅下面的段落,了解为什么第二点可能被证明不相关),对PulseAll被认为基本上同时的调用被合并,这意味着除了这些多个“同时”调用者中的一个之外,所有调用者都成为NOPs。这种行为并非没有先例(请参阅此处的“备注”)并且可能是可取的,具体取决于应用程序。

请注意,后一点必须被视为合理的设计选择,而不是错误、理论缺陷或并发错误。这是因为脉冲锁在多个同时的情况下本质上是模棱两可的PulseAll:具体来说,没有办法证明任何没有被单个指定脉冲器释放的服务员必然会被其他合并/省略的脉冲之一释放.

换一种说法,这种类型的锁并不是为了原子地序列化PulseAll调用者而设计的,事实上它确实不可能,因为跳过的“同时”脉冲总是有可能独立地来来去去,甚至如果完全合并脉冲的时间之后,但在服务员到达之前仍然“脉冲”(谁不会得到脉冲)。

public class PulseEvent
{
    public PulseEvent()
    {
        cur = new ManualResetEvent(false);
        alt = new ManualResetEvent(true);
    }

    ManualResetEvent cur, alt;

    public void PulseAll()
    {
        ManualResetEvent tmp;
        if ((tmp = Interlocked.Exchange(ref alt, null)) != null) // try claiming 'pulser'
        {
            tmp.Reset();                     // prepare for re-use, ending previous cycle
            (tmp = Interlocked.Exchange(ref cur, tmp)).Set();    // atomic swap & pulse
            Volatile.Write(ref alt, tmp);    // release claim; re-allow 'pulser' claims
        }
    }

    public bool Wait(int ms) => cur.WaitOne(ms);  // 'cur' is never null (unlike 'alt')

    public void Wait() => Wait(Timeout.Infinite);
};

 


最后,一些一般性的观察。这里和这种类型的代码中一个重要的反复出现的主题通常是,当它仍然公开可见时,ManualResetEvent不得将其更改为已发出信号的状态(即通过调用Set)。在上面的代码中,我们使用原子方式更改 'cur' 中的活动锁的身份(在这种情况下,通过即时交换备用)并在确保不会再添加新服务员的关键之前执行此操作到那个,超越那些在交换的那一刻已经被阻止的。Interlocked.ExchangeSetManualResetEvent

只有在此交换之后,通过调用Set我们的(现在)私有副本来释放那些等待的线程才是安全的。如果我们在它还出版SetManualResetEvent时候去拜访,一个迟到的服务员实际上错过了瞬时脉搏,仍然有可能看到打开的锁并航行通过而无需等待下一个,如定义。

有趣的是,这意味着即使直觉上感觉“脉冲”发生的确切时刻应该与Set被调用的时刻相吻合,但实际上更正确地说,它是在那之前,在 的时刻Interlocked.Exchange,因为那是动作这严格规定了截止时间之前/之后,并密封了要被释放的最终服务员(如果有的话)。

因此,错过截止时间并在之后立即到达的服务员必须只能看到 - 并将阻止 - 现在指定为下一个周期的事件,即使当前周期尚未发出信号也是如此,也没有释放它的任何等待线程,所有这些都是“瞬时”脉冲的正确性所要求的。

于 2017-07-13T04:05:19.390 回答