1

处理程序接收到的每条消息都包含聚合根的整个状态。然后系统能够根据这些数据执行所需的操作。在我的场景中,根据消息中的数据授予访问权限,例如对房间 A 和 B 的访问权限。一条消息包含整组已授予的访问权限。这些消息可能会乱序到达,因为像 MSMQ 这样的消息传递系统不保证有序传递。

消息 #1 授予对房间 A 和 B 的访问权限,但消息 #2 仅授予对房间 A 的访问权限的场景。如果它们无序到达,则授予房间 A 的访问权限,然后再授予房间 A 的访问权限& B. 这不是预期的结果。只应允许进入房间 A。每条消息都包含一个时间戳,该时间戳在发布时设置。我想使用此时间戳来删除乱序到达的消息,例如,如果消息 #2 在消息 #1 之前到达,则应该丢弃消息 1#。

我可以在每个处理程序方法中实现这个过滤器,但这会很乏味,所以我希望 Rebus 有一些类似EAI 消息过滤器的东西?

我愿意接受其他选择/实施吗?

4

1 回答 1

1

我不确定我是否理解您为什么要在消息中传递整个聚合根,但除此之外,还有一种简单的方法可以让 Rebus 使您能够对消息进行排序。

我建议你让你的所有消息实现一个通用接口来捕获这个特定方面,例如

public interface IHaveSequenceNumber
{
    int SequenceNumber { get; }
}

然后,您创建一个简单的消息处理程序,IHaveSequenceNumber在遇到旧消息时处理和中止处理程序管道,例如

public class WillDiscardOldMessages : IHandleMessages<IHaveSequenceNumber>
{
    public void Handle(IHaveSequenceNumber messageWithSequenceNumber)
    {
        if (IsTooOld(messageWithSequenceNumber))
        {
            MessageContext.GetCurrent().Abort(); //< make this the last handler
        }
    }
}

然后 - 非常重要:) - 你确保你的消息过滤器总是在调度开始时首先在处理程序的管道中:

Configure.With(...)
    .Transport(...)
    .SpecifyOrderOfHandlers(s => s.First<WillDiscardOldMessages>())
    .(...)

我将把上面的方法的实现IsTooOld()留给你——如果你的端点中有一个工作线程,这可能会很简单,而并发处理这些事情并不是微不足道的。

那有意义吗?

于 2013-04-16T18:48:33.540 回答