3

我正在使用基于https://github.com/MassTransit/Sample-ShoppingWeb的 MassTransitStateMachine 示例。如果我只运行一个状态机应用程序,一切正常。但是,当我运行多个状态机应用程序实例时,似乎错误的状态机应用程序正在处理来自消费者的消息。

即StateMachineA 向ConsumerA 发送命令。ConsumerA 消费和发布的事件。该事件没有路由到 StateMachineA,而是被发送到 StateMachineB。

我想在多个容器中部署状态机,但我无法通过这个。

我试图用谷歌搜索这个主题,但似乎很难找到关于 MassTransitStateMachine 的示例或讨论。或者也许我没有足够地阅读文档。

命令和事件

public interface IMessage
{
    Guid CorrelationId { get; }
    PricingSpec PricingSpec { get; }
}

public interface IPricingRequested : IMessage
{
}

public interface ISubPricingRequest : IMessage
{
}

public interface ISubPricingProcessed : IMessage
{
}

public interface IPricingProcessed : IMessage
{
}

客户

class Program
{
    private static bool _continueRunning = true;

    static void Main(string[] args)
    {
        Log.Logger = new LoggerConfiguration()
            .MinimumLevel.Debug()
            .MinimumLevel.Override("MassTransit", LogEventLevel.Warning)
            .Enrich.FromLogContext()
            .WriteTo.Console(new CompactJsonFormatter())
            .CreateLogger();

        Console.CancelKeyPress += Console_CancelKeyPress;
        var bus = CreateBus();
        Console.WriteLine("Starting Pricing Requester");
        Console.ReadLine();
        while(_continueRunning)
        {
            PricingSpec pricingSpec = new PricingSpec()
            {
                Symbol = $"Symbol{DateTime.Now.ToString("yyyymmddhhmmss")}",
                Underlyings = new List<string>() { "AOT" },
            };

            bus.Publish<IPricingRequested>(new { CorrelationId = Guid.NewGuid(), PricingSpec = pricingSpec });
            Console.WriteLine(pricingSpec.ToString());
            Console.ReadLine();
        }
    }

    private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
    {
        e.Cancel = true;
        _continueRunning = false;
    }

    private static IBus CreateBus()
    {
        var rabbitHost = new Uri("rabbitmq://localhost:5672/saga");
        var user = "guest";
        var password = "guest";
        var inputQueue = "pricing-requester";
        var bus = Bus.Factory.CreateUsingRabbitMq(configurator =>
        {
            var host = configurator.Host(rabbitHost, h =>
            {
                h.Username(user);
                h.Password(password);
            });

            configurator.ReceiveEndpoint(host, inputQueue, c =>
            {
                c.Consumer(() => new PricingProcessedConsumer());
            });
        });

        TaskUtil.Await(() => bus.StartAsync());
        return bus;
    }
}

状态机

public class AutocallablePricingStateMachine : MassTransitStateMachine<AutocallablePricingState>
{
    public AutocallablePricingStateMachine()
    {
        InstanceState(x => x.CurrentState);
        this.Event(() => this.PricingRequested, x => x.CorrelateById(c => c.Message.CorrelationId).SelectId(c => c.Message.CorrelationId));
        this.Event(() => this.SubPricingProcessed, x => x.CorrelateById(c => c.Message.CorrelationId));

        Initially(
            When(PricingRequested)
            .Then(context =>
            {
                this.UpdateSagaState(context.Instance, context.Data.PricingSpec);
            })
            .Then(InterceptPricingRequested)
            .ThenAsync(context => this.SendCommand<ISubPricingRequest>("sub-pricer", context))
            .TransitionTo(Processing));

        During(Processing,
            When(SubPricingProcessed)
            .Then(context =>
            {
                InterceptSubPricingProcessed(context);
            })
            .Publish(context => new PricingProcessed(context.Data.CorrelationId, context.Data.PricingSpec))
            .Finalize());

        SetCompletedWhenFinalized();
    }

    private void InterceptPricingRequested(BehaviorContext<AutocallablePricingState, IPricingRequested> obj)
    {
        Console.WriteLine($"Sending ISubPricingRequest Command Correlation {obj.Data.CorrelationId}");
    }

    private void InterceptSubPricingProcessed(BehaviorContext<AutocallablePricingState, ISubPricingProcessed> obj)
    {
        Console.WriteLine($"Receiving ISubPricingProcessed Event Correlation {obj.Data.CorrelationId}");
    }

    private void UpdateSagaState(AutocallablePricingState state, PricingSpec pricingSpec)
    {
        var currentDate = DateTime.Now;
        state.PricingSpec = pricingSpec;
    }

    private async Task SendCommand<TCommand>(string endpointKey, BehaviorContext<AutocallablePricingState, IMessage> context)
        where TCommand : class, IMessage
    {
        var sendEndPoint = await context.GetSendEndpoint(new Uri($"rabbitmq://localhost:5672/saga/{endpointKey}"));            
        await sendEndPoint.Send<TCommand>(new
        {
            CorrelationId = context.Data.CorrelationId,
            PricingSpec = context.Data.PricingSpec,
        });
    }

    public SagaState Processing { get; private set; }
    public Event<IPricingRequested> PricingRequested { get; private set; }
    public Event<ISubPricingProcessed> SubPricingProcessed { get; private set; }
}

public class AutocallablePricingState : SagaStateMachineInstance
{
    public AutocallablePricingState(Guid correlationId)
    {
        this.CorrelationId = correlationId;
    }

    public string CurrentState { get; set; }
    public PricingSpec PricingSpec { get; set; }
    public Guid CorrelationId { get; set; }
}

消费者

public class SubPricingRequestConsumer : IConsumer<ISubPricingRequest>
{
    public async Task Consume(ConsumeContext<ISubPricingRequest> context)
    {
        Console.WriteLine($"Sub Pricing for symbol {context.Message.PricingSpec.Symbol}");
        await Task.Delay(2000);
        this.UpdatePricingSpec(context.Message.PricingSpec);
        await context.Publish<ISubPricingProcessed>(new
        {
            CorrelationId = context.Message.CorrelationId,
            PricingSpec = context.Message.PricingSpec,
        });
    }

    private void UpdatePricingSpec(PricingSpec pricingSpec)
    {
        Random random = new Random();
        double premium = random.Next();
        Console.WriteLine($"Sub Pricing for symbol {pricingSpec.Symbol}, Premium {premium}");
        pricingSpec.Premium = premium;
    }
}

我预计如果我启动多个 AutocallablePricingStateMachine 应用程序,它将消耗来自消费者的事件,该事件处理了由同一个 AutocallablePricingMachine 应用程序(或 pod/容器)发送的命令。

4

0 回答 0