1

我一直在转动我的轮子试图让 MassTransitStateMachine 工作,我似乎不明白这应该如何工作。

我收到的错误(下面的完整代码)是PayloadNotFoundException当我尝试stateMachine.RaiseEvent(instance, stateMachine.DeployApplicationRequest, request)结合.Send()状态机中的 Activity 调用时。我一直试图追查这一点,但一无所获。

以下是相关代码和意图解释:

我收到通过 webAPI 帖子部署应用程序的请求,并将请求转换为DeployApplicationRequest.

public record DeployApplicationRequest
    {
        // State machine properties
        public Guid CorrelationId { get; init; }
        public ChatApplication ChatApplication { get; init; }
        public string ChannelId { get; init; }
        public string UserId { get; init; }


        // Command
        public DeployApplication DeployApplication { get; init; }
    }

    public enum ChatApplication
    {
        Slack,
        Teams
    }

这个请求封装了两件事:API 需要维护的状态,以便它知道如何将结果路由回请求者(状态机属性)以及Command要发送到服务总线的状态。

命令 ,DeployApplication如下所示:

public record DeployApplication
    {
        public Guid CorrelationId { get; init;}
        public string InitiatedBy { get; init; }
        public DateTime CreatedDate { get; init; }
        public string Environment { get; init; }
        public string PullRequestId { get; init; }
    }

我创建了一个状态实例来保存请求的详细信息(例如它是通过 Slack 还是通过 Teams 进入的)。我不想将此信息发布到总线:

public class DeployApplicationState : SagaStateMachineInstance
    {
        public Guid CorrelationId { get; set; }
        public string CurrentState { get; set; }

        public ChatApplication ChatApplication { get; set; }
        public string ChannelId { get; set; }
        public string UserId { get; set; }
    }

我创建了一个StateMachine来处理这个:

public class DeployApplicationStateMachine : MassTransitStateMachine<DeployApplicationState>
    {
        private readonly ILogger<DeployApplicationStateMachine> logger;
        public DeployApplicationStateMachine(ILogger<DeployApplicationStateMachine> logger)
        {
            this.logger = logger;

            InstanceState(x => x.CurrentState);

            Event(() => DeployApplicationRequest, x => x.CorrelateById(context => context.Message.CorrelationId));

            Initially(
                When(DeployApplicationRequest)
                    .Then(x => {
                        x.Instance.CorrelationId = x.Data.CorrelationId;
                        x.Instance.ChannelId = x.Data.ChannelId;
                        x.Instance.ChatApplication = x.Data.ChatApplication;
                        x.Instance.UserId = x.Data.UserId;
                    })
                    .Send(context => context.Init<DeployApplication>(context.Data.DeployApplication))
                    .TransitionTo(Submitted));
        }

        public Event<DeployApplicationRequest> DeployApplicationRequest { get; private set; }

        public State Submitted { get; private set; }
    }

为了触发初始事件(因为请求不是通过消费者而是通过控制器进入的),我已经将状态机注入到 MassTransit 客户端,并且我正在调用该RaiseEvent方法:

public class MassTransitDeployClient : IDeployClient
    {
        private readonly DeployApplicationStateMachine stateMachine;
        public MassTransitDeployClient(DeployApplicationStateMachine stateMachine)
        {
            this.stateMachine = stateMachine;
        }

        public async Task Send(DeployApplicationRequest request)
        {
            var instance = new DeployApplicationState
            {
                CorrelationId = request.CorrelationId,
                ChannelId = request.ChannelId,
                ChatApplication = request.ChatApplication,
                UserId = request.UserId
            };

            await stateMachine.RaiseEvent(instance, stateMachine.DeployApplicationRequest, request);
            // This works for sending to the bus, but I lose the state information
            //await sendEndpointProvider.Send(request.DeployApplication);
        }
    }

容器配置如下:

services.AddMassTransit(x =>
            {
                x.AddSagaStateMachine<DeployApplicationStateMachine, DeployApplicationState>()
                    .InMemoryRepository();

                x.UsingInMemory((context, cfg) =>
                {
                    cfg.ConfigureEndpoints(context);
                });
                
                
            });

            EndpointConvention.Map<DeployApplication>(new Uri($"queue:{typeof(DeployApplication).FullName}"));

            services.AddMassTransitHostedService();

Submitted引发事件工作得很好,如果我不包括 .Send(...)在状态机中,状态机就会成功转移。我介绍的第二个我Activity得到了PayloadNotFoundException. 周末我尝试了大约 15 种不同的方法,但都没有成功,我希望有人能帮助我了解我的方法的错误。

谢谢阅读!顺便说一句,很棒的图书馆。我将寻找方法为这方面的发展做出贡献,这是我遇到的最有用的库之一(克里斯,你的 Youtube 视频非常棒)。

4

1 回答 1

1

Saga 状态机并不意味着直接从控制器调用。您应该发送(或发布)一条消息,然后该消息将通过消息代理发送到 saga。

如果您想这样做,您可以改用 MassTransit Mediator,但您仍将通过 Mediator 向 saga 发送消息,然后由 MassTransit 处理。

TL;DR - 你不使用RaiseEventsaga 状态机。

于 2021-06-21T13:23:57.657 回答