8

我将 DDD 与 CQRS 和事件溯源一起使用。我需要在我的自定义实现中使用事件存储(特别是这个事件存储IEventStore)来持久化和检索域事件,但是我在处理序列化/反序列化的方法上遇到了困难。

这是我正在实现的接口:

public interface IEventStore
{
    Task<IEnumerable<IDomainEvent>> GetEventsAsync(Identity aggregateIdentity, Type aggregateType);

    Task PersistAsync(IAggregateRoot aggregateRoot, IEnumerable<IDomainEvent> domainEvents);
}

在我的实现之外,IEventStore我可以将每个映射器映射IDomainEvent到一些可序列化/可反序列化的 EventDto 或 json 字符串。这不是问题。但这些是我的限制:

  • 我的域事件是实现的不可变对象IDomainEvent(即:没有设置器)

  • 我的域事件并不总是很容易以通用方式序列化/反序列化。它们通常具有抽象或接口属性,因此我的域事件和一些可序列化对象(如字符串 json 或事件 DTO)之间的具体映射器在我的IEventStore实现之外决定。

  • 我的IEventStore实现需要是通用的,如果我添加新的域事件类型,我不需要IEventStore实现中触及任何东西

  • 我的IEventStore实现可以接收注入的一些特定实现IMapper<TSource, TDestination>,以便我可以使用它们在特定类型(不是接口)之间进行序列化/反序列化。

    public interface IMapper<in TSource, out TDestination>
    {
        TDestination Map(TSource source); // I have implementations of this if needed
    }
    

下面是我的尝试:

public class MyEventStore
    : IEventStore
{
    private readonly IStreamNameFactory _streamNameFactory;
    private readonly IEventStoreConnection _eventStoreConnection; //this is the Greg Young's EventStore product that I want to use as database
    private readonly IDomainEventFactory _domainEventFactory;
    private readonly IEventDataFactory _eventDataFactory;

    public EventStore(
        IStreamNameFactory streamNameFactory, 
        IEventStoreConnection eventStoreConnection, 
        IDomainEventFactory domainEventFactory, 
        IEventDataFactory eventDataFactory)
    {
        _streamNameFactory = streamNameFactory;
        _eventStoreConnection = eventStoreConnection;
        _domainEventFactory = domainEventFactory;
        _eventDataFactory = eventDataFactory;
    }

    public async Task<IEnumerable<IDomainEvent>> GetEventsAsync(
        Identity aggregateIdentity, 
        Type aggregateType)
    {
        var aggregateIdentityValue = aggregateIdentity.Value;
        var streamName = _streamNameFactory.Create(aggregateIdentityValue, aggregateType);

        var streamEventSlice =
            await _eventStoreConnection.ReadStreamEventsForwardAsync(streamName, 0, Int32.MaxValue, false);

        var domainEvents = streamEventSlice
            .Events
            .Select(x => _domainEventFactory.Create(x));

        return domainEvents;
    }

    [SuppressMessage("ReSharper", "PossibleMultipleEnumeration")]
    public async Task PersistAsync(
        IAggregateRoot aggregateRoot, 
        IEnumerable<IDomainEvent> domainEvents)
    {
        var numberOfEvents = domainEvents.Count();
        var aggregateRootVersion = aggregateRoot.Version;
        var originalVersion = aggregateRootVersion - numberOfEvents;
        var expectedVersion = originalVersion - 1;

        var aggregateIdentityValue = aggregateRoot.AggregateIdentity.Value;
        var aggregateRootType = aggregateRoot.GetType();
        var streamName = _streamNameFactory.Create(aggregateIdentityValue, aggregateRootType);
        var assemblyQualifiedName = aggregateRootType.AssemblyQualifiedName;

        var eventsToStore = domainEvents.Select(x => _eventDataFactory.Create(x, assemblyQualifiedName));

        await _eventStoreConnection.AppendToStreamAsync(streamName, expectedVersion, eventsToStore);
    }
}

正如您可以想象的那样,问题主要在于IDomainEventFactory实施。我需要一个实现以下接口的类:

public interface IDomainEventFactory
{
    IDomainEvent Create(ResolvedEvent resolvedEvent);
}

这个类需要知道IDomainEvent它需要在运行时将resolvedEvent反序列化到哪个具体。换句话说,如果要检索的事件是 json 表示,MyThingCreatedEvent也许我可以使用诸如IMapper<ResolvedEvent, MyThingCreatedEvent>. 但是,如果要检索的事件是 json 表示,MyThingUpdatedEvent那么我将需要一个服务,例如IMapper<ResolvedEvent, MyThingUpdatedEvent>.

我想到了一些方法。

选项 1: 我认为我可以让IDomainEventFactory实现使用 autofac IComponentContext,以便在运行时我可以设法做一些_componentContext.Resolve(theNeededType). 但我不知道如何检索我需要的 IMapper。也许这是可能的,但我对此表示怀疑。

选项 2: 也许我可以拥有一些地图服务,例如 IBetterMapper,例如

public interface IBetterMapping
{
    TDestination Map<TDestination>(object source) where TDestination : class;
}

这样我的工厂就可以将知道如何将任何东西反序列化为TDestination. 但是我会遇到同样的问题:我不知道如何在运行时从字符串创建一个类型,例如,做类似的事情_myBetterMapper.Map<WhichTypeHere>,并且实现该 Map 方法还有一个额外的问题,我想这需要一些注册表并根据类型选择一个或另一个特定的映射器。

我真的坚持这一点。希望我能得到你们的帮助!:)

更新:我已经实现了自己的解决方案并将项目上传到我的个人仓库中:https ://gitlab.com/iberodev/DiDrDe.EventStore.Infra.EventStore 我采用的解决方案是保持事件存储包装器不可知,但在 DI 注册时为那些有点“特殊”的事件提供自定义序列化器/反序列化器。EventStore 允许添加自定义元数据标头,因此我使用一些自定义标头来指定每个数据流上的具体实现类型,以便在检索持久事件时知道在哪里反序列化。

4

1 回答 1

2

更新答案:

随着时间的推移,我开始意识到整个方法是一种不好的做法。我认为域事件永远不应该具有抽象(多态)属性,这些属性可能会呈现不同的形状,因此在反序列化时会出现问题,以便确切地知道事件被序列化为什么形状。

问题不是技术问题(尽管为此,我在下面的回答仍然有效),而是哲学问题。

我坚信领域事件应该只使用基本类型。不会改变的东西(字符串、整数,也许是一些“安全”的自定义类型,例如金钱等)。拥有多态域事件没有多大意义。如果一个事件可以采取不同的形式,那么我们可能正在谈论不同的事件

重要的是要考虑到一个非常古老的事件(例如:一年前提出的事件)在创建投影时也必须反序列化(例如:在回放期间,或者只是在使用事件源的聚合实例化期间),因此应该正确反序列化此事件而不会失败。想象一下,如果出于某种原因有人修改了该事件正在使用的类之一,而现在旧信息不能反序列化到新类中,那会是怎样的一团糟。我们将违反事件溯源中最基本的东西。

这就是为什么我认为我们不应该对复杂对象使用域事件,除非我们 100% 确定这些类不会改变,并且我们根本不应该使用多态域事件。


我已经在 EventStore .NET Client 上实现了一个包装器,它实现了我的IEventStore接口并将我的客户端应用程序从幕后的任何东西中抽象出来。

public interface IEventStore
{
    Task<IEnumerable<IDomainEvent>> GetEventsAsync(Guid aggregateId, Type aggregateType);
    Task PersistAsync(IAggregateRoot aggregateRoot, IEnumerable<IDomainEvent> domainEvents);
}

我解决序列化/反序列化主要问题的方法是为“特殊”的域事件提供自定义序列化器/反序列化器(因为它们具有抽象或接口属性,除非知道其特定的具体类型,否则无法反序列化)。此外,对于每个持续存在的域事件,我都会保存元数据标头,说明它是哪个特定的域事件类型以及它是哪个特定的可序列化事件类型。

换句话说,持久化时的流程是这样的: IDomainEvent -> convert to a serializable type (if needed) -> transform in bytes -> save stream data

并且在检索时 Stream Data -> transform to serializable type -> transform to IDomainEvent

我已将整个项目上传到我在 GitLab 的个人存储库中https ://gitlab.com/iberodev/DiDrDe.EventStore.Infra.EventStore ,请随意查看并使用 xUnit 运行所有集成和单元测试以了解它。当然,请随时提供任何反馈!

我的解决方案的繁重工作在于需要使用事件存储的客户端部分。使用 Autofac 扩展注册 EventStore 并在需要时提供所需的自定义序列化器/反序列化器是其基础设施层(其主机应用程序中的 Autofac 注册)的责任。

这样,我可以使 EventStore 包装器的实现完全不受特定设置和特定域事件的影响。这是一个通用的解决方案。

该项目的自述文件澄清了这一点,但如果域事件是可序列化的(没有抽象属性),基本上可以像这样注册事件存储:

var builder = new ContainerBuilder(); // Autofac container
builder
    .RegisterEventStore(
        ctx =>
        {
            var eventStoreOptions =
                new EventStoreOptions
                {
                    ConnectionString = "ConnectTo=tcp://admin:changeit@127.0.0.1:1113; HeartBeatTimeout=500";
                };
            return eventStoreOptions;
        });
var container = builder.Build();

如果存在特殊的领域事件,因为它们具有抽象属性,则像这样:

var builder = new ContainerBuilder();
builder
    .RegisterEventStore(
        ctx =>
        {
            var eventStoreOptions =
                new EventStoreOptions
                {
                    ConnectionString = "ConnectTo=tcp://admin:changeit@127.0.0.1:1113; HeartBeatTimeout=500";
                };
            return eventStoreOptions;
        },
        ctx =>
        {
            var customDomainEventMappersOptions =
                new CustomDomainEventMappersOptions()
                    .UsesCustomMappers<FakeDomainEventNotSerializable, FakeSerializableEvent>(
                        domainEvent =>
                        {
                            var mapper =
                                new FakeDomainEventNotSerializableToFakeSerializableEventMapper();
                            var result = mapper.Map(domainEvent);
                            return result;
                        },
                        serializableEvent =>
                        {
                            var mapper =
                                new FakeSerializableEventToFakeDomainEventNotSerializableMapper();
                            var result = mapper.Map(serializableEvent);
                            return result;
                        });
            return customDomainEventMappersOptions;
        });

var container = builder.Build();
于 2018-12-31T13:06:39.263 回答