4

有没有办法指定针对特定异常重试消息的等待时间?

例如,如果对象处于 SomethingInProgress 状态,则抛出 SomethignInProgressException 并且我希望消息在 40m 后重试。还是提出SomethingInProgressEvent并使用bus.defer更合适?

4

2 回答 2

7

这就是 Rebus 没有二级重试概念的部分原因——我根本没有看到任何可以以通用且仍然足够灵活的方式创建此函数的方法。

简短地回答您的问题:不,没有(内置)方法可以改变特定异常重试之间的时间。事实上,根本没有办法在重试之间配置等待时间——失败的消息将尽可能快地重试,然后如果它们继续失败以避免“堵塞管道”,则将其移至错误队列。

在您的情况下,我建议您执行以下操作:

public void Handle(MyMessage message) {
    var headers = MessageContext.GetCurrent().Headers;
    var deliveryAttempt = headers.ContainsKey("attempt_no") 
        ? Convert.ToInt(headers["attempt_no"]) 
        : 0;

    try {
        DoWhateverWithThe(message);
    } catch(OneKindOfException e) {
        if (deliveryAttempt > 5) {
            bus.Advanced.Routing.ForwardCurrentMessage("error");
            return;
        }

        bus.AttachHeader(message, "attempt_no", deliveryAttempt + 1);
        bus.Defer(TimeSpan.FromSeconds(20), message);
    } catch(AnotherKindOfException e) {
        if (deliveryAttempt > 5) {
            bus.Advanced.Routing.ForwardCurrentMessage("error");
            return;
        }

        bus.AttachHeader(message, "attempt_no", deliveryAttempt + 1);
        bus.Defer(TimeSpan.FromMinutes(2), message);
    }
}

我只是在没有 100% 确定它实际编译的情况下写下了我的头顶......但它的要点是我们跟踪我们在消息的自定义标头中进行了多少次传递尝试,bus.Defer响起消息每个失败的传递尝试的适当时间跨度,当超过我们的最大传递尝试次数时,立即将消息转发到错误队列。

我希望这是有道理的 :)

于 2014-08-13T07:33:09.623 回答
2

如何执行此操作的最新示例是:

public async Task Handle(IFailed<MyMessage> message)
{
    var maxAttempts = 10;
    var optionalHeaders = new Dictionary<string, string>();
    if (message.Headers != null && message.Headers.ContainsKey("attemptNumber"))
    {
        // increment the attempt number
        var attemptNumber = int.Parse(message.Headers["attemptNumber"]);
        attemptNumber++;
        optionalHeaders.Add("attemptNumber", attemptNumber.ToString());
        if (attemptNumber > maxAttempts)
        {
            // log I give up message, message will move to dead queue
            return;
        }
    }
    else
        optionalHeaders.Add("attemptNumber", "1");

    // if message failed to process, defer processing for 5 minutes and try again
    await Bus.Defer(TimeSpan.FromMinutes(5), message.Message, optionalHeaders);
}
于 2018-01-30T01:37:35.030 回答