5

我有一个 MassTransitStateMachine 来编排一个涉及创建多个事件的过程。

完成所有事件后,我希望状态转换到“清理”阶段。

下面是相关的状态声明和过滤函数:

        During(ImportingData,
            When(DataImported)
                // When we get a data imported event, mark this source as done. 
                .Then(MarkImportCompletedForLocation),

            When(DataImported, IsAllDataImported)
                // Once all are done, we can transition to cleaning up...
                .Then(CleanUpSources)
                .TransitionTo(CleaningUp)
        );


    ...snip...


    private static bool IsAllDataImported(EventContext<DataImportSagaState, DataImportMappingCompletedEvent> ctx)
    {
        return ctx.Instance.Locations.Values.All(x => x);
    }

因此,当状态为ImportingData时,我希望收到多个DataImported事件。每个事件都将其位置标记为已完成,以便IsAllDataImported方法可以确定我们是否应该转换到下一个状态。

但是,如果最后两个DataImported事件同时到达,则转换到CleaningUp阶段的处理程序会触发两次,我最终会尝试执行两次清理。

可以用自己的代码解决这个问题,但我希望状态机能够管理这个问题。我做错了什么,还是我只需要自己处理争用?

4

2 回答 2

5

Chris 提出的解决方案不适用于我的情况,因为我有多个相同类型的事件到达。只有当所有这些事件都到达时,我才需要转换。CompositeEvent构造不适用于此用例。

我对此的解决方案是在MarkImportCompletedForLocation方法期间引发一个新的AllDataImported事件。此方法现在处理确定所有子导入是否以线程安全的方式完成。

所以我的状态机定义是:

            During(ImportingData,
            When(DataImported)
                // When we get a data imported event, mark the URI in the locations list as done. 
                .Then(MarkImportCompletedForLocation),

            When(AllDataImported)
                // Once all are done, we can transition to cleaning up...
                .TransitionTo(CleaningUp)
                .Then(CleanUpSources)
        );

IsAllDataImported方法不再需要作为过滤器。

传奇状态有一个 Locations 属性:

public Dictionary<Uri, bool> Locations { get; set; }

并且 MarkImportCompletedForLocation 方法定义如下:

    private void MarkImportCompletedForLocation(BehaviorContext<DataImportSagaState, DataImportedEvent> ctx)
    {
        lock (ctx.Instance.Locations)
        {
            ctx.Instance.Locations[ctx.Data.ImportSource] = true;
            if (ctx.Instance.Locations.Values.All(x => x))
            {
                var allDataImported = new AllDataImportedEvent {CorrelationId = ctx.Instance.CorrelationId};
                this.CreateEventLift(AllDataImported).Raise(ctx.Instance, allDataImported);
            }
        }
    }

(我刚刚写了这篇文章,以便了解一般流程将如何工作;我认识到 MarkImportCompletedForLocation 方法需要通过验证字典中是否存在键来更具防御性。)

于 2016-04-18T21:28:01.233 回答
1

您可以使用复合事件将多个事件累积到一个后续事件中,该事件在相关事件触发时触发。这是使用定义的:

CompositeEvent(() => AllDataImported, x => x.ImportStatus, DataImported, MoreDataImported);

During(ImportingData,
    When(DataImported)
        .Then(context => { do something with data }),
    When(MoreDataImported)
        .Then(context => { do smoething with more data}),
    When(AllDataImported)
        .Then(context => { okay, have all data now}));

然后,在您的状态机状态实例中:

class DataImportSagaState :
    SagaStateMachineInstance
{
    public int ImportStatus { get; set; }
}

这应该可以解决您要解决的问题,因此请试一试。请注意,事件顺序无关紧要,它们可以按任何顺序到达,因为已接收事件的状态位于实例的 ImportStatus 属性中。

单个事件的数据不会保存,因此您需要自己使用.Then()方法将其捕获到状态实例中。

于 2016-04-18T04:31:28.643 回答