我正在尝试使用 EF Core 配置 Automatonymous worker 实现作为持久性。我通过 api 发布事件并使用 RabbitMq 作为传输在托管服务中处理它。不幸的是,数据库不存储机器状态。我应用了迁移并看到了 table OrderState
,但是在发布OrderSubmmited
事件后其中没有数据。它得到了正确处理,因为我在控制台中看到了一个日志,但数据库表仍然是空的。我错过了什么?
这是我的代码:
事件:
public interface OrderSubmitted : CorrelatedBy<Guid>
{
}
消费者:
public class OrderSubmittedConsumer : IConsumer<OrderSubmitted>
{
public Task Consume(ConsumeContext<OrderSubmitted> context)
{
Console.Out.WriteLine($"Order with id {context.Message.CorrelationId} has been submitted.");
return Task.CompletedTask;
}
}
我的传奇实例:
public class OrderState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
}
状态机:
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
public State Submitted { get; private set; }
public Event<OrderSubmitted> OrderSubmitted { get; set; }
public OrderStateMachine()
{
Event(() => OrderSubmitted);
InstanceState(x => x.CurrentState);
Initially(
When(OrderSubmitted)
.TransitionTo(Submitted));
DuringAny(
When(OrderSubmitted)
.TransitionTo(Submitted));
}
}
public class OrderStateMachineDefinition : SagaDefinition<OrderState>
{
public OrderStateMachineDefinition()
{
ConcurrentMessageLimit = 15;
}
}
数据库上下文:
public class OrderStateDbContext : SagaDbContext
{
public OrderStateDbContext(DbContextOptions options) : base(options)
{
}
protected override IEnumerable<ISagaClassMap> Configurations
{
get
{
yield return new OrderStateMap();
}
}
}
public class OrderStateMap : SagaClassMap<OrderState>
{
protected override void Configure(EntityTypeBuilder<OrderState> entity, ModelBuilder model)
{
entity.Property(x => x.CurrentState).HasMaxLength(64);
}
}
节目类:
public class Program
{
public static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
}
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.AddDbContext<OrderStateDbContext>(builder =>
builder.UseSqlServer("Server=(localdb)\\mssqllocaldb;Database=OrderState;Trusted_Connection=True;", m =>
{
m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
m.MigrationsHistoryTable($"__{nameof(OrderStateDbContext)}");
}));
services.AddMassTransit(config =>
{
config.AddSagaRepository<OrderState>()
.EntityFrameworkRepository(r =>
{
r.ExistingDbContext<OrderStateDbContext>();
r.LockStatementProvider = new SqlServerLockStatementProvider();
});
config.AddConsumer<OrderSubmittedConsumer>();
config.UsingRabbitMq((ctx, cfg) => {
cfg.Host("amqp://guest:guest@localhost:5672");
cfg.ReceiveEndpoint("order-queue", c => {
c.ConfigureConsumer<OrderSubmittedConsumer>(ctx);
// I'm assuming this is the place where something like c.StateMachineSaga() is missing, but I don't know how should this look like with EF Core
});
});
});
services.AddHostedService<Worker>();
});
}
工人阶级:
public class Worker : IHostedService
{
private readonly IBusControl _bus;
public Worker(IBusControl bus)
{
_bus = bus;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await _bus.StartAsync(cancellationToken).ConfigureAwait(false);
}
public Task StopAsync(CancellationToken cancellationToken)
{
return _bus.StopAsync(cancellationToken);
}
}