5

我很好奇 MassTransit 消费者是否可以在实际检索 msg 之前 Peek() 一个 MSMQ 队列。

步骤/过程是什么:

1) 消息发送到队列

2) 消费者得到它并且必须进行数据库更新——大约需要 5 秒

3) 如果第一轮成功,消费者必须进行第二轮更新。

我的问题是,如果第一次数据库更新失败,消息留在队列中(即网络问题,无法到达数据库),我该如何处理。

目前,一旦它从队列中读取消息,它就会将其删除,然后如果数据库更新失败,它就会消失..

此外,我该如何处理电源故障——我的意思是,如果消费者的“工作”进行到一半,无论是什么(数据库更新或其他)并且电源消失等,我该如何重新运行该过程队列中的味精?可以说这项工作(无论如何在我目前的情况下)正在将新行推送到表中。我的意思是我可以编写代码来首先检查该行是否存在,如果它存在则删除消息,如果不存在则运行任务,但我怎样才能让它首先重新运行整个过程?

我已经读到我可以Peek()排队,然后运行任务,然后真正阅读队列味精并将其删除,但我一生无法弄清楚这是否适用于公共交通......有点迷失......

此外,我知道 Masstransit 有,.RetryLater但我是否在此过程中使用它?是 Initially--> When--> Then -->.RetryLater在传奇中吗?

任何指针都会被接受

最诚挚的问候罗宾

编辑

PS:我用的是saga....

Define(() =>
            {
                RemoveWhen(saga => saga.CurrentState == Completed);

                Initially(
                    When(NewAC)
                        .Then((saga, message) => saga.ProcessPSM(message),
                            InCaseOf<Exception>()
                               .TransitionTo(Problem)                                  
                                )
                        .Then((saga, message) => saga.PostProcessPSM())
                        .Complete()
                    );
                During(Problem,
                    When(Waiting)
                               // NOTE: THIS DOES NOT WORK!!!!
                        .RetryLater()
                    );
                });

RetryLater 抛出错误:“现有 saga 无法接受该消息”

我不确定我还能如何访问“RetryLater”。

4

1 回答 1

7

MassTransit 抽象了底层队列的概念。所以 Peek 不是解决方案,但它确实有其他重试消息的方法。如果您只对处理错误和故障情况感兴趣,那么以下机制就足够了。

默认情况下,如果消费者抛出异常,消息将被重试 N 次:

  • 其中 N 在总线上配置,默认为 5。它可以在总线初始化中使用 ServiceBusConfigurator 上的 SetDefaultRetryLimit 进行更改
  • 其中重试意味着消息将被添加到队列的末尾

如果您想要更细粒度的错误处理方法,您可以实现 Context Consumer,捕获可恢复或瞬态异常并手动调用 RetryLater。据我了解,这可以做多少次没有限制。

public class RetryConsumer : Consumes<AwesomeMessage>.Context
{

    public void Consume(IConsumeContext<AwesomeMessage> message)
    {
        try
        {
            Console.WriteLine("This is Attempt " + message.RetryCount);
            // Do Something
        }
        catch (SomeTransientException e)
        {
            message.RetryLater();
        }
    }
}
于 2012-12-21T12:21:52.380 回答