0

在四核系统中运行此程序将消耗 25% 的 CPU 功率。所以基本上有些东西正在全速运行。我将其缩小到消费者,但是负载不会在按下“x”时停止,这应该终止我的消费者。

我的代码

internal class TestBlockingCollectionConsumerProducer2
{
    private int _itemCount;

    internal void Run()
    {
        BlockingCollection<string> blockingCollection = new BlockingCollection<string>();

        // The token source for issuing the cancelation request.
        CancellationTokenSource cts = new CancellationTokenSource();

        // Simple thread waiting for a Console 'x'
        Task.Factory.StartNew(() =>
        {
            if (Console.ReadKey().KeyChar == 'x')
            {
                cts.Cancel();
            }
        });

        // start producer
        Task.Factory.StartNew(() => Produce(blockingCollection, cts.Token));

        // start multiple consumers
        const int THREAD_COUNT = 5;
        for (int i = 0; i < THREAD_COUNT; i++)
        {
            Task.Factory.StartNew(() => Consume(blockingCollection, cts.Token));
        }

        while (true);
    }

    private void Produce(BlockingCollection<string> blockingCollection, CancellationToken cancellationToken)
    {
        while (true)
        {
            for (int i = 0; i < 10; i++)
            {
                blockingCollection.Add(string.Format("Item {0}", _itemCount++), cancellationToken);
            }

            Console.WriteLine("Added 10 items. Current queue length:" + blockingCollection.Count);
            Thread.Sleep(10000);
        }
    }

    private void Consume(BlockingCollection<string> blockingCollection, CancellationToken cancellationToken)
    {
        try
        {
            foreach (string item in blockingCollection.GetConsumingEnumerable(cancellationToken))
            {
                Console.WriteLine(string.Format("[{0}] Consumer: Consuming: {1}", Thread.CurrentThread.ManagedThreadId, item));
                Thread.Sleep(2500);
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("[{0}] Consumer: Operation has been canceled.", Thread.CurrentThread.ManagedThreadId);
        }
    }
}

我的问题是:
1 、为什么CPU负载这么高?GetConsumingEnumerable() 不应该阻塞,因此根本不使用 CPU 时间吗?
2.为什么在 cts.Cancel() 上没有停止?

4

1 回答 1

5

问题不在于BlockingCollection.

它是无限循环while (true);。这是在Run方法中做什么?那就是烧你的CPU。

我看到Produce方法不尊重CancellationToken. 而不是无限循环,您应该使用while (!cancellationToken.IsCancellationRequested).

另外,因为cts.Cancel它确实取消了操作。如果由于某种原因这不起作用,请提供一个小而完整的程序来重现问题。

于 2015-04-23T09:43:56.820 回答