我有一个事件源系统,我现在正在实现一个端点,它将从事件存储事件重建读取端数据存储。但是,我现在在处理事件的方式上遇到了似乎是并发问题。
我决定在重建期间使用我的事件处理程序代码来处理事件。在系统的正常状态下(不是读取端数据库重建),我的事件处理程序侦听他们订阅的事件并相应地更新预测。但是,当通过它们的事件处理程序在线处理这些事件时,我看到读取端数据库最终状态的结果不一致(如果它甚至到达那里,它有时不会)。我想这意味着他们执行不正常。
我不应该以这种方式使用事件处理程序吗?我想既然我正在处理事件,那么以这种方式重用事件处理程序是非常合适的。
我正在使用MediatR进行服务消息传递。所有事件处理程序都实现INotificationHandler
.
这是代码示例:
IEnumerable<IEvent> events = await _eventRepo.GetAllAggregateEvents(aggId);
int eventNumber = 0;
foreach (var e in events)
{
if (e.Version != eventNumber + 1)
throw new EventsOutOfOrderException("Events were out of order while rebuilding DB");
var ev = e as Event;
// publish different historic events to event handlers which work with read DBs
switch (e.Type)
{
case EventType.WarehouseCreated:
WarehouseCreated w = new WarehouseCreated(ev);
await _mediator.Publish(w);
break;
case EventType.BoxCreated:
BoxCreated b = new BoxCreated(ev);
await _mediator.Publish(b);
break;
case EventType.BoxLocationChanged:
BoxLocationChanged l = new BoxLocationChanged(ev);
await _mediator.Publish(l);
break;
}
eventNumber++;
}
我已经尝试用await
调用来替换关键字Wait()
。
类似的东西_mediator.Publish(bcc).Wait()
。但这似乎不是一个好主意,因为这背后有异步代码。而且也没用。。
我还尝试对事件版本进行排队,并让事件类型案例等到它们的版本位于队列顶部,然后再发布事件。
就像是:
case EventType.BoxContentsChanged:
BoxContentsChanged bcc = new BoxContentsChanged(ev);
while (eventQueue.Peek() != bcc.Version)
continue;
await _mediator.Publish(bcc);
eventQueue.Dequeue();
break;
这也没有奏效。
无论如何 - 如果有人对如何处理这个问题有任何想法,我将非常感激。我不希望以同步方式复制所有异步事件处理程序代码。