1

我们正在尝试使用 Saga 序列化业务对象列表的处理。

现在,在没有 Saga 的情况下,我们只需遍历对象列表,然后触发bus.Send(new ProcessBusinessObejct(obj))async 以执行处理程序。所以处理或多或少是并行发生的,取决于这个设置,我相信:

endpointConfiguration.LimitMessageProcessingConcurrencyTo( 4 );

这工作正常,但并发处理程序的数量现在很难在数据库上。

可以串联触发这些处理程序,即仅在当前进程完成(失败或成功)时继续下一个处理程序。我们不想将并发设置为 1,它会影响端点中的所有处理程序。

这个想法是使用 Scatter/Gather 模式和 Saga 来跟踪对象的数量并使用计数(总计数、失败计数、成功计数)更新状态机,最后在列表出现时触发事件完成/空。

问题是

A)我不确定如何跟踪传奇中的列表。SagaData 需要一个列表来保存所有对象吗?然后在处理程序发出信号完成处理时删除一个实例。 saga 不支持分层数据,因此不支持 List 或 List。我相信在 NSB v7 中仍然是这种情况。

B) 使用 saga 是可行的还是矫枉过正,还是有更简单的方法来实现这一点?

我们正在使用 Sql Server 持久性和传输以及 NSB 7。

非常感谢任何输入!

4

1 回答 1

1

我认为您正在寻求这样做。请注意,根据您使用的持久层,您可能需要将实际导入与更新 saga 状态分开。我在这里写过关于这个的博客。

Saga 数据也可以存储一个 List,但我认为在大多数情况下你可以摆脱计数。另一个重要的注意事项(尽管应该很明显)是,如果消息无法处理并进入错误队列(例如 ImportData 中的未捕获异常),则整个 saga 将不完整,直到该消息被重试和处理。

public class MySaga : Saga<MySagaData>
   : IAmStartedByMessages<StartTheProcess>,
     IHandleMessages<ImportData>,
     IHandleMessages<ImportFinished>
{
    public async Task Handle(StartTheProcess message, IMessageHandlerContext context)
    {
        Data.ObjectsToImport = message.ObjectCount;
        Data.JobID = Guid.NewGuid(); //To generate a correlation ID to connect future messages back to this saga instance

        foreach(var id in message.ObjectIdsToImport)
        {
            await context.SendLocal(new ImportData
            {
                JobID = Data.JobID //You need this to correlate messages back to the saga
                //Anything else you need to pass on to ImportData
                ObjectIdToImport = id
            }
        });
    }

    public async Task Handle(ImportData message, IMessageHandlerContext context)
    {
        //import the data and increment the counter
        var result = ImportData(message.ObjectIdToImport);
        if(result == Result.Success)
        {
            Data.SuccessImport++;
        }
        else
        {
            Data.FailedImport++;
        }

        await CheckIfFinished(context);
    }

    public async Task Handle(ImportFinished message, IMessageHandlerContext context)
    {
        //do any post cleanups or Mark as complete 
        MarkAsComplete();
        return Task.CompletedTask;
    }

    private async Task CheckIfFinished(IMessageHandlerContext context)
    {
        if(Data.SuccessImport + Data.FailedImport == Data.ObjectsToImport)
        {
            //Everything is done
            context.SendLocal(new ImportFinished { JobID = Data.JobID });
        }
    }
}
于 2018-09-07T01:14:25.273 回答