11

我正在使用管道模式实现将消息消费者与生产者分离,以避免消费缓慢的问题。

如果消息处理阶段出现任何异常,[1]它将丢失并且不会分派到其他服务/层[2]。我该如何处理这样的问题,[3]这样消息才不会丢失,什么是重要的!消息的顺序不会混淆,因此上层服务/层将按照它们进入的顺序获取消息。我有一个想法,它涉及另一个中间体Queue,但它看起来很复杂?不幸的是BlockingCollection<T>,没有公开任何类似的Queue.Peek()方法,所以我可以阅读下一条可用消息,并且在成功处理的情况下Dequeue()

private BlockingCollection<IMessage> messagesQueue;    

// TPL Task does following:
// Listen to new messages and as soon as any comes in - process it
foreach (var cachedMessage in 
             messagesQueue.GetConsumingEnumerable(cancellation))
{    
    const int maxRetries = 3;
    int retriesCounter = 0;
    bool isSent = false;

    // On this point a message already is removed from messagesQueue
    while (!isSent && retriesCounter++ <= maxRetries)
    {
        try
        {
           // [1] Preprocess a message
           // [2] Dispatch to an other service/layer    
           clientProxyCallback.SendMessage(cachedMessage);
           isSent = true;
        }                                
        catch(Exception exception)
        {
           // [3]   
           // logging
           if (!isSent && retriesCounter < maxRetries)
           {
              Thread.Sleep(NSeconds);
           }
        }            
    
        if (!isSent && retriesCounter == maxRetries)
        {
           // just log, message is lost on this stage!
        }
    }
}

编辑:忘了说这是 IIS 托管的 WCF 服务,它通过客户端回调合同将消息发送回 Silverlight 客户端 WCF 代理。

EDIT2:下面是我将如何使用Peek(),我错过了什么吗?

bool successfullySent = true;
try
{
   var item = queue.Peek();
   PreProcessItem(item);
   SendItem(item);       
}
catch(Exception exception)
{
   successfullySent = false;
}
finally
{
   if (successfullySent)
   {
       // just remove already sent item from the queue
       queue.Dequeue();
   }
}

EDIT3:我当然可以使用 while 循环、布尔标志Queue和的旧式方法AutoResetEvent,但我只是想知道是否可以使用相同的方法BlockingCollection,并且 GetConsumingEnumerable()我认为Peek在与消耗可枚举一起使用时,类似设施会非常有帮助,因为否则所有管道模式实现例如新的东西BlockingCollectionGetConsumingEnumerable()看起来不耐用,我必须回到旧的方法。

4

4 回答 4

9

您应该考虑中间队列。

BlockingCollection<T>由于其性质,不能“偷看”物品 - 可以有多个消费者。其中一个可以偷看一个项目,另一个可以拿走它 - 因此,第一个将尝试拿走已经被拿走的项目。

于 2012-11-27T11:03:32.237 回答
4

正如丹尼斯在他的评论中所说,为接口BlockingCollection<T>的任何实现者提供了一个阻塞包装器IProducerConsumerCollection<T>

如您所见,IProducerConsumerCollection<T>根据设计,没有定义Peek<T>实现方法所需的 a 或其他方法。这意味着,就目前而言BlockingCollection<T>,它不能提供对Peek.

如果您考虑一下,这将大大减少由实现的效用权衡产生的并发问题Peek。不消费怎么能消费?同时,您Peek必须锁定集合的头部,直到Peek操作完成,我和设计者BlockingCollection<T>认为这是次优的。我认为它也会很混乱且难以实施,需要某种一次性的 peek 上下文。

如果您消费一条消息并且它的消费失败,您将不得不处理它。您可以将其添加到另一个失败队列中,将其重新添加到正常处理队列中以供将来重试,或者仅记录其失败以供后代使用,或者其他适合您的上下文的操作。

如果您不想同时消费消息,则无需使用BlockingCollection<T>,因为您不需要并发消费。您可以ConcurrentQueue<T>直接使用,您仍然可以获得添加的同步性,并且您可以TryPeek<T>安全地使用,因为您控制单个消费者。如果消费失败,您可以根据需要通过无限重试循环来停止消费,但我建议这需要一些设计思想。

于 2012-11-27T11:31:11.653 回答
4

BlockingCollection<T>是 的包装器IProducerConsumerCollection<T>,它比 eg 更通用,ConcurrentQueue并为实现者提供了不必实现(Try)Peek方法的自由。

但是,您始终可以TryPeek直接调用底层队列:

ConcurrentQueue<T> useOnlyForPeeking = new ConcurrentQueue<T>();
BlockingCollection<T> blockingCollection = new BlockingCollection<T>(useOnlyForPeeking);
...
useOnlyForPeeking.TryPeek(...)

但是请注意,您不能通过 修改队列useOnlyForPeeking,否则blockingCollection会感到困惑并可能会向您抛出InvalidOperationExceptions,但如果在此并发数据结构上调用非修改TryPeek会是一个问题,我会感到惊讶。

于 2014-06-06T03:15:45.807 回答
0

你可以ConcurrentQueue<T>改用,它有TryDequeue()方法。

ConcurrentQueue<T>.TryDequeue(out T result)尝试移除并返回并发队列开头的对象,如果一个元素被移除并成功从 ConcurrentQueue 的开头返回,则返回 true。

因此,无需先检查 Peek。

TryDequeue()是线程安全的:

ConcurrentQueue<T> 在内部处理所有同步。如果两个线程同时调用 TryDequeue(T),则不会阻塞任何操作。

据我了解,仅当队列为空时它才返回 false

如果队列中填充了诸如 q.Enqueue("a"); 之类的代码。q.Enqueue("b"); q.Enqueue("c"); 并且两个线程同时尝试使一个元素出队,一个线程将 a 出队,另一个线程将 b 出队。对 TryDequeue(T) 的两次调用都将返回 true,因为它们都能够使元素出队。如果每个线程返回一个额外的元素出队,其中一个线程将出列 c 并返回 true,而另一个线程将发现队列为空并返回 false。

http://msdn.microsoft.com/en-us/library/dd287208%28v=vs.100%29.aspx

更新

也许,最简单的选择是使用TaskSchedulerClass。有了它,您可以将所有处理任务包装到队列的项目中,并简化同步的实现。

于 2012-11-27T10:56:06.030 回答