8

我正在玩一个简单的控制台应用程序,它创建一个线程,并在主线程和工作线程之间进行一些线程间通信。

我正在将对象从主线程发布到并发队列,并且工作线程正在将其出列并进行一些处理。

让我感到奇怪的是,当我分析这个应用程序时,即使我有两个内核。一个核心是 100% 空闲的,而另一个核心已经完成了所有工作,我看到两个线程都在该核心中运行。 在此处输入图像描述

为什么是这样?

是因为我使用了一个等待句柄,它在我发布消息时设置并在处理完成时释放?

这是我的示例代码,现在使用 2 个工作线程。它的行为仍然相同,main、worker1 和 worker2 在同一个内核中运行。想法?

[编辑] 它现在有点用,至少,与昨天相比,我的性能提高了两倍。诀窍是减慢消费者的速度,以避免使用 AutoResetEvent 发出信号。

public class SingleThreadDispatcher
{
    public long Count;
    private readonly ConcurrentQueue<Action> _queue = new ConcurrentQueue<Action>();
    private volatile bool _hasMoreTasks;
    private volatile bool _running = true;
    private int _status;
    private readonly AutoResetEvent _signal = new AutoResetEvent(false);
    public SingleThreadDispatcher()
    {
        var thread = new Thread(Run)
        {
            IsBackground = true,
            Name = "worker" + Guid.NewGuid(),           
        };

        thread.Start();
    }

    private void Run()
    {
        while (_running)
        {

            _signal.WaitOne();
            do
            {
                _hasMoreTasks = false;

                Action task;
                while (_queue.TryDequeue(out task) && _running)
                {
                    Count ++;
                    task();
                }
                //wait a short while to let _hasMoreTasks to maybe be set to true
                //this avoids the roundtrip to the AutoResetEvent
                //that is, if there is intense pressure on the pool, we let some new
                //tasks have the chance to arrive and be processed w/o signaling
                if(!_hasMoreTasks)
                    Thread.Sleep(5);

                Interlocked.Exchange(ref _status, 0);
            } while (_hasMoreTasks);
        }
    }

    public void Schedule(Action task)
    {
        _hasMoreTasks = true;
        _queue.Enqueue(task);

        SetSignal();
    }

    private void SetSignal()
    {
        if (Interlocked.Exchange(ref _status, 1) == 0)
        {
            _signal.Set();
        }
    }
}
4

4 回答 4

7

是因为我使用了一个等待句柄,它在我发布消息时设置并在处理完成时释放?

如果没有看到您的代码,很难确定,但从您的描述来看,您编写的两个线程似乎充当协同程序:当主线程运行时,工作线程无事可做,反之亦然。看起来 .NET 调度程序足够聪明,不会在发生这种情况时加载第二个内核。

您可以通过多种方式更改此行为 - 例如

  • 通过在等待句柄之前在主线程上做一些工作,或者
  • 通过添加更多的工作线程来竞争你的主线程发布的任务,并且都可以获得一个任务来处理。
于 2014-01-13T15:40:58.770 回答
2

好的,我已经弄清楚问题所在了。在这种情况下,生产者和消费者几乎一样快。这导致消费者快速完成所有工作,然后循环返回等待 AutoResetEvent。下次生产者发送任务时,它必须触摸 AutoresetEvent 并设置它。

解决方案是在消费者中添加一个非常非常小的延迟,使其比生产者稍慢。这导致当生产者发送一个任务时,它会注意到消费者已经处于活动状态,它只需要发布到工作队列而不接触 AutoResetEvent。

最初的行为导致了一种乒乓效应,可以在屏幕截图中看到。

于 2014-01-14T10:35:03.587 回答
1

Dasblinkelight(可能)有正确的答案。

除此之外,当您的一个线程受 I/O 限制(也就是说,它没有卡在 CPU 上)时,这也是正确的行为 - 在这种情况下,您无法从使用多个内核中获得任何好处,并且.NET 足够聪明,只需在一个内核上更改上下文。

UI 线程通常就是这种情况——它要做的工作很少,所以通常没有太多理由让它自己占据整个内核。是的,如果您的并发队列没有正确使用,它可能只是意味着主线程等待工作线程 - 同样,在这种情况下,不需要切换内核,因为原始线程无论如何都在等待。

于 2014-01-13T15:45:21.420 回答
1

您应该使用BlockingCollection而不是ConcurrentQueue. 默认情况下,在引擎盖下BlockingCollection使用 a ConcurrentQueue,但它具有更易于使用的界面。特别是,它会进行非忙等待。另外,BlockingCollection支持取消,让你的消费变得非常简单。这是一个例子:

public class SingleThreadDispatcher
{
    public long Count;
    private readonly BlockingCollection<Action> _queue = new BlockingCollection<Action>();
    private readonly CancellationTokenSource _cancellation = new CancellationTokenSource();

    public SingleThreadDispatcher()
    {
        var thread = new Thread(Run)
        {
            IsBackground = true,
            Name = "worker" + Guid.NewGuid(),
        };

        thread.Start();
    }

    private void Run()
    {
        foreach (var task in _queue.GetConsumingEnumerable(_cancellation.Token))
        {
            Count++;
            task();
        }
    }

    public void Schedule(Action task)
    {
        _queue.Add(task);
    }
}

循环GetConsumingEnumerable将在队列上进行非忙等待。没有必要用一个单独的事件来做。它将等待将项目添加到队列中,或者如果您设置了取消令牌,它将退出。

要正常停止它,您只需调用_queue.CompleteAdding(). 这告诉消费者不会有更多的项目被添加到队列中。消费者将清空队列然后退出。

如果你想早点退出,那就打电话吧_cancellation.Cancel()。这将导致GetConsumingEnumerable退出。

通常,您不必ConcurrentQueue直接使用。BlockingCollection更易于使用并提供同等性能。

于 2014-01-14T17:33:26.020 回答