4

我觉得我已经接近完成这项工作,但似乎无法完成...

我有一个带有传奇/状态机的 .NET Core ASP 应用程序,它在大多数情况下似乎运行良好。它:

  1. 收到请求
  2. 发布由传送单选取的事件
  3. 当传送单完成时,它会发布一个事件
  4. 事件拾起saga
  5. 然后 saga 向请求/响应消费者发送一个请求
  6. 回应回到传奇
  7. 但是然后我使用 RespondAsync 尝试将响应发送回原始调用控制器,但是没有任何东西可以返回

我的控制器看起来像:

private readonly IRequestClient<IRequestLink, IRequestLinkCompleted> _client;

public async Task<IActionResult> CreateLinkAsync([FromBody]CreateLinkRequest request)
{
        var requestLink = new RequestLink            {

            GroupName = $"{request.Name} Group",
            Name = request.Name,
            LinkId = Guid.NewGuid()
        };

        var result = await _client.Request(requestLink).ConfigureAwait(false);

        return Ok();
}

我的传奇故事的精简版如下所示:

Request(() => LinkRequest, x => x.RequestId, cfg =>
        {
            cfg.ServiceAddress = new Uri($"rabbitmq://localhost/request_end_point_name");
            cfg.SchedulingServiceAddress = new Uri($"rabbitmq://localhost/request_end_point_name");
            cfg.Timeout = TimeSpan.FromSeconds(30);
        });


During(RequestReceived,
                When(LinkCreatedEvent)
                    .Request(LinkRequest, context => new SelectUrlByPublicId(context.Data.DatabaseId, context.Data.LinkId))
                    .TransitionTo(LinkRequest.Pending));

            During(LinkRequest.Pending,                
                When(LinkRequest.Completed)
                    .ThenAsync(context => context.RespondAsync(new RequestLinkCompleted
                    {
                        CorrelationId = LinkRequest.GetRequestId(context.Instance),
                        DatabaseId = context.Data.DatabaseId
                    }))
                    .Finalize());

最后在我的启动代码中,我将请求/响应配置为:

services.AddScoped<IRequestClient<IRequestLink, IRequestLinkCompleted>>(x => new MessageRequestClient<IRequestLink, IRequestLinkCompleted>(_bus, new Uri($"{messageBusSettings.Host}/create_link_saga"), TimeSpan.FromSeconds(30)));

我猜 RespondAsync 调用没有使用正确/原始的 requestId,但我不知道如何检查或更改它。谁能帮忙?

4

2 回答 2

3

由于原始请求的上下文丢失了,您基本上需要做自己正在做的RespondAsync事情。

  1. ResponseAddress从请求消息上下文中保存
  2. RequestId从相同的上下文中保存

在您的传奇中,当需要响应时,您需要在委托中使用context.GetSendEndpoint(context.Instance.SavedResponseAddress)然后调用Send设置RequestId以匹配原始上下文中保存的 RequestId。

现在,您可能需要将这些保存在路由滑动变量中,因为您的 saga 没有收到命令,只有后续事件,但最终效果是一样的,原始请求消息已经消失,并且 saga 从未看到过。

于 2018-07-30T12:26:06.137 回答
2

对于那些希望更清楚地了解一些代码片段以使其正常工作的人,这是我们为启动和运行 saga 所做的工作,该 saga 将响应返回给初始请求者。我将上面的答案标记为正确,但我们仍然需要几个小时才能让我们的实现工作,因为我们对 sagas 还很陌生,无法弄清楚在哪里可以访问原始上下文,而不仅仅是我们的消息,因为我们实现传奇结构的方式。希望这可以为其他人节省一些时间。

初始 Saga 设置:

 public PaymentStateMachine()
        {
            InstanceState(x => x.CurrentState);

            this.ConfigureCorrelationIds();
.....
//Saga workflow registration
.....

            this.During(ValidationSucceeded
                , SetPaymentGatewaySubmissionHandler());            

            SetCompletedWhenFinalized();
        }

我们必须注册我们的初始事件,以便将初始消息中的一些信息保存到 saga 表中。需要注意的关键部分是将context.responseAddresscontext.RequestId保存到 Saga Factory 中。

private void ConfigureCorrelationIds()
{
Event(() => PaymentSubmittedEvent,
                x =>
                {

                    x.CorrelateBy<int>(pay => pay.OrderId, context => context.Message.Order.Id)
                    .SelectId(c => c.Message.CorrelationId);

                    x.InsertOnInitial = true;

                    x.SetSagaFactory(context => new PaymentSagaState()
                    {
                        ResponseAddress = context.ResponseAddress.ToString(),
                        CorrelationId = context.CorrelationId.Value,
                        RequestId = context.RequestId,
                        OrderId = context.Message.Order.Id,
                        Created = DateTime.Now,
                        Updated = DateTime.Now,
                        CurrentState = Initial.Name
                    });
                });
....
//Other events registered
}

从那里开始,当我们处理完 saga 后,我们使用上面标记的答案将响应发送回原始消息的响应地址。新的PaymentSubmittedResponse只是客户端在前端等待的消息合约传奇中的一个私有实现。

private EventActivityBinder<PaymentSagaState, IPaymentGatewaySubmittedEvent> SetPaymentGatewaySubmissionHandler() =>
            When(PaymentGatewaySubmittedEvent)
                .Then(c => this.UpdateSagaState(c, PaymentGatewayComplete.Name))
                .Then(c => Console.Out.WriteLineAsync(
                    $"{DateTime.Now} PaymentGatewayComplete: {c.Data.Status} to {c.Instance.CorrelationId}"))
                .ThenAsync(async c =>
                {              
                    //Send response back to orignial requestor once we are done with this step               
                    var responseEndpoint =
                            await c.GetSendEndpoint(new Uri(c.Instance.ResponseAddress));

                    await responseEndpoint.Send(new PaymentSubmittedResponse(c.Data), callback: sendContext => sendContext.RequestId = c.Instance.RequestId);
                })
            .TransitionTo(ClientPaymentSubmissionResponded);

private void UpdateSagaState(BehaviorContext<PaymentSagaState> sagaContext, string status)
        {
            sagaContext.Instance.CurrentState = status;
            sagaContext.Instance.Updated = DateTime.Now;
        }

private class PaymentSubmittedResponse : IPaymentSubmittedEvent
        {
            public string Status { get; set; }

            public OrderDto Order { get; set; }

            public Guid CorrelationId { get; set; }
            public DateTime TimeStamp { get; set; }

            public PaymentSubmittedResponse(IPaymentBase paymentMessage)
            {
                Order = paymentMessage.Order;
                CorrelationId = paymentMessage.CorrelationId;
                Status = paymentMessage.Status;
                TimeStamp = paymentMessage.TimeStamp;
            }
        }

不确定它是完全需要还是仅仅因为我们如何实现它,但我们不得不引入ClientPaymentSubmissionResponded的另一种传奇状态来处理发送回原始请求的响应消息事件。

于 2019-06-06T13:46:31.557 回答