3

在启用了超线程的 Windows7 四核上,我有一百个线程通过BlockingCollection<T>(全部使用默认构造函数初始化,因此在ConcurrentQueue<T>内部使用)进行通信。

所有线程每秒都会收到 10-100 条消息,除了每天只收到 4-20 条消息的线程

我的问题与最后一个消费者有关:大多数时候,它被阻止等待新消息,但是当一条消息准备好被消费时,它应该尽快处理,可能是立即处理。

问题是,当一条消息被添加到BlockingCollection该消费者的专用消息时,它会在几秒钟后被接收(Take() 在消息入队后的 3 到 12 秒内返回)。

我猜这个问题与windows如何安排这个线程有关。

我试图在没有任何改进的情况下增加这个消费者的 ThreadPriority。然后我尝试将其处理器亲和性设置为专用内核,并且我已更改所有其他线程的亲和性以使用其他内核,但仍然没有任何改进。

我该如何解决这个问题?实际问题是什么?

这是线程循环(commandsBlockingCollection):

while (!commands.IsCompleted)
{
    _log.Debug("Waiting for command.");

    command = commands.Take();

    _log.DebugFormat("Received: {0}", command);

    command.ApplyTo(target);

    _log.InfoFormat("Dispatched: {0}", command);
}

当没有消息待处理时,我是否应该直接使用ConcurrentQueue, 睡眠 50 毫秒(这是可以接受的延迟)?

请注意,
没有 CPU 使用超过 50%。

至于延迟:日志显示(大多数情况下)在“已调度:...”和“等待命令”之间经过了几秒钟。(又名 on commands.IsCompleted)和“等待命令”之间的其他一些。和“收到:...”(又名 on commands.Take()

所有“正常”的线程都是 I/O 绑定的,但它们是合作的:相信我,我无法改变它们的设计。但是它们工作得很好:唯一的错误行为是执行不同类型工作的低负载线程。我知道线程优先级是邪恶的,但我没有找到更好的解决方案。

(几乎)复制问题
的测试 这是在服务器上几乎复制问题的测试。“几乎”是因为测试对 CPU 施加压力(全部达到 100%),而在实际应用中所有 CPU 都低于 50%:

public class ThreadTest
{
    class TimedInt
    {
        public int Value;

        public DateTime Time;
    }
    [Test]
    public void BlockingCollection_consumedWithLowLoad_delaySomeTimes()
    {
        // arrange:
        int toComplete = 0;
        BlockingCollection<KeyValuePair<DateTime, TimedInt>> results = new BlockingCollection<KeyValuePair<DateTime, TimedInt>>();
        BlockingCollection<TimedInt> queue = new BlockingCollection<TimedInt>();
        Action<int> producer = a =>
        {
            int i = 1;
            int x = Convert.ToInt32(Math.Pow(a, 7));
            while (i < 200000000)
            {
                if (i % x == 0)
                {
                    queue.Add(new TimedInt { Time = DateTime.Now, Value = i });
                    Thread.SpinWait(100); // just to simulate a bit of actual work here
                    queue.Add(new TimedInt { Time = DateTime.Now, Value = i + 1 });
                }
                i++;
            }
            Interlocked.Decrement(ref toComplete);
        };
        Action consumer = () =>
        {
            Thread.CurrentThread.Priority = ThreadPriority.Highest; // Note that the consumer has an higher priority
            Thread.CurrentThread.Name = "Consumer";
            while (toComplete > 0)
            {
                TimedInt v;
                if (queue.TryTake(out v, 1000))
                {
                    DateTime now = DateTime.Now;
                    results.Add(new KeyValuePair<DateTime, TimedInt>(now, v));
                }
            }
        };

        // act:
        List<Thread> threads = new List<Thread>();
        threads.Add(new Thread(new ThreadStart(consumer)));
        for (int i = 0; i < 200; i++)
        {
            var t = new Thread(new ThreadStart(() => producer(7 + (i % 3))));
            t.Name = "Producer " + i.ToString();
            threads.Add(t);
            toComplete++;
        }
        threads.ForEach(t => t.Start());
        threads.ForEach(t => t.Join());

        // assert:
        Assert.AreEqual(0, results.Where(kvp => (kvp.Key - kvp.Value.Time).TotalMilliseconds > 1000).Count());
    }
}

有任何想法吗?

4

0 回答 0