我将 DDD 与 CQRS 和事件溯源一起使用。我需要在我的自定义实现中使用事件存储(特别是这个事件存储IEventStore
)来持久化和检索域事件,但是我在处理序列化/反序列化的方法上遇到了困难。
这是我正在实现的接口:
public interface IEventStore
{
Task<IEnumerable<IDomainEvent>> GetEventsAsync(Identity aggregateIdentity, Type aggregateType);
Task PersistAsync(IAggregateRoot aggregateRoot, IEnumerable<IDomainEvent> domainEvents);
}
在我的实现之外,IEventStore
我可以将每个映射器映射IDomainEvent
到一些可序列化/可反序列化的 EventDto 或 json 字符串。这不是问题。但这些是我的限制:
我的域事件是实现的不可变对象
IDomainEvent
(即:没有设置器)我的域事件并不总是很容易以通用方式序列化/反序列化。它们通常具有抽象或接口属性,因此我的域事件和一些可序列化对象(如字符串 json 或事件 DTO)之间的具体映射器在我的
IEventStore
实现之外决定。我的
IEventStore
实现需要是通用的,如果我添加新的域事件类型,我不需要在IEventStore
实现中触及任何东西我的
IEventStore
实现可以接收注入的一些特定实现IMapper<TSource, TDestination>
,以便我可以使用它们在特定类型(不是接口)之间进行序列化/反序列化。public interface IMapper<in TSource, out TDestination> { TDestination Map(TSource source); // I have implementations of this if needed }
下面是我的尝试:
public class MyEventStore
: IEventStore
{
private readonly IStreamNameFactory _streamNameFactory;
private readonly IEventStoreConnection _eventStoreConnection; //this is the Greg Young's EventStore product that I want to use as database
private readonly IDomainEventFactory _domainEventFactory;
private readonly IEventDataFactory _eventDataFactory;
public EventStore(
IStreamNameFactory streamNameFactory,
IEventStoreConnection eventStoreConnection,
IDomainEventFactory domainEventFactory,
IEventDataFactory eventDataFactory)
{
_streamNameFactory = streamNameFactory;
_eventStoreConnection = eventStoreConnection;
_domainEventFactory = domainEventFactory;
_eventDataFactory = eventDataFactory;
}
public async Task<IEnumerable<IDomainEvent>> GetEventsAsync(
Identity aggregateIdentity,
Type aggregateType)
{
var aggregateIdentityValue = aggregateIdentity.Value;
var streamName = _streamNameFactory.Create(aggregateIdentityValue, aggregateType);
var streamEventSlice =
await _eventStoreConnection.ReadStreamEventsForwardAsync(streamName, 0, Int32.MaxValue, false);
var domainEvents = streamEventSlice
.Events
.Select(x => _domainEventFactory.Create(x));
return domainEvents;
}
[SuppressMessage("ReSharper", "PossibleMultipleEnumeration")]
public async Task PersistAsync(
IAggregateRoot aggregateRoot,
IEnumerable<IDomainEvent> domainEvents)
{
var numberOfEvents = domainEvents.Count();
var aggregateRootVersion = aggregateRoot.Version;
var originalVersion = aggregateRootVersion - numberOfEvents;
var expectedVersion = originalVersion - 1;
var aggregateIdentityValue = aggregateRoot.AggregateIdentity.Value;
var aggregateRootType = aggregateRoot.GetType();
var streamName = _streamNameFactory.Create(aggregateIdentityValue, aggregateRootType);
var assemblyQualifiedName = aggregateRootType.AssemblyQualifiedName;
var eventsToStore = domainEvents.Select(x => _eventDataFactory.Create(x, assemblyQualifiedName));
await _eventStoreConnection.AppendToStreamAsync(streamName, expectedVersion, eventsToStore);
}
}
正如您可以想象的那样,问题主要在于IDomainEventFactory
实施。我需要一个实现以下接口的类:
public interface IDomainEventFactory
{
IDomainEvent Create(ResolvedEvent resolvedEvent);
}
这个类需要知道IDomainEvent
它需要在运行时将resolvedEvent反序列化到哪个具体。换句话说,如果要检索的事件是 json 表示,MyThingCreatedEvent
也许我可以使用诸如IMapper<ResolvedEvent, MyThingCreatedEvent>
. 但是,如果要检索的事件是 json 表示,MyThingUpdatedEvent
那么我将需要一个服务,例如IMapper<ResolvedEvent, MyThingUpdatedEvent>
.
我想到了一些方法。
选项 1:
我认为我可以让IDomainEventFactory
实现使用 autofac IComponentContext
,以便在运行时我可以设法做一些_componentContext.Resolve(theNeededType)
. 但我不知道如何检索我需要的 IMapper。也许这是可能的,但我对此表示怀疑。
选项 2: 也许我可以拥有一些地图服务,例如 IBetterMapper,例如
public interface IBetterMapping
{
TDestination Map<TDestination>(object source) where TDestination : class;
}
这样我的工厂就可以将知道如何将任何东西反序列化为TDestination
. 但是我会遇到同样的问题:我不知道如何在运行时从字符串创建一个类型,例如,做类似的事情_myBetterMapper.Map<WhichTypeHere>
,并且实现该 Map 方法还有一个额外的问题,我想这需要一些注册表并根据类型选择一个或另一个特定的映射器。
我真的坚持这一点。希望我能得到你们的帮助!:)
更新:我已经实现了自己的解决方案并将项目上传到我的个人仓库中:https ://gitlab.com/iberodev/DiDrDe.EventStore.Infra.EventStore 我采用的解决方案是保持事件存储包装器不可知,但在 DI 注册时为那些有点“特殊”的事件提供自定义序列化器/反序列化器。EventStore 允许添加自定义元数据标头,因此我使用一些自定义标头来指定每个数据流上的具体实现类型,以便在检索持久事件时知道在哪里反序列化。