我正在使用管道模式实现将消息消费者与生产者分离,以避免消费缓慢的问题。
如果消息处理阶段出现任何异常,[1]
它将丢失并且不会分派到其他服务/层[2]
。我该如何处理这样的问题,[3]
这样消息才不会丢失,什么是重要的!消息的顺序不会混淆,因此上层服务/层将按照它们进入的顺序获取消息。我有一个想法,它涉及另一个中间体Queue
,但它看起来很复杂?不幸的是BlockingCollection<T>
,没有公开任何类似的Queue.Peek()
方法,所以我可以阅读下一条可用消息,并且在成功处理的情况下Dequeue()
private BlockingCollection<IMessage> messagesQueue;
// TPL Task does following:
// Listen to new messages and as soon as any comes in - process it
foreach (var cachedMessage in
messagesQueue.GetConsumingEnumerable(cancellation))
{
const int maxRetries = 3;
int retriesCounter = 0;
bool isSent = false;
// On this point a message already is removed from messagesQueue
while (!isSent && retriesCounter++ <= maxRetries)
{
try
{
// [1] Preprocess a message
// [2] Dispatch to an other service/layer
clientProxyCallback.SendMessage(cachedMessage);
isSent = true;
}
catch(Exception exception)
{
// [3]
// logging
if (!isSent && retriesCounter < maxRetries)
{
Thread.Sleep(NSeconds);
}
}
if (!isSent && retriesCounter == maxRetries)
{
// just log, message is lost on this stage!
}
}
}
编辑:忘了说这是 IIS 托管的 WCF 服务,它通过客户端回调合同将消息发送回 Silverlight 客户端 WCF 代理。
EDIT2:下面是我将如何使用Peek()
,我错过了什么吗?
bool successfullySent = true;
try
{
var item = queue.Peek();
PreProcessItem(item);
SendItem(item);
}
catch(Exception exception)
{
successfullySent = false;
}
finally
{
if (successfullySent)
{
// just remove already sent item from the queue
queue.Dequeue();
}
}
EDIT3:我当然可以使用 while 循环、布尔标志Queue
和的旧式方法AutoResetEvent
,但我只是想知道是否可以使用相同的方法BlockingCollection
,并且 GetConsumingEnumerable()
我认为Peek
在与消耗可枚举一起使用时,类似设施会非常有帮助,因为否则所有管道模式实现例如新的东西BlockingCollection
,GetConsumingEnumerable()
看起来不耐用,我必须回到旧的方法。