我遇到了类似的问题——需要发布几十个命令(它们都实现同一个接口 IMyRequest),并等待它们全部完成。
实际上,我的命令会启动其他 saga,这些 saga 在处理结束时发布 IMyRequestDone,但不会把自身标记为已完成(需要稍后再完成它们)。因此,我没有在父 saga 中保存已完成的嵌套 saga 的数量,而是直接查询子 saga 实例的状态。
在收到每条 MyRequestDone 消息时进行检查:
// Declares the FailSagaOnRequestsTimeout schedule. The second argument selects the
// instance's CheckToken member (presumably where the schedule token is stored - confirm).
Schedule(() => FailSagaOnRequestsTimeout, instance => instance.CheckToken, schedule =>
{
    // A single deadline shared by all outstanding requests.
    schedule.Delay = TimeSpan.FromMinutes(10);

    // Correlate the fired message to the saga instance by the message's CorrelationId.
    schedule.Received = received => received.CorrelateById(ctx => ctx.Message.CorrelationId);
});
// While Active: when Xxx arrives, publish the two request commands, record the deadline and
// the number of expected responses on the instance, move to WaitingMyResponses and schedule
// the FailSagaOnRequestsTimeout message.
During(Active,
    When(Xxx)
        .ThenAsync(async context =>
        {
            // The factory lambdas use their own parameter name: reusing `context` would shadow
            // the enclosing lambda parameter, which older C# compilers reject (CS0136).
            await context.Publish(publishContext => new MyRequestCommand(publishContext.Instance, "foo"));
            await context.Publish(publishContext => new MyRequestCommand(publishContext.Instance, "bar"));

            // Deadline = now + the delay configured on the schedule.
            context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow + FailSagaOnRequestsTimeout.Delay;
            // Must equal the number of commands published above.
            context.Instance.WaitingMyResponsesCount = 2;
        })
        .TransitionTo(WaitingMyResponses)
        .Schedule(FailSagaOnRequestsTimeout, context => new FailSagaCommand(context.Instance))
);
// While WaitingMyResponses: every MyRequestDone first checks the stored deadline, then queries
// the states of the child requests. Once the expected number of requests exist and none is
// still Processing, the timeout is unscheduled and the saga returns to Active.
During(WaitingMyResponses,
    When(MyRequestDone)
        .Then(context =>
        {
            // Deadline already passed: raise TimeoutException, handled by the Catch below.
            if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow)
                throw new TimeoutException();
        })
        .If(context =>
        {
            var db = serviceProvider.GetRequiredService<DbContext>();
            var requestsStates = db.MyRequestStates
                .Where(x => x.ParentSagaId == context.Instance.CorrelationId)
                .Select(x => x.State)
                .ToList();
            // assume 3 states of request - Processing, Done and Failed
            var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount &&
                requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing));
            return allDone;
        }, x => x
            .Unschedule(FailSagaOnRequestsTimeout)
            .TransitionTo(Active))
        // Catch must be chained onto the When(...) binder, i.e. inside During(...);
        // closing During(...) before it leaves the call dangling and the parentheses unbalanced.
        .Catch<TimeoutException>(x => x.TransitionTo(Failed))
);
// While WaitingMyResponses: when the scheduled FailSagaOnRequestsTimeout message is received,
// move the saga to Failed.
During(WaitingMyResponses,
    When(FailSagaOnRequestsTimeout.Received)
        .TransitionTo(Failed)
);
定期检查所有请求是否已完成(参考“减少 NServiceBus Saga 负载”一文中的做法):
// Declares the CheckAllRequestsDone schedule. The second argument selects the
// instance's CheckToken member (presumably where the schedule token is stored - confirm).
Schedule(() => CheckAllRequestsDone, instance => instance.CheckToken, schedule =>
{
    // Interval between two consecutive completion checks.
    schedule.Delay = TimeSpan.FromSeconds(15);

    // Correlate the fired message to the saga instance by the message's CorrelationId.
    schedule.Received = received => received.CorrelateById(ctx => ctx.Message.CorrelationId);
});
// While Active: when Xxx arrives, publish the two request commands, record a 10-minute
// deadline and the number of expected responses on the instance, move to WaitingMyResponses
// and schedule the first CheckAllRequestsDone check.
During(Active,
    When(Xxx)
        .ThenAsync(async context =>
        {
            // The factory lambdas use their own parameter name: reusing `context` would shadow
            // the enclosing lambda parameter, which older C# compilers reject (CS0136).
            await context.Publish(publishContext => new MyRequestCommand(publishContext.Instance, "foo"));
            await context.Publish(publishContext => new MyRequestCommand(publishContext.Instance, "bar"));

            // Overall deadline for all responses.
            context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow.AddMinutes(10);
            // Must equal the number of commands published above.
            context.Instance.WaitingMyResponsesCount = 2;
        })
        .TransitionTo(WaitingMyResponses)
        .Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance))
);
// While WaitingMyResponses: on every CheckAllRequestsDone tick, query the child request states.
// - all finished          -> fall through to TransitionTo(Active)
// - not finished, in time -> NotAllDoneException -> schedule the next check
// - not finished, too late-> TimeoutException    -> TransitionTo(Failed)
During(WaitingMyResponses,
    // The schedule member is spelled `Received` (same member assigned in the Schedule(...) setup).
    When(CheckAllRequestsDone.Received)
        .Then(context =>
        {
            var db = serviceProvider.GetRequiredService<DbContext>();
            var requestsStates = db.MyRequestStates
                .Where(x => x.ParentSagaId == context.Instance.CorrelationId)
                .Select(x => x.State)
                .ToList();
            // Done when the expected number of requests exist and none is still Processing.
            var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount &&
                requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing));
            if (!allDone)
            {
                // The next check would fire after the deadline, so give up now.
                if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow + CheckAllRequestsDone.Delay)
                    throw new TimeoutException();
                throw new NotAllDoneException();
            }
        })
        .TransitionTo(Active)
        .Catch<NotAllDoneException>(x => x.Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance)))
        .Catch<TimeoutException>(x => x.TransitionTo(Failed))
);