1

我想知道是否有人以前遇到过这种情况:

我处理一个命令,在处理程序中,我将一个事件保存到事件存储区(joliver)。
在调度之后,再次处理相同命令的处理程序。
我知道它是相同的命令,因为命令上的 guid 是相同的。

五次尝试后,nservicebus 说由于最大重试次数,命令失败。
所以显然命令失败了,但我没有得到任何失败的迹象。我已将调度程序的内容放在 try catch 中,但没有捕获到错误。代码退出调度程序后,事件处理程序将始终触发,就好像发生了错误一样。

跟踪代码,将事件保存到数据库(我看到行),调度程序运行,并将 Dispatched 列设置为 true,然后处理程序再次处理命令,过程重复,并插入另一行进入提交表。

什么可能会失败?我没有在事件商店的某处设置成功标志吗?如果我将事件存储与 nServicebus 分离,两者都将按预期运行,不会重试和失败。

调度员:

    public void Dispatch(Commit commit)
    {
        for (var i = 0; i < commit.Events.Count; i++)
        {
            try
            {
                var eventMessage = commit.Events[i];
                var busMessage = (T)eventMessage.Body;
                //bus.Publish(busMessage);

            }
            catch (Exception ex)
            {
                throw ex;
            }
        }
    }

Wireup.Init()

    private static IStoreEvents WireupEventStore()
    {
        return Wireup.Init()
            .LogToOutputWindow()
            .UsingSqlPersistence("EventStore")
            .InitializeStorageEngine()
            .UsingBinarySerialization()
            //.UsingJsonSerialization()
            //    .Compress()
            //.UsingAsynchronousDispatchScheduler()
            //    .DispatchTo(new NServiceBusCommitDispatcher<T>())

           .UsingSynchronousDispatchScheduler()
               .DispatchTo(new DelegateMessageDispatcher(DispatchCommit))

            .Build();
    }
4

1 回答 1

1

我在保存时打开了一个我从未关闭过的事务范围。

    public static void Save(AggregateRoot root)
    {
        // we can call CreateStream(StreamId) if we know there isn't going to be any data.
        // or we can call OpenStream(StreamId, 0, int.MaxValue) to read all commits,
        // if no commits exist then it creates a new stream for us.
        using (var scope = new TransactionScope())
        using (var eventStore = WireupEventStore())
        using (var stream = eventStore.OpenStream(root.Id, 0, int.MaxValue))
        {
            var events = root.GetUncommittedChanges();
            foreach (var e in events)
            {
                stream.Add(new EventMessage { Body = e });
            }

            var guid = Guid.NewGuid();
            stream.CommitChanges(guid);
            root.MarkChangesAsCommitted();

            scope.Complete(); // <-- missing this
        }
    }
于 2013-03-25T20:18:20.343 回答