2

我们有一个与第三方网络服务对话的 Rebus 消息处理程序。由于我们无法直接控制的原因,此 WCF 服务经常抛出异常,因为它在自己的数据库中遇到了数据库死锁。然后 Rebus 将尝试处理此消息五次,这在大多数情况下意味着这五次中的一次将是幸运的并且不会陷入僵局。但是经常发生消息在死锁之后确实会死锁并最终进入我们的错误队列。

除了解决死锁的根源(这将是一个长期目标)之外,我还可以想到两个选择:

  1. 继续尝试仅使用此特定消息类型,直到成功。最好我可以设置一个超时,所以“如果有五个死锁,然后在 5 分钟内再试一次”,而不是通过连续尝试进一步阻塞进程。我已经做了一个 Thread.Sleep(random) 来传播消息,但它仍然会在五次尝试后放弃。

  2. 将此特定消息类型发送到一个不同的队列,该队列只有一个处理消息的工作人员,因此这是串行发生的,而不是并行发生的。我们当前的配置使用 8 个工作线程,但这只会使死锁情况变得更糟,因为 Web 服务现在被并发调用并且消息相互干扰。

选项#2 有我的偏好,但我不确定这是否可行。我们在接收端的配置目前如下所示:

var adapter = new Rebus.Ninject.NinjectContainerAdapter(this.Kernel);

var bus = Rebus.Configuration.Configure.With(adapter)
    .Logging(x => x.Log4Net())
   .Transport(t => t.UseMsmqAndGetInputQueueNameFromAppConfig())
   .MessageOwnership(d => d.FromRebusConfigurationSection())
   .CreateBus().Start();

接收方的 .config :

<rebus inputQueue="app.msg.input" errorQueue="app.msg.error" workers="8">
  <endpoints>
  </endpoints>
</rebus>

从配置中我可以看出,只能将一个输入队列设置为“收听”。我也无法通过流畅的映射 API 找到一种方法。这似乎也只需要一个输入和错误队列:

.Transport(t =>t.UseMsmq("input", "error"))

基本上,我正在寻找的是类似的东西:

<rebus workers="8">
  <input name="app.msg.input" error="app.msg.error" />
  <input name="another.input.queue" error="app.msg.error" />
</rebus>

关于如何处理我的要求的任何提示?

4

1 回答 1

2

我建议您使用 saga 和 Rebus 的超时服务来实现适合您需求的重试策略。这样,在启用 Rebus 的 Web 服务外观中,您可以执行以下操作:

public void Handle(TryMakeWebServiceCall message)
{
    try
    {
        var result = client.MakeWebServiceCall(whatever);

        bus.Reply(new ResponseWithTheResult{ ... });
    }
    catch(Exception e)
    {
        Data.FailedAttempts++;

        if (Data.FailedAttempts < 10)
        {
            bus.Defer(TimeSpan.FromSeconds(1), message);
            return;
        }

        // oh no! we failed 10 times... this is probably where we'd
        // go and do something like this:
        emailService.NotifyAdministrator("Something went wrong!");
    }
}

Data神奇地为您提供并在调用之间持久保存的 saga 数据在哪里。

有关如何创建 saga 的灵感,请查看有关协调随时间发生的事情的 wiki 页面,您可以在其中看到有关服务如何具有某些状态(即在您的情况下失败的尝试次数)存储在本地的示例在处理消息之间可用。

到了开始bus.Defer工作的时候,您有两个选择:1) 使用外部超时服务(我通常在每台服务器上安装一个),或者 2) 只使用“你自己”作为超时服务。

在配置时,你去

Configure.With(...)
    .(...)
    .Timeouts(t => // configure it here)

您可以选择StoreInMemory, StoreInSqlServer, StoreInMongoDb, StoreInRavenDb, 或UseExternalTimeoutManager.

如果您选择 (1),您需要检查 Rebus 代码并自己构建Rebus.Timeout - 它基本上只是一个可配置的、支持 Topshelf 的控制台应用程序,其中有一个 Rebus 端点。

如果您需要更多帮助来完成这项工作,请告诉我 - 这bus.Defer是您的系统变得很棒的地方,并且能够克服所有导致其他所有故障的小故障 :)

于 2013-05-30T19:03:44.070 回答