在启用了超线程的 Windows7 四核上,我有一百个线程通过BlockingCollection<T>
(全部使用默认构造函数初始化,因此在ConcurrentQueue<T>
内部使用)进行通信。
所有线程每秒都会收到 10-100 条消息,除了每天只收到 4-20 条消息的线程。
我的问题与最后一个消费者有关:大多数时候,它被阻止等待新消息,但是当一条消息准备好被消费时,它应该尽快处理,可能是立即处理。
问题是,当一条消息被添加到BlockingCollection
该消费者的专用消息时,它会在几秒钟后被接收(Take() 在消息入队后的 3 到 12 秒内返回)。
我猜这个问题与windows如何安排这个线程有关。
我试图在没有任何改进的情况下增加这个消费者的 ThreadPriority。然后我尝试将其处理器亲和性设置为专用内核,并且我已更改所有其他线程的亲和性以使用其他内核,但仍然没有任何改进。
我该如何解决这个问题?实际问题是什么?
这是线程循环(commands
是BlockingCollection
):
while (!commands.IsCompleted)
{
_log.Debug("Waiting for command.");
command = commands.Take();
_log.DebugFormat("Received: {0}", command);
command.ApplyTo(target);
_log.InfoFormat("Dispatched: {0}", command);
}
当没有消息待处理时,我是否应该直接使用ConcurrentQueue
, 睡眠 50 毫秒(这是可以接受的延迟)?
请注意,
没有 CPU 使用超过 50%。
至于延迟:日志显示(大多数情况下)在“已调度:...”和“等待命令”之间经过了几秒钟。(又名 on commands.IsCompleted
)和“等待命令”之间的其他一些。和“收到:...”(又名 on commands.Take()
)
所有“正常”的线程都是 I/O 绑定的,但它们是合作的:相信我,我无法改变它们的设计。但是它们工作得很好:唯一的错误行为是执行不同类型工作的低负载线程。我知道线程优先级是邪恶的,但我没有找到更好的解决方案。
(几乎)复制问题
的测试 这是在服务器上几乎复制问题的测试。“几乎”是因为测试对 CPU 施加压力(全部达到 100%),而在实际应用中所有 CPU 都低于 50%:
public class ThreadTest
{
class TimedInt
{
public int Value;
public DateTime Time;
}
[Test]
public void BlockingCollection_consumedWithLowLoad_delaySomeTimes()
{
// arrange:
int toComplete = 0;
BlockingCollection<KeyValuePair<DateTime, TimedInt>> results = new BlockingCollection<KeyValuePair<DateTime, TimedInt>>();
BlockingCollection<TimedInt> queue = new BlockingCollection<TimedInt>();
Action<int> producer = a =>
{
int i = 1;
int x = Convert.ToInt32(Math.Pow(a, 7));
while (i < 200000000)
{
if (i % x == 0)
{
queue.Add(new TimedInt { Time = DateTime.Now, Value = i });
Thread.SpinWait(100); // just to simulate a bit of actual work here
queue.Add(new TimedInt { Time = DateTime.Now, Value = i + 1 });
}
i++;
}
Interlocked.Decrement(ref toComplete);
};
Action consumer = () =>
{
Thread.CurrentThread.Priority = ThreadPriority.Highest; // Note that the consumer has an higher priority
Thread.CurrentThread.Name = "Consumer";
while (toComplete > 0)
{
TimedInt v;
if (queue.TryTake(out v, 1000))
{
DateTime now = DateTime.Now;
results.Add(new KeyValuePair<DateTime, TimedInt>(now, v));
}
}
};
// act:
List<Thread> threads = new List<Thread>();
threads.Add(new Thread(new ThreadStart(consumer)));
for (int i = 0; i < 200; i++)
{
var t = new Thread(new ThreadStart(() => producer(7 + (i % 3))));
t.Name = "Producer " + i.ToString();
threads.Add(t);
toComplete++;
}
threads.ForEach(t => t.Start());
threads.ForEach(t => t.Join());
// assert:
Assert.AreEqual(0, results.Where(kvp => (kvp.Key - kvp.Value.Time).TotalMilliseconds > 1000).Count());
}
}
有任何想法吗?