0

我试图弄清楚为什么“发送”消息不会调用状态机,但是如果我“发布”消息,它会起作用,并且我可以看到状态发生变化。

以下是我的代码,它与文档中的示例类似,只是我改为用“发送”(Send)而不是“发布”(Publish)来投递消息。

组件

状态机:
/// <summary>
/// Saga instance data used by <c>OrderStateMachine</c>; one instance per correlated order.
/// </summary>
public class OrderState: SagaStateMachineInstance
{
    /// <summary>Identifier used to correlate incoming messages with this instance.</summary>
    public Guid CorrelationId { get; set; }
    /// <summary>
    /// Current state stored as an integer; the state machine maps it to
    /// Submitted, Accepted and Completed through its <c>InstanceState</c> call.
    /// </summary>
    public int CurrentState { get; set; }
    /// <summary>Order date copied from the <c>SubmitOrder</c> message when it is handled.</summary>
    public DateTime? OrderDate { get; set; }
}

/// <summary>
/// State machine that drives an <see cref="OrderState"/> saga through the
/// Submitted, Accepted and Completed states as order messages arrive.
/// </summary>
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
    // States the saga can be in (besides the built-in ones).
    public State Submitted { get; private set; }
    public State Accepted { get; private set; }
    public State Completed { get; private set; }

    // Events, one per message contract.
    public Event<SubmitOrder> SubmitOrder { get; private set; }
    public Event<OrderAccepted> OrderAccepted { get; private set; }
    public Event<OrderCompleted> OrderCompleted { get; private set; }

    /// <summary>
    /// Declares the state storage, the event correlations and the transitions.
    /// </summary>
    public OrderStateMachine()
    {
        // The current state is kept in the integer CurrentState property.
        InstanceState(saga => saga.CurrentState, Submitted, Accepted, Completed);

        // Each event is matched to a saga instance by the message's OrderId.
        Event(() => SubmitOrder, e => e.CorrelateById(ctx => ctx.Message.OrderId));
        Event(() => OrderAccepted, e => e.CorrelateById(ctx => ctx.Message.OrderId));
        Event(() => OrderCompleted, e => e.CorrelateById(ctx => ctx.Message.OrderId));

        // First message: remember the order date, then move to Submitted.
        Initially(
            When(SubmitOrder)
                .Then(ctx =>
                {
                    ctx.Instance.OrderDate = ctx.Data.OrderDate;
                })
                .TransitionTo(Submitted));

        During(Submitted,
            When(OrderAccepted)
                .TransitionTo(Accepted));

        // A repeated SubmitOrder after acceptance is ignored.
        During(Accepted,
            Ignore(SubmitOrder));

        DuringAny(
            When(OrderCompleted)
                .TransitionTo(Completed));

        // The instance counts as completed once its current state equals Completed.
        SetCompleted(async saga => Completed.Equals(await this.GetState(saga)));
    }
}
消息契约:
/// <summary>Message carrying the order id and an optional order date.</summary>
public record SubmitOrder(Guid OrderId, DateTime? OrderDate);
/// <summary>Message identifying, by order id, an order that was accepted.</summary>
public record OrderAccepted(Guid OrderId);
/// <summary>Message identifying, by order id, an order that was completed.</summary>
public record OrderCompleted(Guid OrderId);
消费者:
/// <summary>
/// Consumer for <see cref="SubmitOrder"/> messages. It only pauses for a fixed
/// time and does not read the message.
/// </summary>
public class SubmitOrderConsumer : IConsumer<SubmitOrder>
{
    // Length of the pause, in milliseconds.
    private const int DelayMilliseconds = 2000;

    /// <summary>Waits for the fixed delay and then completes.</summary>
    /// <param name="context">Consume context of the received message (unused).</param>
    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        await Task.Delay(DelayMilliseconds);
    }
}

/// <summary>
/// Definition for <see cref="SubmitOrderConsumer"/>: names its endpoint
/// "submit-order" and turns off consume topology configuration on it.
/// </summary>
public class SubmitOrderConsumerDefinition : ConsumerDefinition<SubmitOrderConsumer>
{
    /// <summary>Sets the endpoint name used for the consumer.</summary>
    public SubmitOrderConsumerDefinition() => EndpointName = "submit-order";

    /// <summary>Disables consume topology configuration for the consumer's endpoint.</summary>
    /// <param name="endpointConfigurator">Receive endpoint being configured.</param>
    /// <param name="consumerConfigurator">Consumer configurator (unused).</param>
    // NOTE(review): presumably this stops the queue from being bound to the
    // message-type exchange, so only messages sent to it directly arrive — confirm
    // against the MassTransit topology documentation.
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<SubmitOrderConsumer> consumerConfigurator)
        => endpointConfigurator.ConfigureConsumeTopology = false;
}

Web API

Program.cs(片段)
// Add services to the container.
builder.Services.AddMassTransit(bus =>
{
    bus.SetKebabCaseEndpointNameFormatter();

    // RabbitMQ transport: broker on localhost, virtual host "/", guest credentials.
    bus.UsingRabbitMq((busContext, rabbit) =>
        rabbit.Host("localhost", "/", rabbitHost =>
        {
            rabbitHost.Username("guest");
            rabbitHost.Password("guest");
        }));
});

// Hosted service registration for the bus, followed by MVC controllers.
builder.Services.AddMassTransitHostedService();
builder.Services.AddControllers();

订单控制器
/// <summary>
/// Controller for the <c>order</c> route; its POST action sends a new
/// <see cref="SubmitOrder"/> message to the "submit-order" exchange.
/// </summary>
[Route("order")]
public class OrderController : ControllerBase
{
    private readonly ISendEndpointProvider _endpoints;

    /// <param name="sendEndpointProvider">Provider used to resolve the send endpoint.</param>
    public OrderController(ISendEndpointProvider sendEndpointProvider)
        => _endpoints = sendEndpointProvider;

    /// <summary>
    /// Sends a <see cref="SubmitOrder"/> with a fresh order id and the current
    /// local time, then returns 200 OK.
    /// </summary>
    [HttpPost]
    public async Task<IActionResult> SendOrder()
    {
        // The message is sent (not published) to one specific exchange.
        var destination = new Uri("exchange:submit-order");
        var endpoint = await _endpoints.GetSendEndpoint(destination);

        var order = new SubmitOrder(Guid.NewGuid(), DateTime.Now);
        await endpoint.Send(order);

        return Ok();
    }
}

Worker 服务

Program.cs
using IHost = Microsoft.Extensions.Hosting.IHost;

// Build the generic host step by step: create the builder, register services, build.
var hostBuilder = Host.CreateDefaultBuilder(args);

hostBuilder.ConfigureServices(services =>
{
    services.AddMassTransit(bus =>
    {
        // Consumer (with its definition) and the order saga backed by an in-memory repository.
        bus.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition));
        bus.AddSagaStateMachine<OrderStateMachine, OrderState>().InMemoryRepository();

        bus.UsingRabbitMq((context, rabbit) =>
        {
            rabbit.Host("localhost", "/", rabbitHost =>
            {
                rabbitHost.Username("guest");
                rabbitHost.Password("guest");
            });

            // The saga is configured on an explicitly named "saga-order" endpoint.
            rabbit.ReceiveEndpoint("saga-order", sagaEndpoint =>
                sagaEndpoint.ConfigureSaga<OrderState>(context));

            // Remaining registrations get their endpoints configured automatically.
            rabbit.ConfigureEndpoints(context);
        });
    });

    services.AddMassTransitHostedService();
    services.AddHostedService<Worker>();
});

IHost host = hostBuilder.Build();

await host.RunAsync();

然后我通过 Postman 向以下地址发起一个 POST 请求:http://localhost:5000/order

It does call the SubmitOrderConsumer, but for some reason the state machine does not get invoked (it never hits the breakpoint inside the Then handler that sets the order date in the Initially block). I think I am missing something that connects the two together. Any feedback is greatly appreciated. Thank you.

4

1 回答 1

0

In your example, you'd want to use Publish, especially in this scenario where you have two consumers (the consumer and the state machine) on separate endpoints (queues) that both need to consume the message. Sending directly to the exchange only gets the message to one of the endpoints.

于 2022-02-09T16:08:41.280 回答