44

Jimmy Boagard在这里描述了一家麦当劳快餐连锁店,将其与 分散聚集模式进行了比较。

从上面的文章中窃取的工作流程图像:在此处输入图像描述

初步实施思路:

为所有食品站将获得的所有类型的 FoodOrdered 事件提供一个通用接口,然后每个食品站将能够消费/创建其各自的项目并发布一个共同的完成事件。例如:薯条和汉堡站收到一条有关薯条订单的消息,薯条站消费该订单并宣布 saga 正在侦听的 ItemDoneEvent。

最初的担忧:

由于 Saga 并不关心完成的食物类型,只是所有食物都已完成这一事实,这似乎是一个不错的解决方案。但是,在阅读此处有关共享队列的警告并注意到使用 MassTransit 3.0 删除了 Consumer.Conditional 过滤之后,感觉框架好像在说这种方法会发生“坏事(TM)”。但是我不确定如果不为厨房中的每个食品创建消息请求和响应以及关联事件,您还能如何做到这一点。例如:FriesOrdered、BurgerOrdered FriesCooked、BurgerCooked。如果您必须对厨房中的每件物品都这样做,这会非常乏味吗?

考虑到上述问题 - 这种类型的工作流的一个很好的传奇示例会是什么样子?

4

3 回答 3

1

您不能“简单地”将对象作为事件参数传递到队列中吗?当 saga 侦听器收到“订单已完成”事件时,它会包含事件中已完成的对象吗?

我想它是通过通用方法发送到队列的,其中对象必须实现 IFoodOrdered

然后你可以实现一个虚拟方法,当 saga 被选中时,它可以用来做“通用”的事情,你只需要为那些特殊的项目实现重载,这需要一些特殊的事情发生吗?

于 2017-12-21T11:19:26.390 回答
1

我遇到了类似的问题 - 需要发布几十个命令(所有相同的界面,IMyRequest)并等待所有。

实际上,我的命令启动了其他 saga,它们IMyRequestDone在处理结束时发布,而不标记 saga 已完成。(需要稍后完成它们。)因此,我没有在父 saga 中保存已完成的嵌套 saga 的数量,而是只查询子 saga 实例的状态。

检查每条MyRequestDone消息:

Schedule(() => FailSagaOnRequestsTimeout, x => x.CheckToken, x =>
{
    // timeout for all requests
    x.Delay = TimeSpan.FromMinutes(10);
    x.Received = e => e.CorrelateById(context => context.Message.CorrelationId);
});


During(Active,
    When(Xxx)
        .ThenAsync(async context =>
        {
            await context.Publish(context => new MyRequestCommand(context.Instance, "foo"));
            await context.Publish(context => new MyRequestCommand(context.Instance, "bar"));

            context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow + FailSagaOnRequestsTimeout.Delay;
            context.Instance.WaitingMyResponsesCount = 2;
        })
        .TransitionTo(WaitingMyResponses)
        .Schedule(FailSagaOnRequestsTimeout, context => new FailSagaCommand(context.Instance))
    );

During(WaitingMyResponses,
    When(MyRequestDone)
        .Then(context =>
        {
            if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow)
                throw new TimeoutException();
        })
        .If(context =>
        {
            var db = serviceProvider.GetRequiredService<DbContext>();
            var requestsStates = db.MyRequestStates.Where(x => x.ParentSagaId == context.Instance.CorrelationId).Select(x => x.State).ToList();
            var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount &&
                requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing)); // assume 3 states of request - Processing, Done and Failed
            return allDone;
        }, x => x
            .Unschedule(FailSagaOnRequestsTimeout)
            .TransitionTo(Active))
        )
        .Catch<TimeoutException>(x => x.TransitionTo(Failed))
);

During(WaitingMyResponses,
    When(FailSagaOnRequestsTimeout.Received)
        .TransitionTo(Failed)

定期检查所有请求是否完成(通过“减少 NServiceBus Saga 负载”):

Schedule(() => CheckAllRequestsDone, x => x.CheckToken, x =>
{
    // check interval
    x.Delay = TimeSpan.FromSeconds(15);
    x.Received = e => e.CorrelateById(context => context.Message.CorrelationId);
});

During(Active,
    When(Xxx)
        .ThenAsync(async context =>
        {
            await context.Publish(context => new MyRequestCommand(context.Instance, "foo"));
            await context.Publish(context => new MyRequestCommand(context.Instance, "bar"));

            context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow.AddMinutes(10);
            context.Instance.WaitingMyResponsesCount = 2;
        })
        .TransitionTo(WaitingMyResponses)
        .Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance))
    );

During(WaitingMyResponses,
    When(CheckAllRequestsDone.Recieved)
        .Then(context =>
        {
            var db = serviceProvider.GetRequiredService<DbContext>();
            var requestsStates = db.MyRequestStates.Where(x => x.ParentSagaId == context.Instance.CorrelationId).Select(x => x.State).ToList();
            var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount &&
                requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing));
            if (!allDone)           
            {
                if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow + CheckAllRequestsDone.Delay)              
                    throw new TimeoutException();
                throw new NotAllDoneException();
            }
        })
        .TransitionTo(Active)
        .Catch<NotAllDoneException>(x => x.Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance)))
        .Catch<TimeoutException>(x => x.TransitionTo(Failed));
于 2017-12-02T20:44:47.607 回答
-1

将完成的事件退回到 saga 的问题在于它会在共享资源(即 saga 状态)上产生争用。

Jim 在您引用的那篇文章之后发表了另一篇文章,概述了问题和解决方案。当然,他具体说的是NServiceBus,但是问题和概念是一样的。

https://lostechies.com/jimmybogard/2014/02/27/reducing-nservicebus-saga-load/

创建外部存储。为每个工作项输入一个记录。让每个工作人员将自己的工作设置为完成,而 saga 使用延迟消息有效地轮询以查看所有工作是否已完成。

然后你仍然在做分散收集,但是“聚合器”已经被进程管理器模式取代以减少争用。

于 2017-09-28T17:13:03.673 回答