2

在 Java 中,您可以将多个Condition对象关联到一个ReentrantLock. C# 等价物是什么?

真实示例:JavaCondition文档中的示例实现使用了两个Condition对象,notFullnotEmpty,绑定到同一个锁。该示例如何转换为 C#?

背景:我经常发现 Java 代码使用两个Condition对象来表示各种状态,关联到同一个Lock;在 C# 中,您似乎可以

  • 调用Monitor.Enter一个对象,然后调用Monitor.WaitOne/ Monitor.Pulse,但这只是一个条件。
  • 使用多个Auto/ManualResetEvent对象,但这些对象在等待后不能原子地重新获取给定的锁。

注意:我可以想到一种方法:在单个对象上使用Monitor.WaitOne/ Monitor.PulseAll,并在唤醒后检查条件;这也是您在 Java 中所做的,以防止虚假唤醒。但是,它并没有真正起作用,因为它会强制您调用PulseAll而不是调用Pulse,因为Pulse可能会唤醒等待另一个条件的线程。不幸的是,使用PulseAll代替Pulse会影响性能(线程竞争相同的锁)。

4

3 回答 3

1

我认为,如果您正在进行新的开发并且可以使用 .NET 4 或更高版本,那么新的并发集合类(例如ConcurrentQueue )将为您提供更好的服务。

但是,如果您不能采取行动,并且严格回答您的问题,在 .NET 中,恕我直言,这有点简化,要实现 prod/cons 模式,您只需等待然后像下面那样脉冲(请注意,我在记事本)

// max is 1000 items in queue
private int _count = 1000;

private Queue<string> _myQueue = new Queue<string>();

private static object _door = new object();

public void AddItem(string someItem)
{
    lock (_door)
    {
        while (_myQueue.Count == _count)
        {
            // reached max item, let's wait 'till there is room
            Monitor.Wait(_door);
        }

        _myQueue.Enqueue(someItem);
        // signal so if there are therads waiting for items to be inserted are waken up
        // one at a time, so they don't try to dequeue items that are not there
        Monitor.Pulse(_door);
    }
}

public string RemoveItem()
{
    string item = null;

    lock (_door)
    {
        while (_myQueue.Count == 0)
        {
            // no items in queue, wait 'till there are items
            Monitor.Wait(_door);
        }

        item = _myQueue.Dequeue();
        // signal we've taken something out
        // so if there are threads waiting, will be waken up one at a time so we don't overfill our queue
        Monitor.Pulse(_door);
    }

    return item;
}

更新: 为了消除任何混乱,请注意Monitor.Wait释放锁,因此您不会遇到死锁

于 2013-10-23T17:57:14.667 回答
0

@Jason如果队列已满并且您只唤醒一个线程,则不能保证该线程是消费者。它可能是一个生产者,你被卡住了。

于 2013-10-23T18:38:27.780 回答
0

我没有遇到太多想要在锁中共享状态的 C# 代码。不用自己动手,你可以使用 a SemaphoreSlim(但我推荐ConcurrentQueue(T)or BlockingCollection(T))。

public class BoundedBuffer<T>
{
    private readonly SemaphoreSlim _locker = new SemaphoreSlim(1,1);
    private readonly int _maxCount = 1000;
    private readonly Queue<T> _items;

    public int Count { get { return _items.Count; } }

    public BoundedBuffer()
    {
        _items = new Queue<T>(_maxCount);
    }

    public BoundedBuffer(int maxCount)
    {
        _maxCount = maxCount;
        _items = new Queue<T>(_maxCount);
    }

    public void Put(T item, CancellationToken token)
    {
        _locker.Wait(token);

        try
        {
            while(_maxCount == _items.Count)
            {
                _locker.Release();
                Thread.SpinWait(1000);
                _locker.Wait(token);
            }

            _items.Enqueue(item);
        }
        catch(OperationCanceledException)
        {
            try
            {
                _locker.Release();
            }
            catch(SemaphoreFullException) { }

            throw;
        }
        finally
        {
            if(!token.IsCancellationRequested)
            {
                _locker.Release();
            }
        }
    }

    public T Take(CancellationToken token)
    {
        _locker.Wait(token);

        try
        {
            while(0 == _items.Count)
            {
                _locker.Release();
                Thread.SpinWait(1000);
                _locker.Wait(token);
            }

            return _items.Dequeue();
        }
        catch(OperationCanceledException)
        {
            try
            {
                _locker.Release();
            }
            catch(SemaphoreFullException) { }

            throw;
        }
        finally
        {
            if(!token.IsCancellationRequested)
            {
                _locker.Release();
            }
        }
    }
}
于 2013-10-23T19:21:07.990 回答