5

如果我有很多活动,是否会导致资源阻塞或请求超时?

这是我的场景:

我有一个向消费者发送订单请求的 api 控制器;我使用 Request/Response 模式从消费者那里接收ErrorMessage属性,并基于该属性响应返回,如果它为 null 我想返回OK(),否则返回BadRequestOk但带有如下消息:Product out of stock to notify to the client

在我的消费者中,我建立了一个包含 2 个活动的路由单:

  • CreateOrderActivity:创建带有订单详细信息的订单。
  • ReserveProductActivity:这减少了库存产品的数量,if product quantity < 0我将向消费者发布一条带有ErrorMessage的消息并补偿之前的活动。

    public async Task Consume(ConsumeContext<ProcessOrder> context)
    {
        try
        {
            if (!string.IsNullOrEmpty(context.Message.ErrorMessage))
            {
                await context.RespondAsync<OrderSubmitted>(new
                {
                    context.Message.OrderId,
                    context.Message.ErrorMessage
                });
    
                return;
            }
    
            RoutingSlipBuilder builder = new RoutingSlipBuilder(context.Message.OrderId);
            // get configs
            var settings = new Settings(_configuration);
    
            // Add activities
            builder.AddActivity(settings.CreateOrderActivityName, settings.CreateOrderExecuteAddress);
            builder.SetVariables(new { context.Message.OrderId, context.Message.Address, context.Message.CreatedDate, context.Message.OrderDetails });
    
            builder.AddActivity(settings.ReserveProductActivityName, settings.ReserveProductExecuteAddress);
            builder.SetVariables(new { context.Message.OrderDetails });
    
    
            await context.Execute(builder.Build());
    
            await context.RespondAsync<OrderSubmitted>(new
            {
                context.Message.OrderId
            });
    
        }
        catch (Exception ex)
        {
            _log.LogError("Can not create Order {OrderId}", context.Message.OrderId);
            throw new Exception(ex.Message);
        }
    }
    

ReserveProductActivity 的代码:

    public async Task<ExecutionResult> Execute(ExecuteContext<ReserveProductArguments> context)
    {
        var orderDetails = context.Arguments.OrderDetails;

        foreach (var orderDetail in orderDetails)
        {
            var product = await _productRepository.GetByProductId(orderDetail.ProductId);
            if (product == null) continue;

            var quantity = product.SetQuantity(product.QuantityInStock - orderDetail.Quantity);


            if (quantity < 0)
            {
                var errorMessage = "Out of stock.";
                await context.Publish<ProcessOrder>(new
                {
                    ErrorMessage = errorMessage
                });
                throw new RoutingSlipException(errorMessage);
            }

            await _productRepository.Update(product);
        }

        return context.Completed(new Log(orderDetails.Select(x => x.ProductId).ToList()));
    }

消费者方法中的这行代码await context.Execute(builder.Build())

起初我认为它会在进入下一行之前先构建路由单并执行所有活动,但事实并非如此。相反,它会立即转到下一行代码(响应回控制器),然后在执行活动之后,这不是我想要的。我需要先检查第二个活动中的产品数量,然后再根据返回给控制器。

(目前,它总是首先响应控制器 - 之后的行buider.Buid(),然后如果quantity < 0它仍然进入第一个 if 条件的消费方法,但由于它已经响应,我无法再次触发该 if 语句内的响应)。

所以简而言之,如果产品在第二个活动中仍然可用,我可以像往常一样发回响应(它在之后执行代码context.Execute(builder.Build()),但是如果- 我使用ErrorMessagequantity < 0发布回消费者方法,我希望它跳转到非常首先如果 Consume 方法 ( ) 的条件和基于ErrorMessage通知客户端。if(!string.IsNullOrEmpty(context.Message.ErrorMessage)) ...

这种方法有问题吗?我怎样才能实现这样的目标?

谢谢

4

2 回答 2

1

它没有记录,但可以使用代理来执行路由单,并使用路由单的结果响应请求。您可以在单元测试中查看详细信息:

https://github.com/MassTransit/MassTransit/blob/master/tests/MassTransit.Tests/Courier/RequestRoutingSlip_Specs.cs#L20

您可以创建构建路由单并执行它的代理,以及响应代理 - 然后将两者都配置为接收端点作为.Instance消费者。

  class RequestProxy :
        RoutingSlipRequestProxy<Request>
    {
        protected override void BuildRoutingSlip(RoutingSlipBuilder builder, ConsumeContext<Request> request)
        {
        // get configs
        var settings = new Settings(_configuration);

        // Add activities
        builder.AddActivity(settings.CreateOrderActivityName, settings.CreateOrderExecuteAddress);
        builder.SetVariables(new { context.Message.OrderId, context.Message.Address, context.Message.CreatedDate, context.Message.OrderDetails });

        builder.AddActivity(settings.ReserveProductActivityName, settings.ReserveProductExecuteAddress);
        builder.SetVariables(new { context.Message.OrderDetails });
        }
    }


    class ResponseProxy :
        RoutingSlipResponseProxy<Request, Response>
    {
        protected override Response CreateResponseMessage(ConsumeContext<RoutingSlipCompleted> context, Request request)
        {
            return new Response();
        }
    }

然后你可以从消费者那里调用它,或者把排序逻辑放在代理中——无论哪个有意义,然后使用你的控制器的请求客户端来发送请求并等待响应。

于 2019-11-11T00:55:14.317 回答
0

感谢您的回复,我可以在消费者方法中调用请求和响应代理吗?现在我必须在配置公共交通总线时在启动时调用它

private static IBusControl ConfigureBus(IServiceProvider provider)
    {
        var options = provider.GetRequiredService<IOptions<AppConfig>>().Value;

        return Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host(options.Host, options.VirtualHost, h =>
            {
                h.Username(options.Username);
                h.Password(options.Password);
            });

            cfg.ReceiveEndpoint(host, "process-order", e =>
            {
                e.Consumer(() => new ProcessOrderConsumer(provider.GetRequiredService<IConfiguration>()));

                var a = new ProcessOrderRequestProxy(provider.GetRequiredService<IConfiguration>());
                var b = new ResponseProxy();
                e.Instance(a);
                e.Instance(b);
            });

            var compensateCreateOrderAddress = new Uri(string.Concat("rabbitmq://localhost/", "compensate_createorder"));
            cfg.ReceiveEndpoint(host, "execute_createorder", e =>
            {
                e.ExecuteActivityHost<CreateOrderActivity, CreateOrderArguments>(compensateCreateOrderAddress, () => new
                    CreateOrderActivity(provider.GetRequiredService<IOrderRepository>(), provider.GetRequiredService<IOrderDetailRepository>()));
            });
            cfg.ReceiveEndpoint(host, "compensate_createorder", e =>
            {
                e.CompensateActivityHost<CreateOrderActivity, CreateOrderLog>(() => new
                    CreateOrderActivity(provider.GetRequiredService<IOrderRepository>(), provider.GetRequiredService<IOrderDetailRepository>()));
            });
        });
    }
于 2019-11-13T06:59:16.350 回答