1

在我花太长时间重新发明轮子之前,我想检查一下 .Net 中是否已经有一个类可以满足我的需求。

我想要的是有点像信号量(甚至可能像 CountdownEvent),但略有不同。

我有一个要求,我有不同数量的可用“资源”,并且我希望线程在可用资源为零时有效地等待。同时,另一个线程可以释放资源,这应该立即释放另一个等待线程。

这听起来很像信号量,但这并不是因为信号量(据我所知)在计算每个线程时将它们视为“资源”。

无论如何,这是我想要的第一个简单实现。它还没有处置、代码合同、错误处理、超时支持或取消支持,但它应该展示我想要的:

public sealed class ResourceCounter
{
    /// <summary>Create with the specified number of resources initially available.</summary>

    public ResourceCounter(int resourceCount)
    {
        _resourceCount = resourceCount;

        if (_resourceCount > 0)
        {
            _resourceAvailable.Set();
        }
    }

    /// <summary>Acquires a resource. Waits forever if necessary.</summary>

    public void Acquire()
    {
        while (true)
        {
            _resourceAvailable.Wait();

            lock (_lock)
            {
                if (_resourceCount > 0)
                {
                    if (--_resourceCount == 0)
                    {
                        _resourceAvailable.Reset();
                    }

                    return;
                }
            }
        }
    }

    /// <summary>Releases a resource.</summary>

    public void Release()
    {
        lock (_lock)
        {
            ++_resourceCount;
            _resourceAvailable.Set();
        }
    }

    private int _resourceCount;
    private readonly object _lock = new object(); 
    private readonly ManualResetEventSlim _resourceAvailable = new ManualResetEventSlim();
}

使用模式非常简单:

  1. 使用所需的初始资源计数(可以为零或更多)构造一个 ResourceCounter。

  2. 想要获取资源的线程调用 ResourceCounter.Acquire(),在资源可用且已获取之前不会返回。

  3. 想要释放资源的线程调用 ResourceCounter.Release(),它将释放资源并立即返回。

请注意,任何线程都可以释放资源;它不一定是获得资源的人。

我将它用作一些多线程管道代码的一部分,其中一个线程负责将工作项排队,几个线程正在处理工作项,另一个线程正在输出处理后的工作项。输出已处理工作项的线程必须对它们进行多路复用(因为处理线程可以以任何顺序输出已完成的项),并且我需要一种机制来阻止工作项在多路复用器等待延迟项时无休止地排队。

(有关这方面的一些背景信息,请参阅管道、多路复用和无界缓冲。)

无论如何,是否有任何可用的方法来做到这一点,还是我应该继续为此开发自己的课程?


[编辑]

如下所述,SemaphoreSlim 完全正确。我拒绝了它,因为我认为调用 Wait() 的线程必须是调用 Release() 的线程,但事实并非如此。这就是我在星期天编码所得到的……;)

4

2 回答 2

3

使用队列进行通信更容易构建多级管道架构。生产者线程将项目放入工作队列,一个或多个工作线程出列并处理项目,并将它们添加到输出队列。最后一个线程读取输出队列并输出数据。

在 .NET 中,这很容易通过BlockingCollection完成。

有关两阶段管道的示例,请参阅https://stackoverflow.com/a/5108487/56778 。添加另一个阶段很简单。

为了处理输出线程乱序的问题,我使用最小堆将输出队列设置为优先级队列。我的项目由一个连续的记录号标识,因此输出线程知道接下来要输出哪个记录​​号。它将等待AutoResetEvent一个项目被放置在队列中(工作进程将在项目入队时设置事件)。然后,输出线程将查看顶部项目以查看它是否与预期项目匹配。如果没有,它将再次等待事件。

这非常有效,因为它消除了第二个队列。该块在它所属的输出队列中。性能非常适合我的目的。将项目入队是一个 O(log n) 操作,但实际上n非常小。即使队列中有 100,000 个项目,与处理记录所需的时间相比,将项目排入队列所需的时间也微不足道。

您仍然可以使用BlockingCollection它。你只需要让一个二进制堆类实现这个IProducerConsumerCollection接口。我通过向我在A Generic BinaryHeap 类中发布的简单二进制堆类添加锁来做到这一点。然后,您可以将其中之一提供给BlockingCollection构造函数,如下所示:

BlockingCollection<MyRecord> = 
    new BlockingCollection<ConcurrentBinaryHeap<MyRecord>>(
    new ConcurrentBinaryHeap<MyRecord>, MaxQueueSize);

不过,这里有一个潜在的僵局。如果队列已满(即超过了您在初始化 时设置的最大值BlockingCollection),那么迟到的线程无法将项目入队并且所有工作都将完全停止。这在实践中从未发生在我身上,因为尽管我每条记录的处理时间各不相同,但它们并没有太大的变化。

如果这是一个问题,您可以增加队列大小(仅当您可以肯定地说您永远不会填满队列时才有效),或者如果队列已满,则为下一个预期要发布的项目提供备用渠道. 我完成了这项工作,但为了我的目的,增加队列大小更容易。

如果您有兴趣,我可以翻阅我的档案以找到该ConcurrentBinaryHeap课程。

于 2013-02-24T15:54:40.243 回答
1

线程相互通信的方式与资源或其锁定机制无关。没有什么能阻止您在同一个进程中使用临时消息系统(消息队列、事件或任何适合您需要的东西)传递信号量和资源。

于 2013-02-24T15:58:01.230 回答