2

我正在尝试为一个使用 Automatonymous(MassTransit 状态机)的 worker 服务配置 EF Core 作为持久化存储。我通过 API 发布事件,并在托管服务中以 RabbitMQ 作为传输来处理它。不幸的是,数据库并没有保存状态机的状态。我已经应用了迁移,也能看到 OrderState 表,但在发布 OrderSubmitted 事件之后,表中没有任何数据。事件本身得到了正确处理(我在控制台中看到了日志),但数据库表仍然是空的。我遗漏了什么?

这是我的代码:

事件:

/// <summary>
/// Message contract for the "order submitted" event.
/// It declares no members of its own; the order identifier is the
/// <c>CorrelationId</c> inherited from <c>CorrelatedBy&lt;Guid&gt;</c>.
/// </summary>
public interface OrderSubmitted : CorrelatedBy<Guid>
{
}

消费者:

/// <summary>
/// Consumer that writes one line to standard output for every
/// <see cref="OrderSubmitted"/> message it receives.
/// </summary>
public class OrderSubmittedConsumer : IConsumer<OrderSubmitted>
{
    /// <summary>
    /// Prints the correlation id of the submitted order to the console.
    /// </summary>
    /// <param name="context">Consume context carrying the <see cref="OrderSubmitted"/> message.</param>
    /// <returns>An already-completed task; nothing is awaited here.</returns>
    public Task Consume(ConsumeContext<OrderSubmitted> context)
    {
        var submitted = context.Message;
        Console.Out.WriteLine($"Order with id {submitted.CorrelationId} has been submitted.");

        return Task.CompletedTask;
    }
}

我的 saga 实例:

/// <summary>
/// Saga instance type used by the order state machine and mapped to the
/// database through <c>OrderStateMap</c>.
/// </summary>
public class OrderState : SagaStateMachineInstance
{
    /// <summary>Identifier that correlates incoming messages to this instance.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>
    /// State of this instance; the state machine selects this property
    /// via <c>InstanceState(x => x.CurrentState)</c>.
    /// </summary>
    public string CurrentState { get; set; }
}

状态机:

/// <summary>
/// State machine over <see cref="OrderState"/> instances. Its only event is
/// <see cref="OrderSubmitted"/>, which always transitions to <see cref="Submitted"/>.
/// </summary>
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
    /// <summary>
    /// Declares the event, selects the property that holds the instance state,
    /// and wires the transitions.
    /// </summary>
    public OrderStateMachine()
    {
        Event(() => OrderSubmitted);

        // The state of each instance is kept in OrderState.CurrentState.
        InstanceState(instance => instance.CurrentState);

        // First OrderSubmitted for a new instance.
        Initially(When(OrderSubmitted).TransitionTo(Submitted));

        // Same transition registered through DuringAny as well.
        DuringAny(When(OrderSubmitted).TransitionTo(Submitted));
    }

    /// <summary>State entered when an order has been submitted.</summary>
    public State Submitted { get; private set; }

    /// <summary>Event raised by an <c>OrderSubmitted</c> message.</summary>
    public Event<OrderSubmitted> OrderSubmitted { get; set; }
}
/// <summary>
/// Saga definition for <see cref="OrderState"/>; its only customization is
/// the concurrent message limit.
/// </summary>
public class OrderStateMachineDefinition : SagaDefinition<OrderState>
{
    // Value assigned to ConcurrentMessageLimit for this saga.
    private const int MaxConcurrentMessages = 15;

    /// <summary>Sets <c>ConcurrentMessageLimit</c> to 15.</summary>
    public OrderStateMachineDefinition() => ConcurrentMessageLimit = MaxConcurrentMessages;
}

数据库上下文:

/// <summary>
/// Saga DbContext whose model consists of a single class map,
/// <see cref="OrderStateMap"/>.
/// </summary>
public class OrderStateDbContext : SagaDbContext
{
    /// <summary>Passes the supplied options straight to the base context.</summary>
    /// <param name="options">EF Core options for this context.</param>
    public OrderStateDbContext(DbContextOptions options)
        : base(options)
    {
    }

    /// <summary>
    /// The saga class maps for this context: one new <see cref="OrderStateMap"/>.
    /// </summary>
    protected override IEnumerable<ISagaClassMap> Configurations =>
        new ISagaClassMap[] { new OrderStateMap() };
}
/// <summary>
/// Entity mapping for <see cref="OrderState"/>; limits the length of the
/// <c>CurrentState</c> column.
/// </summary>
public class OrderStateMap : SagaClassMap<OrderState>
{
    // Maximum length configured for the CurrentState property.
    private const int CurrentStateMaxLength = 64;

    /// <summary>Applies the max-length constraint to <c>CurrentState</c>.</summary>
    /// <param name="entity">Builder for the <see cref="OrderState"/> entity.</param>
    /// <param name="model">Model builder; not used by this mapping.</param>
    protected override void Configure(EntityTypeBuilder<OrderState> entity, ModelBuilder model)
    {
        var currentState = entity.Property(state => state.CurrentState);
        currentState.HasMaxLength(CurrentStateMaxLength);
    }
}

Program 类:

/// <summary>
/// Entry point of the worker. Builds a generic host that registers the EF Core
/// saga context, MassTransit over RabbitMQ, and the <see cref="Worker"/> hosted service.
/// </summary>
public class Program
    {
        /// <summary>Builds the host from <see cref="CreateHostBuilder"/> and runs it.</summary>
        /// <param name="args">Command-line arguments, forwarded to the host builder.</param>
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }

        /// <summary>
        /// Creates the default host builder and registers the application's services.
        /// </summary>
        /// <param name="args">Command-line arguments passed to <c>Host.CreateDefaultBuilder</c>.</param>
        /// <returns>The host builder with all services registered.</returns>
        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureServices((hostContext, services) =>
                {
                    // EF Core context for the saga table, on SQL Server LocalDB. Migrations are looked up
                    // in the executing assembly and tracked in the "__OrderStateDbContext" history table.
                    // NOTE(review): the connection string is hard-coded here.
                    services.AddDbContext<OrderStateDbContext>(builder =>
                         builder.UseSqlServer("Server=(localdb)\\mssqllocaldb;Database=OrderState;Trusted_Connection=True;", m =>
                         {
                             m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
                             m.MigrationsHistoryTable($"__{nameof(OrderStateDbContext)}");
                         }));
                    
                    // NOTE(review): only a saga *repository* and OrderSubmittedConsumer are registered below.
                    // OrderStateMachine itself is never added (there is no AddSagaStateMachine call) and no
                    // saga is configured on the receive endpoint, so nothing ever writes an OrderState row.
                    services.AddMassTransit(config =>
                    {
                        // EF Core-backed repository for OrderState, reusing the DbContext registered above
                        // and the SQL Server lock statement provider.
                        config.AddSagaRepository<OrderState>()
                            .EntityFrameworkRepository(r =>
                            {
                                r.ExistingDbContext<OrderStateDbContext>();
                                r.LockStatementProvider = new SqlServerLockStatementProvider();
                            });
                        
                        config.AddConsumer<OrderSubmittedConsumer>();

                        // NOTE(review): broker URI, including the guest credentials, is hard-coded.
                        config.UsingRabbitMq((ctx, cfg) => {
                            cfg.Host("amqp://guest:guest@localhost:5672");
                            // The "order-queue" endpoint has only the consumer attached to it.
                            cfg.ReceiveEndpoint("order-queue", c => {
                                c.ConfigureConsumer<OrderSubmittedConsumer>(ctx);
                                // I'm assuming this is the place where something like c.StateMachineSaga() is missing, but I don't know how should this look like with EF Core
                            });
                        });
                    });
                    
                    // Worker starts and stops the bus together with the host.
                    services.AddHostedService<Worker>();
                });
    }

Worker 类:

/// <summary>
/// Hosted service that ties the bus lifetime to the host: the bus is started
/// in <see cref="StartAsync"/> and stopped in <see cref="StopAsync"/>.
/// </summary>
public class Worker : IHostedService
{
    private readonly IBusControl _busControl;

    /// <summary>Stores the bus control that this service starts and stops.</summary>
    /// <param name="bus">Bus control to manage.</param>
    public Worker(IBusControl bus) => _busControl = bus;

    /// <summary>Starts the bus and waits for the start to complete.</summary>
    /// <param name="cancellationToken">Token forwarded to the bus start call.</param>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var starting = _busControl.StartAsync(cancellationToken);
        await starting.ConfigureAwait(false);
    }

    /// <summary>Stops the bus, returning the bus's own stop task.</summary>
    /// <param name="cancellationToken">Token forwarded to the bus stop call.</param>
    public Task StopAsync(CancellationToken cancellationToken) =>
        _busControl.StopAsync(cancellationToken);
}
4

1 回答 1

2

在您的代码示例中,您既没有添加 saga,也没有在接收端点上配置 saga。消费者是一个独立的组件,与 saga 完全无关。您应该调用:

AddSagaStateMachine<OrderStateMachine, OrderState, OrderStateMachineDefinition>()

然后在接收端点上使用 ConfigureSaga<OrderState>,或者改用 ConfigureEndpoints 来自动配置端点。

于 2021-02-22T19:11:44.550 回答