0

新手问题 - 我错过了什么?是否有可用的 dotnetcore 2.2 Saga 示例?

我有一个基本的端到端系统工作正常,消息在 docker-compose 中跨容器流动,但添加 Saga 似乎是一个挑战 -

问:我是否缺少调度程序依赖项?在 MassTransit 5.5.5 中,cfg.UseInMemoryMessageScheduler(); 不编译。

发生了一些奇怪的事情,我不得不将我的状态机明确标记为 ISaga

MassTransit.ConfigurationException:无法为 Model.WorkflowExecutionStateMachine 创建状态机连接器 ---> MassTransit.ConfigurationException:未正确配置状态机:workflowapi_1 | [失败] 未指定 ExecutingTask


    public void ConfigureServices(IServiceCollection services)
    {
        services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);

        // Register MassTransit
        services.AddMassTransit(x =>
        {
            x.AddConsumer<WorkflowTaskConsumer>();

            // required?
            x.AddSaga<WorkflowExecutionSaga>();

            x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var rabbitMQHostName = $"rabbitmq://{configuration["RabbitMQHostName"]}";

                Console.Out.WriteLineAsync($"Starting Workflow Receiver... {rabbitMQHostName}/{QueueNames.ExeuteWorkflowTaskQueue}");

                var host = cfg.Host(new Uri(rabbitMQHostName), hostConfig =>
                {
                    hostConfig.Username("guest");
                    hostConfig.Password("guest");
                });

                // A basic message works OK
                cfg.ReceiveEndpoint(host, QueueNames.ExeuteWorkflowTaskQueue, ep =>
                {
                    ep.PrefetchCount = 1;
                    ep.UseMessageRetry(mr => mr.Interval(1000, 2));
                    ep.ConfigureConsumer<WorkflowTaskConsumer>(provider);
                });

                // Doesn't like this
                cfg.ReceiveEndpoint(host, QueueNames.WorkflowStateMachineSagaQueueName, ep =>
                {
                    ep.PrefetchCount = 1;
                    ep.UseMessageRetry(mr => mr.Interval(1000, 2));
                    ep.StateMachineSaga(new WorkflowExecutionSaga(), new InMemorySagaRepository<WorkflowExecutionStateMachine>());
                });
            }));

            cfg.UseInMemoryMessageScheduler(); // doesn't compile!
        });
    }

巴士启动如下 -


    public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }
        else
        {
            // The default HSTS value is 30 days. You may want to change this for production scenarios, 
            // see https://aka.ms/aspnetcore-hsts.
            app.UseHsts();
        }

        app.UseMvc();

        var bus = app.ApplicationServices.GetService<IBusControl>();
        var busHandle = TaskUtil.Await(() =>
        {
            return bus.StartAsync();
        });

        lifetime.ApplicationStopping.Register(() =>
        {
            busHandle.Stop();
        });
    }

异常详细信息是

未处理的异常:MassTransit.ConfigurationException:无法为 Rapid.Workflow.Api.Model.WorkflowExecutionStateMachine 创建状态机连接器 ---> MassTransit.ConfigurationException:未正确配置状态机:workflowapi_1 | [失败] ExecutingTask 未指定 workflowapi_1 | 在 Automatonymous.StateMachineConfigurationResult.CompileResults(IEnumerable 1 results) workflowapi_1 | at Automatonymous.StateMachineConnectors.StateMachineConnector1.StateMachineEvents()+MoveNext() workflowapi_1 | 在 System.Collections.Generic.List 1.AddEnumerable(IEnumerable1 可枚举) workflowapi_1 | 在 System.Linq.Enumerable.ToList[TSource](IEnumerable 1 source) workflowapi_1 | at Automatonymous.StateMachineConnectors.StateMachineConnector1..ctor(SagaStateMachine 1 stateMachine) workflowapi_1 | --- End of inner exception stack trace --- workflowapi_1 | at Automatonymous.StateMachineConnectors.StateMachineConnector1..ctor(SagaStateMachine 1 stateMachine) workflowapi_1 | at Automatonymous.SagaConfigurators.StateMachineSagaConfigurator1..ctor(SagaStateMachine1 stateMachine, ISagaRepository1 个存储库,ISagaConfigurationObserver 观察者)workflowapi_1 | 在 MassTransit.AutomatonymousReceiveEndpointExtensions.StateMachineSaga[TInstance](IReceiveEndpointConfigurator 配置器,SagaStateMachine 1 stateMachine, ISagaRepository1 存储库,Action`1 配置) workflowapi_1 | 在 Rapid.Workflow.Api.Startup.<>c.b__2_5(IRabbitMqReceiveEndpointConfigurator ep) 在 /src/Workflow.Api/Startup.cs:line 74

依赖项是

<PackageReference Include="Automatonymous" Version="4.1.6" />
<PackageReference Include="MassTransit" Version="5.5.5" />
<PackageReference Include="MassTransit.RabbitMQ" Version="5.5.5" />
<PackageReference Include="MassTransit.AspNetCore" Version="5.5.5" />
<PackageReference Include="MassTransit.Automatonymous" Version="5.5.5" /> 
<PackageReference Include="MassTransit.Extensions.DependencyInjection" Version="5.5.5" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" />

感谢您的任何提示或想法 -

4

2 回答 2

1

您需要更改使用.AddStateMachineSaga方法,而不是.AddSaga您在相关代码中使用的方法。

// required? - yes, but should be as shown here
x.AddSagaStateMachine<WorkflowExecutionSaga, WorkflowState>();

在这种情况下,状态机和状态机实例类型都是必需的。然后,在您的端点中,使用:

ep.ConfigureSaga<WorkflowState>(provider);

您还需要确保在容器中配置了 saga 存储库,这是为 MS DI/in-memory 使用的:

x.AddSingleton<ISagaRepository<WorkflowState>, InMemorySagaRepository<WorkflowState>>();

假设您的状态机没有损坏,那应该会让您滚动。如果您仍然收到错误,请确保您的所有状态机事件等都已正确配置。

此外,您的状态机实例应该实现:

public class WorkflowState :
    SagaStateMachineInstance

而且你的状态机不需要实现ISaga

public class WorkflowExecutionSaga :
    MassTransitStateMachine<WorkflowState>
于 2019-09-21T13:21:03.863 回答
0

这个错误似乎突然出现,因为 Saga 类已经声明了一些尚未使用(但公开)的事件 - DOH!

解决方案是从 Saga 中删除未使用的事件......

// uncomment will fail! public Event<ISatelliteTaskRequest> UnusedEvent { get; private set; }

在查看此示例https://github.com/selcukusta/masstransit-saga-implementation并将我的 program.cs 还原为基础之后 - 我仍然遇到错误!所以,不是容器/IOC/启动问题。

接下来,查看 MassTransit 错误消息的来源(https://github.com/MassTransit/MassTransit/blob/master/src/MassTransit.AutomatonymousIntegration/Configuration/StateMachineConnectors/StateMachineConnector.cs)我意识到相关代码可能正在反映在 Saga 的所有公共成员上 -

因此,从 Saga 类中删除未使用的事件可以解决此问题。

于 2019-09-21T13:34:46.777 回答