3

我是 NEventStore 和一般事件采购的新手。在一个项目中,我想使用 NEventStore 来持久化我们的聚合生成的事件,但是我在正确处理并发方面遇到了一些问题。

如何使用乐观锁写入同一个流?

假设我有 2 个相同聚合的实例,它们在版本 1 中从 2 个不同的线程加载。然后是第一个线程调用命令 A 和第二个线程调用命令 B 。使用聚合的乐观锁之一应该会失败并出现并发异常。

我想使用 maxRevision 从加载聚合的点打开流,但似乎 CommitChanges 永远不会失败,如果我通过旧修订版也是如此。

我错过了什么?使用 NEventStore/Event Sourcing 时乐观锁可能/正确吗?

这是我用来重现问题的代码:

namespace NEventStore.Example
{
    using System;
    using System.Transactions;
    using NEventStore;
    using NEventStore.Dispatcher;
    using NEventStore.Persistence.SqlPersistence.SqlDialects;

    internal static class MainProgram
    {
        private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier
        private static IStoreEvents store;

        private static void Main()
        {
            using (var scope = new TransactionScope())
            using (store = WireupEventStore())
            {
                Client1(revision: 0);

                Client2(revision: 0);

                scope.Complete();
            }

            Console.WriteLine(Resources.PressAnyKey);
            Console.ReadKey();
        }

        private static IStoreEvents WireupEventStore()
        {
             return Wireup.Init()
                .UsingInMemoryPersistence()
                .Build();
        }

        private static void Client1(int revision)
        {
            using (var stream = store.OpenStream(StreamId, 0, revision))
            {
                var @event = new SomeDomainEvent { Value = "Client 1 - event 1." };

                stream.Add(new EventMessage { Body = @event });


                stream.CommitChanges(Guid.NewGuid());
            }
        }

        private static void Client2(int revision)
        {
            using (var stream = store.OpenStream(StreamId, 0, revision))
            {
                var @event = new SomeDomainEvent { Value = "Client 2 - event 1." };

                stream.Add(new EventMessage { Body = @event });


                stream.CommitChanges(Guid.NewGuid());
            }
        }
    }
}

我预计客户端 2 会失败,因为我使用旧版本打开流。

更新 26/08/2013:我已经使用 Sql server 测试了相同的代码,并且似乎按预期工作。

namespace NEventStore.Example
{
    using System;
    using System.Transactions;
    using NEventStore;
    using NEventStore.Dispatcher;
    using NEventStore.Persistence.SqlPersistence.SqlDialects;

    internal static class MainProgram
    {
        private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier
        private static IStoreEvents store;

        private static void Main()
        {
            using (store = WireupEventStore())
            {
                OpenOrCreateStream();

                AppendToStream_Client1(revision: 1);

                AppendToStream_Client2(revision: 1); // throws an error
                // AppendToStream_Client2(revision: 2); // works
            }

            Console.WriteLine(Resources.PressAnyKey);
            Console.ReadKey();
        }

        private static IStoreEvents WireupEventStore()
        {
             return Wireup.Init()
                .LogToOutputWindow()
                .UsingInMemoryPersistence()
                .UsingSqlPersistence("EventStore") // Connection string is in app.config
                    .WithDialect(new MsSqlDialect())
                    .InitializeStorageEngine()
                    .UsingJsonSerialization()
                .Build();
        }

        private static void OpenOrCreateStream()
        {
            using (var stream = store.OpenStream(StreamId, 0, int.MaxValue))
            {
                var @event = new SomeDomainEvent { Value = "Initial event." };

                stream.Add(new EventMessage { Body = @event });
                stream.CommitChanges(Guid.NewGuid());
            }
        }

        private static void AppendToStream_Client1(int revision)
        {
            using (var stream = store.OpenStream(StreamId, int.MinValue, revision))
            {
                var @event = new SomeDomainEvent { Value = "Second event 1." };

                stream.Add(new EventMessage { Body = @event });
                stream.CommitChanges(Guid.NewGuid());
            }
        }

        private static void AppendToStream_Client2(int revision)
        {
            using (var stream = store.OpenStream(StreamId, int.MinValue, revision))
            {
                var @event = new SomeDomainEvent { Value = "Second event 2." };

                stream.Add(new EventMessage { Body = @event });
                stream.CommitChanges(Guid.NewGuid());
            }
        }
    }
}

回到我的问题:要启用乐观锁,我应该在打开流时使用修订版吗?还有其他可能的实现或指导方针吗?

谢谢

4

1 回答 1

6

首先,主要目的是测试的内存中持久性实现不是事务感知的。在您的原始示例中,客户端 2 将简单地将其事件附加到流中。尝试使用支持事务的持久性存储(SQL 和 Raven,但不包括 Mongo)运行上述内容。

其次,在打开流时指定最小/最大修订用于不同目的:

  1. 当重新对聚合进行水合并且没有可用的快照时,您可以指定 (min:0, max:int.MaxValue),因为您有兴趣检索所有事件。
  2. 当重新水合聚合并且快照可用时,您将指定 (min:snapshot.Version, max:int.MaxValue) 以获取自快照以来发生的所有事件。
  3. 保存聚合时,您将指定 (min:0, max:Aggregate.Version)。Aggregate.Version 是在再水合期间派生的。如果相同的骨料同时在其他地方重新水化并保存,您将有一个竞争条件并且ConcurrencyException会发生。

对其中大部分的支持将封装在域框架中。请参阅CommonDomain中的AggregateBaseEventStoreRepository

第三,也是最重要的,在单个事务中更新 >1 个流是一种代码异味。如果您正在执行 DDD/ES,则流表示单个聚合根,根据定义,它是一致性边界。在事务中创建/更新多个 AR 会打破这一点。NEventStore 的事务支持(不情愿地)被添加,因此它可以与其他工具一起使用,即从 MSMQ/NServiceBus/whatever 以事务方式读取命令并处理它,或者以事务方式将提交消息发送到队列并将其标记为这样。就个人而言,我建议您尽量避免使用 2PC。

于 2013-08-25T16:29:22.263 回答