17

我创建了一个 WCF 服务并正在使用 netMsmqBinding 绑定。

这是一个简单的服务,它将 Dto 传递给我的服务方法并且不期望响应。消息被放置在一个 MSMQ 中,并且一旦被拾取就插入到一个数据库中。

确保没有数据丢失的最佳方法是什么。

我尝试了以下两种方法:

  1. 抛出异常

    这会将消息置于死信队列中以供手动阅读。当我的服务开始时,我可以处理这个

  2. 在绑定上设置 receiveRetryCount="3"

    经过 3 次尝试后 - 瞬间发生,这似乎将消息留在队列中,但我的服务出错了。重新启动我的服务会重复此过程。

理想情况下,我想做以下事情:

尝试处理消息

  • 如果失败,请等待该消息 5 分钟,然后重试。
  • 如果该过程失败 3 次,则将消息移动到死信队列。
  • 重新启动服务会将死信队列中的所有消息推回队列中,以便对其进行处理。

我能做到这一点吗?如果有怎么办?你能给我指出任何关于如何最好地利用 WCF 和 MSMQ 来处理我给定场景的好文章吗?

任何帮助将非常感激。谢谢!

一些附加信息

我在 Windows XP 和 Windows Server 2003 上使用 MSMQ 3.0。不幸的是,我无法使用针对 MSMQ 4.0 和 Vista/2008 的内置有害消息支持。

4

4 回答 4

14

我认为使用 MSMQ(仅在 Vista 上可用)您可以这样做:

<bindings>
    <netMsmqBinding>
        <binding name="PosionMessageHandling"
             receiveRetryCount="3"
             retryCycleDelay="00:05:00"
             maxRetryCycles="3"
             receiveErrorHandling="Move" />
    </netMsmqBinding>
</bindings>

WCF 将在第一次调用失败后立即重试 ReceiveRetryCount 次。批处理失败后,消息将移至重试队列。延迟 RetryCycleDelay 分钟后,消息从重试队列移动到端点队列,并重试批处理。这将重复 MaxRetryCycle 时间。如果所有失败,则根据receiveErrorHandling 处理消息,该消息可以移动(到毒队列)、拒绝、丢弃或故障

顺便说一句,关于 WCF 和 MSMQ 的好文章是来自 Juval Lowy 的 Progammig WCF 书的第 9 章

于 2008-09-17T12:29:41.833 回答
9

SDK 中有一个示例可能对您的情况有用。基本上,它所做的是将 IErrorHandler 实现附加到您的服务,当 WCF 将消息声明为“毒药”(即,当所有配置的重试都已用尽时)时,它将捕获错误。该示例所做的是将消息移动到另一个队列,然后重新启动与该消息关联的 ServiceHost(因为在发现有害消息时它会出现故障)。

这不是一个非常漂亮的示例,但它可能很有用。但是有几个限制:

1-如果您有多个与您的服务关联的端点(即通过多个队列公开),则无法知道有毒消息到达哪个队列。如果您只有一个队列,这不会是问题。我还没有看到任何官方的解决方法,但我已经尝试了一种可能的替代方案,我在这里记录了:http ://winterdom.com/weblog/2008/05/27/NetMSMQAndPoisonMessages.aspx

2-一旦问题消息被移动到另一个队列,它就成为您的责任,因此一旦超时完成(或将新服务附加到该队列以处理它),您就可以将其移回处理队列。

老实说,在任何一种情况下,您都会在这里看到一些 WCF 本身并没有涵盖的“手动”工作。

我最近一直在从事一个不同的项目,我需要明确控制重试发生的频率,我当前的解决方案是创建一组重试队列并在重试队列和主处理队列之间手动移动消息基于一组计时器和一些启发式方法,仅使用原始 System.Messaging 东西来处理 MSMQ 队列。它似乎工作得很好,但如果你这样做会有几个问题。

于 2008-09-17T17:36:23.077 回答
4

如果您使用的是 SQL-Server,那么您应该使用分布式事务,因为 MSMQ 和 SQL-Server 都支持它。发生的情况是您将数据库写入包装在 TransactionScope 块中并仅在成功时调用 scope.Complete() 。如果失败,那么当您的 WCF 方法返回时,消息将被放回队列中以再次尝试。这是我使用的精简版代码:

    [OperationBehavior(TransactionScopeRequired=true, TransactionAutoComplete=true)]
    public void InsertRecord(RecordType record)
    {
        try
        {
            using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))
            {
                SqlConnection InsertConnection = new SqlConnection(ConnectionString);
                InsertConnection.Open();

                // Insert statements go here

                InsertConnection.Close();

                // Vote to commit the transaction if there were no failures
                scope.Complete();
            }
        }
        catch (Exception ex)
        {
            logger.WarnException(string.Format("Distributed transaction failure for {0}", 
                Transaction.Current.TransactionInformation.DistributedIdentifier.ToString()),
                ex);
        }
     }

我通过排队大量但已知数量的记录来测试这一点,让 WCF 启动大量线程来同时处理其中的许多(达到 16 个线程 - 一次队列中有 16 条消息),然后在操作中间终止进程. 当程序重新启动时,从队列中读回消息并再次处理,就好像什么都没发生一样,并且在测试结束时数据库是一致的并且没有丢失的记录。

分布式事务管理器具有环境存在,当您创建 TransactionScope 的新实例时,它会自动在方法调用范围内搜索当前事务——当 WCF 将消息从队列中弹出时,它应该已经创建并调用了你的方法。

于 2008-09-17T12:54:14.520 回答
1

不幸的是,我被困在 Windows XP 和 Windows Server 2003 上,所以这对我来说不是一个选择。- (我将在我的问题中重新澄清这一点,因为我在发布后找到了这个解决方案并意识到我无法使用它)

我发现一种解决方案是设置一个自定义处理程序,它将我的消息移动到另一个队列或毒队列并重新启动我的服务。这对我来说似乎很疯狂。想象一下,我的 Sql Server 关闭了服务重启的频率。

所以我最终做的是让线路出现故障并在队列中留下消息。我还向我的系统日志记录服务记录了一条致命消息,表明这已经发生。一旦我们的问题得到解决,我会重新启动服务,所有消息都会再次开始处理。

我意识到重新处理此消息或任何其他消息都会失败,那么为什么需要将此消息和其他消息移动到另一个队列。我不妨停止我的服务,并在一切按预期运行时重新启动它。

aogan,您对 MSMQ 4.0 有完美的答案,但不幸的是不适合我

于 2008-09-17T12:43:24.017 回答