我有一堆线程会生成 typeA
和 type的事件B
。
我的程序接收这些事件,将它们包装在一条消息中并通过网络发送它们。消息可以包含一个A
事件、一个B
事件或一个A
事件和一个B
事件:
SendMessage(new Message(a: 1, b: null));
SendMessage(new Message(a: null, b: 2 ));
SendMessage(new Message(a: 3, b: 4 ));
类型A
事件发生得相当频繁,而类型事件发生的频率B
要低得多。因此,当一个线程生成一个B
事件时,我的程序会稍等片刻,看看另一个线程是否生成了一个事件,并在可能的情况下将事件和A
事件结合起来。A
B
这是我的代码:
object gate = new object();
int? pendingB;
Message WrapA(int a, int millisecondsTimeout)
{
int? b;
lock (gate)
{
b = pendingB;
pendingB = null;
Monitor.Pulse(gate);
}
return new Message(a, b);
}
Message WrapB(int b, int millisecondsTimeout)
{
lock (gate)
{
if (pendingB == null)
{
pendingB = b;
Monitor.Wait(gate, millisecondsTimeout);
if (pendingB != b) return null;
pendingB = null;
}
}
return new Message(null, b);
}
到目前为止,这有效。但是,有两个问题:
如果有很多
A
事件和很多事件,该算法不是很有效:即使有足够的事件,也B
只有一定百分比的B
事件附加到事件。A
A
如果有
A
一段时间没有事件生成(不常见,但并非不可能),该算法是完全不公平的:一个线程生成B
事件每次都必须等待,而所有其他线程可以立即发送它们的B
事件。
如何提高算法的效率和公平性?
约束:
• WrapA
并且WrapB
必须在很短的、确定的时间内终止。
• SendMessage
必须在任何锁之外调用。
• 除了 之外,没有可用的同步机制gate
。
• 没有额外的线程、任务、计时器等可用。
• 由于A
在正常情况下此类事件发生得如此频繁,因此忙于等待WrapB
是可以的。
这是一个可以用作基准测试的测试程序:
public static class Program
{
static int counter0 = 0;
static int counterA = 0;
static int counterB = 0;
static int counterAB = 0;
static void SendMessage(Message m)
{
if (m != null)
if (m.a != null)
if (m.b != null)
Interlocked.Increment(ref counterAB);
else
Interlocked.Increment(ref counterA);
else
if (m.b != null)
Interlocked.Increment(ref counterB);
else
Interlocked.Increment(ref counter0);
}
static Thread[] Start(int threadCount, int eventCount,
int eventInterval, int wrapTimeout, Func<int, int, Message> wrap)
{
Thread[] threads = new Thread[threadCount * eventCount];
for (int i = 0; i < threadCount; i++)
{
for (int j = 0; j < eventCount; j++)
{
int k = i * 1000 + j;
int l = j * eventInterval + i;
threads[i * eventCount + j] = new Thread(() =>
{
Thread.Sleep(l);
SendMessage(wrap(k, wrapTimeout));
});
threads[i * eventCount + j].Start();
}
}
return threads;
}
static void Join(params Thread[] threads)
{
for (int i = 0; i < threads.Length; i++)
{
threads[i].Join();
}
}
public static void Main(string[] args)
{
var wrapper = new MessageWrapper();
var sw = Stopwatch.StartNew();
// Only A events
var t0 = Start(10, 40, 7, 1000, wrapper.WrapA);
Join(t0);
// A and B events
var t1 = Start(10, 40, 7, 1000, wrapper.WrapA);
var t2 = Start(10, 10, 19, 1000, wrapper.WrapB);
Join(t1);
Join(t2);
// Only B events
var t3 = Start(10, 20, 7, 1000, wrapper.WrapB);
Join(t3);
Console.WriteLine(sw.Elapsed);
Console.WriteLine("0: {0}", counter0);
Console.WriteLine("A: {0}", counterA);
Console.WriteLine("B: {0}", counterB);
Console.WriteLine("AB: {0}", counterAB);
Console.WriteLine("Generated A: {0}, Sent A: {1}",
10 * 40 + 10 * 40, counterA + counterAB);
Console.WriteLine("Generated B: {0}, Sent B: {1}",
10 * 10 + 10 * 20, counterB + counterAB);
}
}