26

我有一堆线程会生成 typeA和 type的事件B

我的程序接收这些事件,将它们包装在一条消息中并通过网络发送它们。消息可以包含一个A事件、一个B事件或一个A事件和一个B事件:

SendMessage(new Message(a: 1,    b: null));
SendMessage(new Message(a: null, b: 2   ));
SendMessage(new Message(a: 3,    b: 4   ));

类型A事件发生得相当频繁,而类型事件发生的频率B要低得多。因此,当一个线程生成一个B事件时,我的程序会稍等片刻,看看另一个线程是否生成了一个事件,并在可能的情况下将事件和A事件结合起来。AB

这是我的代码:

object gate = new object();
int? pendingB;

Message WrapA(int a, int millisecondsTimeout)
{
    int? b;

    lock (gate)
    {
        b = pendingB;
        pendingB = null;
        Monitor.Pulse(gate);
    }

    return new Message(a, b);
}

Message WrapB(int b, int millisecondsTimeout)
{
    lock (gate)
    {
        if (pendingB == null)
        {
            pendingB = b;
            Monitor.Wait(gate, millisecondsTimeout);
            if (pendingB != b) return null;
            pendingB = null;
        }
    }

    return new Message(null, b);
}

到目前为止,这有效。但是,有两个问题:

  • 如果有很多A事件和很多事件,该算法不是很有效:即使有足够的事件,也B只有一定百分比的B事件附加到事件。AA

  • 如果有A一段时间没有事件生成(不常见,但并非不可能),该算法是完全不公平的:一个线程生成B事件每次都必须等待,而所有其他线程可以立即发送它们的B事件。

如何提高算法的效率和公平性?

约束:
•  WrapA并且WrapB必须在很短的、确定的时间内终止。
•  SendMessage必须在任何锁之外调用。
• 除了 之外,没有可用的同步机制gate
• 没有额外的线程、任务、计时器等可用。
• 由于A在正常情况下此类事件发生得如此频繁,因此忙于等待WrapB是可以的。


这是一个可以用作基准测试的测试程序:

public static class Program
{
    static int counter0 = 0;
    static int counterA = 0;
    static int counterB = 0;
    static int counterAB = 0;

    static void SendMessage(Message m)
    {
        if (m != null)
            if (m.a != null)
                if (m.b != null)
                    Interlocked.Increment(ref counterAB);
                else
                    Interlocked.Increment(ref counterA);
            else
                if (m.b != null)
                    Interlocked.Increment(ref counterB);
                else
                    Interlocked.Increment(ref counter0);
    }

    static Thread[] Start(int threadCount, int eventCount,
        int eventInterval, int wrapTimeout, Func<int, int, Message> wrap)
    {
        Thread[] threads = new Thread[threadCount * eventCount];
        for (int i = 0; i < threadCount; i++)
        {
            for (int j = 0; j < eventCount; j++)
            {
                int k = i * 1000 + j;
                int l = j * eventInterval + i;
                threads[i * eventCount + j] = new Thread(() =>
                {
                    Thread.Sleep(l);
                    SendMessage(wrap(k, wrapTimeout));
                });
                threads[i * eventCount + j].Start();
            }
        }
        return threads;
    }

    static void Join(params Thread[] threads)
    {
        for (int i = 0; i < threads.Length; i++)
        {
            threads[i].Join();
        }
    }

    public static void Main(string[] args)
    {
        var wrapper = new MessageWrapper();
        var sw = Stopwatch.StartNew();

        // Only A events
        var t0 = Start(10, 40, 7, 1000, wrapper.WrapA);
        Join(t0);

        // A and B events
        var t1 = Start(10, 40, 7, 1000, wrapper.WrapA);
        var t2 = Start(10, 10, 19, 1000, wrapper.WrapB);
        Join(t1);
        Join(t2);

        // Only B events
        var t3 = Start(10, 20, 7, 1000, wrapper.WrapB);
        Join(t3);

        Console.WriteLine(sw.Elapsed);

        Console.WriteLine("0:  {0}", counter0);
        Console.WriteLine("A:  {0}", counterA);
        Console.WriteLine("B:  {0}", counterB);
        Console.WriteLine("AB: {0}", counterAB);

        Console.WriteLine("Generated A: {0}, Sent A: {1}",
            10 * 40 + 10 * 40, counterA + counterAB);
        Console.WriteLine("Generated B: {0}, Sent B: {1}",
            10 * 10 + 10 * 20, counterB + counterAB);
    }
}
4

14 回答 14

7

为了好玩,这里有一个无锁实现:

public sealed class MessageWrapper
{
    private int pendingB;

    public Message WrapA(int a, int millisecondsTimeout)
    {
        int b = Interlocked.Exchange(ref pendingB, -1);
        return new Message(a, b == -1 ? null : b);
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        var sw = new SpinWait();
        while (Interlocked.CompareExchange(ref pendingB, b, -1) != -1)
        {
            // Spin
            sw.SpinOnce();

            if (sw.NextSpinWillYield)
            {
                // Let us make progress instead of yielding the processor
                // (avoid context switch)
                return new Message(null, b);
            }
        }

        return null;
    }
}

结果

原始实现:

00:00:02.0433298
0:  0
A:  733
B:  233
AB: 67
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

无锁实现:

00:00:01.2546310
0:  0
A:  717
B:  217
AB: 83
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

更新

不幸的是,上面的实现有一个错误和一些缺点。这是一个改进的版本:

public class MessageWrapper
{
    private int pendingB = EMPTY;
    private const int EMPTY = -1;

    public Message WrapA(int a, int millisecondsTimeout)
    {
        int? b;
        int count = 0;
        while ((b = Interlocked.Exchange(ref pendingB, EMPTY)) == EMPTY)
        {
            if (count % 7 == 0)
            {
                Thread.Sleep(0);
            }
            else if (count % 23 == 0)
            {
                Thread.Sleep(1);
            }
            else
            {
                Thread.Yield();
            }
            if (++count == 480)
            {
                return new Message(a, null);
            }
        }
        return new Message(a, b);
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        int count = 0;
        while (Interlocked.CompareExchange(ref pendingB, b, EMPTY) != EMPTY)
        {
            // Spin
            Thread.SpinWait((4 << count++));
            if (count > 10)
            {
                // We didn't manage to place our payload.
                // Let's send it ourselves:
                return new Message(null, b);
            }
        }

        // We placed our payload. 
        // Wait some more to see if some WrapA snatches it.
        while (Interlocked.CompareExchange(ref pendingB, EMPTY, EMPTY) == b)
        {
            Thread.SpinWait((4 << count++));
            if (count > 20)
            {
                // No WrapA came along. Pity, we will have to send it ourselves
                int payload = Interlocked.CompareExchange(ref pendingB, EMPTY, b);
                return payload == b ? new Message(null, b) : null;
            }
        }
        return null;
    }
}

结果:

OP的实现

00:00:02.1389474
0:  0
A:  722
B:  222
AB: 78
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

第二种实现:

00:00:01.2752425
0:  0
A:  700
B:  200
AB: 100
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
于 2013-02-26T02:14:08.123 回答
5

为了多样性,我尝试了一种基于并发集合的方法。对我来说,从发布的约束中不清楚这是否可以,但无论如何我都会回答:

这是我机器上原始代码的典型输出:

00:00:01.7835426
0:  0
A:  723
B:  223
AB: 77
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

这是我建议的典型输出,比原始代码慢约 20%,但它捕获了更多的“AB”消息:

00:00:02.1322512
0:  0
A:  701
B:  201
AB: 99
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

MessageWrapper 实现:

public class MessageWrapper
{
    private BlockingCollection<int?> messageA = new BlockingCollection<int?>();
    private BlockingCollection<int?> messageB = new BlockingCollection<int?>();

    public Message WrapA(int a, int millisecondsTimeout)
    {
        messageA.Add(a);
        return CreateMessage(0);
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        messageB.Add(b);
        return CreateMessage(millisecondsTimeout);
    }

    private Message CreateMessage(int timeout)
    {
        int? a, b;

        if (messageB.TryTake(out b) | messageA.TryTake(out a, timeout))
        {
            return new Message(a, b);
        }
        else
        {
            return null;
        }
    }
}
于 2013-02-24T17:45:12.050 回答
1

好的,所以我尝试创建一个快速 A 和 AB,然后创建一个慢 B。这意味着我的整体时间较慢(主要是因为 b-only 流),但组合时间和 a-only 时间更快。结果如下:

A's only: 00:00:00.3975499
Combine: 00:00:00.4234934
B's only: 00:00:02.0079422
Total: 00:00:02.8314751
0:  0
A:  700
B:  200
AB: 100
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

这是代码:

    class MessageWrapper
    {
        object bMessageLock = new object();
        object pendingBLock = new object();
        int? pendingB;

        ManualResetEvent gateOpen = new ManualResetEvent(true); // Gate is open initially.


        private bool IsGateOpen()
        {
            return gateOpen.WaitOne(0);
        }

        private void OpenGate()
        {
            gateOpen.Set();
        }

        private void CloseGate()
        {
            gateOpen.Reset();
        }


        public Message WrapA(int a, int millisecondsTimeout)
        {
            // check if the gate is open. Use WaitOne(0) to return immediately.
            if (IsGateOpen())
            {
                return new Message(a, null);
            }
            else
            {
                // This extra lock is to make sure that we don't get stale b's.
                lock (pendingBLock)
                {
                    // and reopen the gate.
                    OpenGate();

                    // there is a waiting b
                    // Send combined message
                    var message = new Message(a, pendingB);

                    pendingB = null;

                    return message;
                }
            }
        }

        public Message WrapB(int b, int millisecondsTimeout)
        {

            // Remove this if you don't have overlapping B's
            var timespentInLock = Stopwatch.StartNew();

            lock (bMessageLock) // Only one B message can be sent at a time.... may need to fix this.
            {
                pendingB = b;

                // Close gate
                CloseGate();


                // Wait for the gate to be opened again (meaning that the message has been sent)
                if (timespentInLock.ElapsedMilliseconds < millisecondsTimeout && 
                    gateOpen.WaitOne(millisecondsTimeout - (int)timespentInLock.ElapsedMilliseconds)) 
                // If you don't have overlapping b's use this clause instead.
                //if (gateOpen.WaitOne(millisecondsTimeout)) 
                {
                    lock (pendingBLock)
                    {
                        // Gate was opened, so combined message was sent.
                        return null;
                    }
                }
                else
                {
                    // Timeout expired, so send b-only message.
                    lock (pendingBLock)
                    {
                        // reopen gate.
                        OpenGate();
                        pendingB = null;
                        return new Message(null, b);
                    }
                }
            }
        }


    }

主要工作是使用手动重置事件完成的。这个想法是,如果门是打开的,那么你可以自由地发送 A。当“b”到达时,您关闭门并强制 A 将其合并。我必须说,拥有一个pendingB字段会在一定程度上限制此操作。只有一个变量意味着只有一个线程可以将它的 b 存储在 pendingB 中。这就是为什么我有多余的bMessageLock.

此外,需要控制对该变量的访问,因此pendingBLock.

这段代码中可能仍然存在错误,但就我的测试而言,我仍然得到了所有 100 条消息的组合。

最后,我对 WrapB 等待的时间进行了检查。最初 WrapB 只需要排队 200 秒。如果您有重叠的电话,那么您可以添加检查。如果您不介意他们排队,请改用更简单的代码。

于 2013-02-27T15:52:25.090 回答
1

我将给出另一个更严格地遵循给定约束的建议;在我的机器上,此实现在运行测试程序时始终捕获 97 条或更多“AB”消息,与原始代码相比,性能下降了约 5%:

class MessageWrapper
{
    object gate = new object();
    int? pendingB;

    public Message WrapA(int a, int millisecondsTimeout)
    {
        Message returnMessage = null;
        bool lockTaken = false;

        Monitor.TryEnter(gate, 100, ref lockTaken);

        if (lockTaken)
        {
            returnMessage = new Message(a, pendingB);

            pendingB = null;
            Monitor.Pulse(gate);

            Monitor.Exit(gate);
        }
        else
        {
            returnMessage = new Message(a, null);
        }

        return returnMessage;
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        Message returnMessage = null;
        bool lockTaken = false;

        Monitor.TryEnter(gate, 100, ref lockTaken);

        if (lockTaken)
        {
            if (pendingB != null)
            {
                Monitor.Wait(gate, 100);
            }

            if (pendingB != null)
            {
                returnMessage = new Message(null, b);
            }
            else
            {
                pendingB = b;

                if (!Monitor.Wait(gate, millisecondsTimeout))
                {
                    pendingB = null;
                    Monitor.Pulse(gate);
                    returnMessage = new Message(null, b);
                }
            }

            Monitor.Exit(gate);
        }
        else
        {
            returnMessage = new Message(null, b);
        }

        return returnMessage;
    }
}

这里发生的事情与原始代码中的基本相同,但我们也在等待已经有一个pendingB对象,而不是仅仅返回一个“B”消息。这以很小的性能成本提高了我们可以找到的“AB”消息的数量。

它看起来有点乱,但这主要是因为我选择使用更实时友好的构造Monitor.TryTake而不是 raw lock。此外,使用单个return语句是一个巧妙的技巧,可以避免在调用Monitor.Exit之前意外返回死锁。

摆弄各种超时可以以准确性为代价提高性能,反之亦然。100ms 是我最初的猜测,至少在我的机器上看起来不错。


最后一点,在WrapB的这个实现中,我们可以更改行

            if (pendingB != null)
            {
                Monitor.Wait(gate, 100);
            }

            while (pendingB != null)
            {
                Monitor.Wait(gate, 100);
            }

获得 100% 的准确率,但它严重扰乱了测试程序的指标,因为它同步了“B”事件,当只有“B”消息流时,这显然表现极差。

如果我删除t3测试,它的运行速度比原始代码快约 5%,同时始终在 100 条“AB”消息中找到 100 条。但是,运行时当然不再是确定性的,因为我们无法确定我们将围绕循环旋转多少次。

编辑:

好吧,除非我们做类似的事情

            int spinCount = 0;

            while (pendingB != null && spinCount < 5)
            {
                spinCount++;
                Monitor.Wait(gate, 100);
            }

这将为我们提供等待时间的上限。当我们只有“B”消息流时,它确实解决了性能问题,并且运行时间与原始代码大致相同,同时始终在 100 条“AB”消息中找到 100 条。

于 2013-02-25T20:08:53.077 回答
1

似乎是Reactive Extesions的完美候选人。您可以使用Buffer方法对事件进行分组或其他类似的扩展来过滤和组合事件。

也许这个解决方案不符合您的限制之一,但在我看来这是最好的解决方案。反应式扩展非常强大。

于 2013-02-22T23:29:51.413 回答
0

Great problem. I really enjoyed spending some time on this. The solution that I used had 4 times the amount of matches that your original problem resulted with on my computer hardware.

Perhaps someone that is more knowledgeable than I am with Monitor and locks can improve this.

  1. Release another thread when a match is made instead of having that thread do a full sleep just to return null in the end. Perhaps this really is not that costly though. To solve this I introduced the AutoResetEvent, but for reasons that I do not understand, the AutoResetEvent is not acting as I intended and reduces the matches from 100 to 70.

  2. The final timeout of threads can be improved since once it times out it still needs to pass a contested lock.

It does fully fit the requirements:

  1. All processes will terminate within the specified period of time (The last lock may add a few cycles depending on how contested the lock is).
  2. Sending is outside of the locks.
  3. Synchronizes using gate
  4. No extra timers
  5. Preference and threads are treated equally

Original Questions Results:

  1. Time: 4.5 seconds
  2. A: 773
  3. B: 273
  4. AB: 27

This Classes Results:

  1. Time: 5.4 seconds
  2. A: 700
  3. B: 300
  4. AB: 100

    class MessageWrapper
    {
    object gate = new object();
    int EmptyThreadsToReleaseA = 0;
    int EmptyThreadsToReleaseB = 0;
    Queue<int> queueA = new Queue<int>();
    Queue<int> queueB = new Queue<int>();
    AutoResetEvent EmptyThreadEventA = new AutoResetEvent(false);
    AutoResetEvent EmptyThreadEventB = new AutoResetEvent(false);
    
    public Message WrapA(int a, int millisecondsTimeout)
    {
        lock (gate)
        {
            if (queueB.Count > 0)
            {
                Interlocked.Increment(ref EmptyThreadsToReleaseB);
                EmptyThreadEventB.Set();
                return new Message(a, queueB.Dequeue());
            }
            else
            {
                queueA.Enqueue(a);
            }
        }
    
        System.Threading.Thread.Sleep(millisecondsTimeout);
        //EmptyThreadEventA.WaitOne(millisecondsTimeout);
    
        lock (gate)
        {
            if (EmptyThreadsToReleaseA > 0)
            {
                Interlocked.Decrement(ref EmptyThreadsToReleaseA);
                return null;
            }
    
            return new Message(queueA.Dequeue(), null);
        }
    }
    
    public Message WrapB(int b, int millisecondsTimeout)
    {
        lock (gate)
        {
            if (queueA.Count > 0)
            {
                Interlocked.Increment(ref EmptyThreadsToReleaseA);
                EmptyThreadEventA.Set();
                return new Message(queueA.Dequeue(), b);
            }
            else
            {
                queueB.Enqueue(b);
            }
        }
    
        System.Threading.Thread.Sleep(millisecondsTimeout);
        //EmptyThreadEventB.WaitOne(millisecondsTimeout);
    
        lock (gate)
        {
            if (EmptyThreadsToReleaseB > 0)
            {
                Interlocked.Decrement(ref EmptyThreadsToReleaseB);
                return null;
            }
    
            return new Message(null, queueB.Dequeue());
        }
    }
    }
    
于 2013-02-24T01:08:38.417 回答
0

这是一种提高公平性的方法的草图——这意味着所有 B的发送都会受到长达 100 毫秒的延迟。但是我不知道它是否符合您的限制。

  • 在全局上下文中,有一个MessageSender类型的对象IMessageSender
  • 有两种实现IMessageSender,即DefaultMessageSender,和BWrappingMessageSender(存储一个b值)

消息发送者的行为如下:

  • DefaultMessageSender在被要求发送一个A: 只是发送它
  • DefaultMessageSender在被要求发送一个B: 将全局切换MessageSender为一个新的BWrappingMessageSender,它知道刚刚传递的值b

  • BWrappingMessageSender在被要求发送一个A: 发送一个带有传递的 ABa和它自己的 AB b,并将全局切换MessageSenderDefaultMessageSender

  • BWrappingMessageSender在被要求发送 a时B:发送B带有自己的a b,并将全局切换MessageSender为新的BWrappingMessageSender,它知道刚刚传递的值b

我没有确定的是一个新创建的BWrappingMessageSender知道在创建后发送一个简单的B100 毫秒的方式,如果它在那段时间没有被告知做任何其他事情。

于 2013-02-20T17:12:10.790 回答
0

经过一些实验,这是我的解决方案:

  • 如果单元素队列是空的,我们就占位。
  • 如果该位置已被占用,我们会礼貌地推动乘员继续前进,稍等片刻再试一次。
  • 如果有人粗鲁并在我们等待时劫持了现场,我们会插队继续前进。

代码:

Message WrapA(int a, int millisecondsTimeout)
{
    bool lockTaken = false;
    int? b = null;

    try
    {
        Monitor.TryEnter(gate, millisecondsTimeout, ref lockTaken);
        if (lockTaken)
        {
            if (pendingB != null)
            {
                b = pendingB;
                pendingB = null;
                Monitor.Pulse(gate);
            }
        }
    }
    finally
    {
        if (lockTaken)
        {
            Monitor.Exit(gate);
        }
    }

    return new Message(a, b);
}

Message WrapB(int b, int millisecondsTimeout)
{
    bool lockTaken = false;

    try
    {
        TimeoutHelper timeout = new TimeoutHelper(millisecondsTimeout);
        Monitor.TryEnter(gate, timeout.RemainingTime(), ref lockTaken);
        if (lockTaken)
        {
            if (pendingB == null)
            {
                pendingB = b;
                Monitor.Wait(gate, timeout.RemainingTime());
                if (pendingB == null) return null;
                pendingB = null;
            }
            else
            {
                Monitor.Pulse(gate);
                try { }
                finally { lockTaken = false; Monitor.Exit(gate); }
                Thread.Sleep(1);
                Monitor.TryEnter(gate, timeout.RemainingTime(), ref lockTaken);
                if (lockTaken)
                {
                    if (pendingB == null)
                    {
                        pendingB = b;
                        Monitor.Wait(gate, timeout.RemainingTime());
                        if (pendingB == null) return null;
                        pendingB = null;
                    }
                }
            }
        }
    }
    finally
    {
        if (lockTaken)
        {
            Monitor.Exit(gate);
        }
    }

    return new Message(null, b);
}
于 2013-02-20T19:52:04.440 回答
0

不确定它是否符合您的要求,但这是我的建议。它基本上尽可能将任何 B 消息传递给 A,并检查消息是否已发送:

class MessageWrapper
{
    object gate = new object();
    int? pendingB;

    public Message WrapA(int a, int millisecondsTimeout)
    {
        int? b;

        lock (gate)
        {
            b = pendingB;
            pendingB = null;
            Thread.Sleep(1); // yield. 1 seems the best value after some testing
        }

        return new Message(a, b);
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        int? bb = b;

        lock (gate)
        {
            if (pendingB == null)
            {
                pendingB = b;
                bb = null;
            }
        }

        Thread.Sleep(3);

        if (bb == null)
        {
            lock (gate)
            {
                if (pendingB != null)
                {
                    bb = pendingB;
                    pendingB = null;
                }
            }
        }
        return new Message(null, bb);
    }
}
于 2013-02-23T13:08:08.843 回答
0

好吧,我的第一个想法是拥有一个也处理优先级的信号量,但也许这篇文章会给你更多的洞察力.Net Mutex Question

基本上,想法是有一些方法来优先考虑这两种类型的事件,以便B如果没有接收到类型的事件,类型的事件可以尽可能快地运行A

我意识到这可能不是适合您的解决方案,因为您的第三个限制是除了 Gate 之外没有可用的同步机制,但我希望我可以为您指明正确的方向。

于 2013-02-20T15:44:23.287 回答
0

这是另一个尝试。该方法是等待事件的生成A附加到B事件,而不是等待B事件附加到A事件。

object gate = new object();
int? pendingA;

public Message WrapA(int a, int millisecondsTimeout)
{
    bool queued = false;

    lock (gate)
    {
        if (pendingA == null)
        {
            queued = true;
            pendingA = a;
            Monitor.Pulse(gate);
        }
    }

    if (queued)
    {
        Thread.Sleep(3);
        lock (gate)
        {
            if (pendingA == null)
                return null;

            a = pendingA.Value;
            pendingA = null;
        }
    }

    return new Message(a, null);
}

public Message WrapB(int b, int millisecondsTimeout)
{
    int? a;

    lock (gate)
    {
        if (pendingA == null)
            Monitor.Wait(gate, millisecondsTimeout);

        a = pendingA;
        pendingA = null;
    }

    return new Message(a, b);
}
于 2013-02-23T17:01:59.707 回答
0

我试图避免不必要的锁,尤其是对于 A 类型的事件。我还对包装类的逻辑进行了一些更改。我发现直接从这个类发送消息而不是只返回消息会更方便,这是因为在我的实现中,一次调用SendB可能会发送两条 B 消息。我在代码中添加了一些解释性注释

public class MessageWrapper
{
    private readonly object _gate = new object();
    private object _pendingB;

    public void SendA(int a, int millisecondsTimeout, Action<Message> send)
    {
        var b = Interlocked.Exchange<object>(ref _pendingB, null);

        send(new Message(a, (int?)b));

        // this code will just release any pending "assure that B was sent" threads.
        // but everything works fine even without it
        lock (_gate)
        {
            Monitor.PulseAll(_gate);
        }
    }

    public void SendB(int b, int millisecondsTimeout, Action<Message> send)
    {
        // needed for Interlocked to function properly and to be able to chack that exatly this b event was sent.
        var boxedB = (object)(int?)b;

        // excange currently pending B event with newly arrived one
        var enqueuedB = Interlocked.Exchange(ref _pendingB, boxedB);

        if (enqueuedB != null)
        {
            // if there was some pending B event then just send it.
            send(new Message(null, (int?)enqueuedB));
        }

        // now we have to wait up to millisecondsTimeout to ensure that our message B was sent
        lock (_gate)
        {
            // release any currently waiting threads.
            Monitor.PulseAll(_gate);

            if (Monitor.Wait(_gate, millisecondsTimeout))
            {
                // if we there pulsed, then we have nothing to do, as our event was already sent 
                return;
            }
        }

        // check whether our event is still pending 
        enqueuedB = Interlocked.CompareExchange(ref _pendingB, null, boxedB);

        if (ReferenceEquals(enqueuedB, boxedB))
        {
            // if so, then just send it.
            send(new Message(null, (int?)enqueuedB));
        }
    }
}

此外,我在您的测试类中进行了一些更改,这是我在评论中提到的一个原因 - 我在所有测试线程中添加了同步事件,以防我们测试 AB 子句。此外,我已将同时运行的线程数从您的版本中的 500 个减少到 20 个(全部用于 AB 子句)。所有这些线程中的仍然调用按线程数移动(在线程 Start 方法中作为参数传递),所以我希望 test 仍然非常相关。

public static class Program
{
    private static int _counter0 = 0;
    private static int _counterA = 0;
    private static int _counterB = 0;
    private static int _counterAb = 0;
    private static object _lastA;
    private static object _lastB;

    private static object _firstA;
    private static object _firstB;

    public static void Main(string[] args)
    {
        var wrapper = new MessageWrapper();
        var sw = Stopwatch.StartNew();

        var threadsCount = 10;
        var a0called = 40;

        // Only A events
        var t0 = Start(threadsCount, a0called, 7, 1000, wrapper.SendA);
        Join(t0);

        var aJointCalled = 40;
        var bJointCalled = 10;

        var syncEvent = new CountdownEvent(threadsCount + threadsCount);
        _firstA = null;
        _firstB = null;
        // A and B events
        var t1 = Start(threadsCount, aJointCalled, 7, 1000, wrapper.SendA, syncEvent);
        var t2 = Start(threadsCount, bJointCalled, 19, 1000, wrapper.SendB, syncEvent);
        Join(t1);
        Join(t2);
        var lastA = _lastA;
        var lastB = _lastB;

        var b0called = 20;

        // Only B events
        var t3 = Start(threadsCount, b0called, 7, 1000, wrapper.SendB);
        Join(t3);

        Console.WriteLine(sw.Elapsed);

        Console.WriteLine("0:  {0}", _counter0);
        Console.WriteLine("A:  {0}", _counterA);
        Console.WriteLine("B:  {0}", _counterB);
        Console.WriteLine("AB: {0}", _counterAb);

        Console.WriteLine(
            "Generated A: {0}, Sent A: {1}",
            (threadsCount * a0called) + (threadsCount * aJointCalled),
            _counterA + _counterAb);
        Console.WriteLine(
            "Generated B: {0}, Sent B: {1}",
            (threadsCount * bJointCalled) + (threadsCount * b0called),
            _counterB + _counterAb);

        Console.WriteLine("First A was sent on {0: MM:hh:ss ffff}", _firstA);
        Console.WriteLine("Last A was sent on {0: MM:hh:ss ffff}", lastA);
        Console.WriteLine("First B was sent on {0: MM:hh:ss ffff}", _firstB);
        Console.WriteLine("Last B was sent on {0: MM:hh:ss ffff}", lastB);

        Console.ReadLine();
    }

    private static void SendMessage(Message m)
    {
        if (m != null)
        {
            if (m.A != null)
            {
                if (m.B != null)
                {
                    Interlocked.Increment(ref _counterAb);
                }
                else
                {
                    Interlocked.Increment(ref _counterA);
                    Interlocked.Exchange(ref _lastA, DateTime.Now);
                    Interlocked.CompareExchange(ref _firstA, DateTime.Now, null);
                }
            }
            else if (m.B != null)
            {
                Interlocked.Increment(ref _counterB);
                Interlocked.Exchange(ref _lastB, DateTime.Now);
                Interlocked.CompareExchange(ref _firstB, DateTime.Now, null);
            }
            else
            {
                Interlocked.Increment(ref _counter0);
            }
        }
    }

    private static Thread[] Start(
        int threadCount, 
        int eventCount, 
        int eventInterval, 
        int wrapTimeout, 
        Action<int, int, Action<Message>> wrap,
        CountdownEvent syncEvent = null)
    {
        var threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++)
        {
            threads[i] = new Thread(
                (p) =>
                    {
                        if (syncEvent != null)
                        {
                            syncEvent.Signal();
                            syncEvent.Wait();
                        }

                        Thread.Sleep((int)p);

                        for (int j = 0; j < eventCount; j++)
                        {
                            int k = (((int)p) * 1000) + j;
                            Thread.Sleep(eventInterval);
                            wrap(k, wrapTimeout, SendMessage);
                        }
                    });

            threads[i].Start(i);
        }

        return threads;
    }

    private static void Join(params Thread[] threads)
    {
        foreach (Thread t in threads)
        {
            t.Join();
        }
    }
}

PS 此外,感谢您提出非常有趣的问题。

于 2013-02-26T14:21:10.343 回答
0

对此的限制因素实际上是约束,特别是仅gate用于同步的要求以及无法生成任何其他计时器/线程/任务等。这最终将编程解决方案与使用Monitor对象联系在一起。例如,Christoffer 的解决方案虽然优雅,但在技术上使用同步,而不是gate封装在BlockingCollection. afrischke 之前列出的另一个非常创新的解决方案也使用了除gate.

经过大量的实验、阅读和研究,我不得不说我认为这个问题没有更好(更快?)的解决方案可以完全满足约束条件。我设法使用以下机制获得了边际性能增益。它并不漂亮,但它符合要求,至少在我的机器上平均快 1-5%;

object gate = new object();
ConcurrentDictionary<Guid, int> _bBag = new ConcurrentDictionary<Guid, int>();

public Message WrapA(int a, int millisecondsTimeout)
{
    Message message = null;
    int? b = null;
    lock (gate)
    {
        if (!_bBag.IsEmpty)
        {
            Guid key = _bBag.Keys.FirstOrDefault();
            int gotB = 0;
            if (_bBag.TryRemove(key, out gotB))
            {
                b = gotB;
                Monitor.PulseAll(gate);
            }
        }
    }

    message = new Message(a, b);
    return message;
}

public Message WrapB(int b, int millisecondsTimeout)
{
    Guid key = Guid.NewGuid();
    _bBag.TryAdd(key, b);
    lock (gate) { Monitor.Wait(gate, millisecondsTimeout); }
    int storedB = 0;
    if (_bBag.TryRemove(key, out storedB))
    {
        return new Message(null, b);
    }
    return null;    
}

放宽gate要求又会提高一点速度,尤其是在不运行在调试模式下时;

object gate = new object();
ManualResetEvent mre = new ManualResetEvent(false /*initialState*/);
ConcurrentDictionary<Guid, int> _bBag = new ConcurrentDictionary<Guid, int>();

public Message WrapA(int a, int millisecondsTimeout)
{
    Message message = null;
    int? b = null;
    lock (gate)
    {
        if (!_bBag.IsEmpty)
        {
            Guid key = _bBag.Keys.FirstOrDefault();
            int gotB = 0;
            if (_bBag.TryRemove(key, out gotB))
            {
                b = gotB;
                Monitor.PulseAll(gate);
            }
        }
    }

    message = new Message(a, b);
    return message;
}

public Message WrapB(int b, int millisecondsTimeout)
{
    Guid key = Guid.NewGuid();
    _bBag.TryAdd(key, b);
    mre.WaitOne(millisecondsTimeout);    // use a manual reset instead of Monitor
    int storedB = 0;
    if (_bBag.TryRemove(key, out storedB))
    {
        return new Message(null, b);
    }
    return null;
}

总而言之,考虑到严格的要求,我想说您已经有了一个非常精细的解决方案。我实际上希望我错了,有人找到更好的解决方案 - 这将非常有用!

于 2013-02-27T02:27:59.247 回答
0

经过三个小时的尝试,我设法得到了以下结果:

00:00:01.8577304
0:0
答:741
乙:241
乙:59
生成 A:800,发送 A:800
生成 B:300,发送 B:300
总计:1100

我的方法:

(1)每当有消息 B(现在称为 B)并且还没有 B 等待时,它会将其放入“队列”。如果在给定的超时时间内没有其他数据包,它将发送消息。(2)当队列中确实有一个 B 时,它会撞掉队列中的第一个 B 并发送此消息。这是为了确保公平。正在发送的新 B 将遵循与情况 1 相同的情况(它将被排队,并在给定的时间内发送)。(3)当有消息A(现在称为A),并且没有待处理的B时,A会立即发送。不执行实际等待。(4)当发送 A 并且队列中有一个 B 时,它将从队列中“窃取”它。两条消息都被包装,并一起发送。因为 B 在另一个线程上等待发送,而 A 偷了它,我们需要一个空检查。A 将通知 B,但 B 注意到它没有要发送的内容。B 将返回空值。

要在代码中完成此操作:

public class MessageWrapper
{
    readonly object _gate = new object();
    int? _pendingB;

    public Message WrapA(int a, int millisecondsTimeout)
    {
        int? currentB;

        lock (_gate)
        {
            currentB = _pendingB;
            _pendingB = null;

            Monitor.Pulse(_gate); // B stolen, get rid of waiting threads
        }

        return new Message(a, currentB);
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        lock (_gate)
        {
            if (_pendingB != null)
            {
                var currentB = _pendingB;
                _pendingB = b;

                Monitor.Pulse(_gate); // release for fairness
                Monitor.Wait(_gate, millisecondsTimeout); // wait for fairness

                return new Message(null, currentB);
            }
            else
            {
                _pendingB = b;

                Monitor.Pulse(_gate); // release for fairness
                Monitor.Wait(_gate, millisecondsTimeout); // wait for A

                if (_pendingB == null) return null;

                var currentB = _pendingB;
                _pendingB = null;
                return new Message(null, currentB);
            }
        }
    }
}
于 2013-02-23T20:37:48.933 回答