1

我们有一个处理多种消息类型的 NServiceBus 实现:

public class StateCoordinator : Saga<MessageData>, 
                                IAmStartedByMessages<CreateMessage>, 
                                IAmStartedByMessages<ConfirmMessage>

MessageData 是这样的:

public class FlowData : IContainSagaData
{
    [Unique]
    public Guid MappingId { get; set; }

    public Guid Id { get; set; }

    public string OriginalMessageId { get; set; }

    public string Originator { get; set; }

    public List<MessagePart> MessageParts { get; set; }
}

public MessagePart
{
    public int Id { get; set; }

    public string Status { get; set; }
}

CreateMessage 有一个 MessagePart,它在其处理程序中添加到 MessageParts。ConfirmMessage 更新特定 MessagePart 的状态。

这是场景:

1) 接收到第一个 CreateMessage。这将在 Raven 中创建 Saga,并将 MessagePart(状态为“1”)添加到 MessageParts。

2) 接收到第一个 ConfirmMessage。这会将 Saga 中第一个添加的 MessagePart 的状态更新为“1 2”。转到 RavenDB 中的文档时,可以在浏览器中看到这一点。

3) 接收到第二个 CreateMessage。这会向 MessageParts 添加第二个 MessagePart。查看数据时,第一个 MessagePart 的状态仍然是“1”而不是“1 2”,这会引发并发异常(ActualETag 不等于 ExpectedETag):

A first chance exception of type 'Raven.Abstractions.Exceptions.ConcurrencyException' occurred in Raven.Client.Lightweight

Additional information: PUT attempted on document 'flow/79a7ee20-f090-4648-9b62-a3da00d87c93' using a non current etag

看起来 Saga 数据是按消息类型缓存的。是这样吗?有解决办法吗?

笔记:

我们正在使用多个 IAmStartedByMessages,但是当 ConfirmMessage 在 CreateMessage 之前时,此消息将被添加到队列中,直到 CreateMessage 被处理。

4

1 回答 1

2

经过几次测试,我可以说没有真正的问题。我认为您无法解决并发异常(已记录此异常),但是当发生此异常时,消息只是发送回队列进行重试。

我担心来自 Handle 的逻辑会被执行两次,但我猜是因为我启用了事务,情况并非如此。例如:其中一个动作是将消息发送到另一个总线以保持事件(perister-queue)。这个动作在异常发生之前被调用(在 Handle 之后,Data 被保存在 Saga 中并且抛出异常)但是没有消息添加到持久化队列中。重试成功的消息后(这会更新数据,因此不再有并发异常),持久化消息被添加到持久化队列中。

结论:不是一个真正的问题,只是日志文件中的一个额外行。

于 2014-11-05T15:33:27.137 回答