6

我有一个生产者/消费者队列,除了有特定类型的对象。因此,并非任何消费者都可以消费添加的对象。我不想为每种类型创建一个特定的队列,因为太多了。(它有点延伸了生产者/消费者的定义,但我不确定正确的术语是什么。)

是否存在允许带有参数的脉冲的 EventWaitHandle 之类的东西?例如myHandle.Set(AddedType = "foo")。现在我正在使用Monitor.Wait,然后每个消费者都会检查脉冲是否真的是为他们准备的,但这似乎毫无意义。

我现在拥有的伪代码版本:

class MyWorker {
    public string MyType {get; set;}
    public static Dictionary<string, MyInfo> data;

    public static void DoWork(){
        while(true){
             if(Monitor.Wait(data, timeout)){
                   if (data.ContainsKey(MyType)){
                        // OK, do work
                   }
             }
        }
    }
}

正如你所看到的,当其他东西被添加到字典中时,我可能会得到脉冲。我只关心何时将 MyType 添加到字典中。有没有办法做到这一点?这不是什么大不了的事,但是,例如,我现在必须手动处理超时,因为每次获取锁都可以在超时内成功,但MyType永远不会添加到timeout.

4

2 回答 2

3

这是个有趣的问题。听起来解决方案的关键是优先队列的阻塞变体。Java 有PriorityBlockingQueue.NET BCL 的等价物,但不幸的是不存在。然而,一旦你有了一个,实施就很容易了。

class MyWorker 
{
    public string MyType {get; set;}
    public static PriorityBlockingQueue<string, MyInfo> data; 

    public static void DoWork()
    {
        while(true)
        {
            MyInfo value;
            if (data.TryTake(MyType, timeout, out value))
            {
                // OK, do work
            }
        }
    }
}

实现 aPriorityBlockingQueue并不是很困难。BlockingCollection遵循与利用Add和样式方法相同的模式,Take我提出了以下代码。

public class PriorityBlockingQueue<TKey, TValue>
{
    private SortedDictionary<TKey, TValue> m_Dictionary = new SortedDictionary<TKey,TValue>();

    public void Add(TKey key, TValue value)
    {
        lock (m_Dictionary)
        {
            m_Dictionary.Add(key, value);
            Monitor.Pulse(m_Dictionary);
        }
    }

    public TValue Take(TKey key)
    {
        TValue value;
        TryTake(key, TimeSpan.FromTicks(long.MaxValue), out value);
        return value;
    }

    public bool TryTake(TKey key, TimeSpan timeout, out TValue value)
    {
        value = default(TValue);
        DateTime initial = DateTime.UtcNow;
        lock (m_Dictionary)
        {
            while (!m_Dictionary.TryGetValue(key, out value))
            {
                if (m_Dictionary.Count > 0) Monitor.Pulse(m_Dictionary); // Important!
                TimeSpan span = timeout - (DateTime.UtcNow - initial);
                if (!Monitor.Wait(m_Dictionary, span))
                {
                    return false;
                }
            }
            m_Dictionary.Remove(key);
            return true;
        }
    }
}

这是一个快速的实现,它有几个问题。首先,我根本没有测试过它。其次,它使用红黑树(via SortedDictionary)作为底层数据结构。这意味着该TryTake方法将具有 O(log(n)) 复杂度。优先级队列通常具有 O(1) 的移除复杂度。优先级队列选择的典型数据结构是,但我发现跳过列表实际上在实践中更好,原因有几个。这些都不存在于 .NET BCL 中,这就是为什么我使用 aSortedDictionary代替,尽管它在这种情况下性能较差。

我应该在这里指出,这实际上并不能解决毫无意义的Wait/Pulse行为。它被简单地封装在PriorityBlockingQueue类中。但是,至少这肯定会清理代码的核心部分。

它看起来不像您的代码每个键处理多个对象,但是在添加到字典时使用 aQueue<MyInfo>而不是普通的旧的很容易添加。MyInfo

于 2010-12-16T20:17:11.387 回答
1

似乎您想将生产者/消费者队列与观察者模式结合起来——通用消费者线程或线程从队列中读取,然后将事件传递给所需的代码。在这种情况下,您实际上不会向观察者发出信号,而只是在消费者线程识别出谁对给定工作项感兴趣时调用它。

.Net 中的观察者模式通常使用 C# 事件来实现。您只需要调用对象的事件处理程序,就会通过它调用一个或多个观察者。目标代码首先必须通过将自己添加到事件中以在工作到达时向观察对象注册自己。

于 2010-12-16T16:38:07.073 回答