0

我有一个事件源系统,我现在正在实现一个端点,它将从事件存储事件重建读取端数据存储。但是,我现在在处理事件的方式上遇到了似乎是并发问题。

我决定在重建期间使用我的事件处理程序代码来处理事件。在系统的正常状态下(不是读取端数据库重建),我的事件处理程序侦听他们订阅的事件并相应地更新预测。但是,当通过它们的事件处理程序在线处理这些事件时,我看到读取端数据库最终状态的结果不一致(如果它甚至到达那里,它有时不会)。我想这意味着他们执行不正常。

我不应该以这种方式使用事件处理程序吗?我想既然我正在处理事件,那么以这种方式重用事件处理程序是非常合适的。


我正在使用MediatR进行服务消息传递。所有事件处理程序都实现INotificationHandler.

这是代码示例:

IEnumerable<IEvent> events = await _eventRepo.GetAllAggregateEvents(aggId);
int eventNumber = 0;
foreach (var e in events)
{
    if (e.Version != eventNumber + 1)
        throw new EventsOutOfOrderException("Events were out of order while rebuilding DB");

    var ev = e as Event;
    // publish different historic events to event handlers which work with read DBs

    switch (e.Type)
    {
        case EventType.WarehouseCreated:
            WarehouseCreated w = new WarehouseCreated(ev);
            await _mediator.Publish(w);
            break;
        case EventType.BoxCreated:
            BoxCreated b = new BoxCreated(ev);
            await _mediator.Publish(b);
            break;
        case EventType.BoxLocationChanged:
            BoxLocationChanged l = new BoxLocationChanged(ev);
            await _mediator.Publish(l);
            break;
    }
    eventNumber++;    
}

我已经尝试用await调用来替换关键字Wait()

类似的东西_mediator.Publish(bcc).Wait()。但这似乎不是一个好主意,因为这背后有异步代码。而且也没用。。

我还尝试对事件版本进行排队,并让事件类型案例等到它们的版本位于队列顶部,然后再发布事件。

就像是:

    case EventType.BoxContentsChanged:
        BoxContentsChanged bcc = new BoxContentsChanged(ev);
        while (eventQueue.Peek() != bcc.Version)
            continue;
        await _mediator.Publish(bcc);
        eventQueue.Dequeue();
        break;

这也没有奏效。


无论如何 - 如果有人对如何处理这个问题有任何想法,我将非常感激。我不希望以同步方式复制所有异步事件处理程序代码。

4

1 回答 1

0

我想最好的方法是同步,以确保一致性。这要求我在重播服务中复制一些事件处理程序逻辑并创建同步存储库方法。不过现在没有比赛条件,这很好。

于 2018-07-16T16:58:13.970 回答