2

使用 MassTransit 和 RabbitMQ 处理请求/响应场景。当做一个简单的请求/回复时,它会工作多次。如果我在请求处理程序中发布一条消息,它适用于第一个请求,但请求处理程序永远不会在第二个请求上被调用并最终超时并且消息保留在服务器队列中。

好像我错过了什么;配置可能?

该项目位于https://bitbucket.org/joeyoung/enterprise-rabbitmq

客户端配置:

ObjectFactory.Configure(cfg =>
{
    cfg.AddRegistry<WebRegistry>();
    cfg.For<IServiceBus>().Singleton().Use(o => ServiceBusFactory.New(sbc =>
    {
        // configure the bus
        sbc.UseRabbitMqRouting();
        sbc.ReceiveFrom("rabbitmq://localhost/entrprise_client");

        // finds all the consumers in the container and register them with the bus
        sbc.Subscribe(x => x.LoadFrom(ObjectFactory.Container));
    }));
});

服务器配置:

var container = new Container(cfg =>
{
    cfg.Scan(scan =>
    {
        scan.Assembly("Server.MessageHandlers");
        scan.AddAllTypesOf<IConsumer>();
    });
});

var bus = ServiceBusFactory.New(sbc =>
{
    // configure the bus
    sbc.UseRabbitMqRouting();
    sbc.ReceiveFrom("rabbitmq://localhost/enterprise_server");

    // finds all the consumers in the container and register them with the bus
    sbc.Subscribe(x => x.LoadFrom(container));
});

// finally inject the bus into the container
container.Inject(bus);

发送请求:

bus.PublishRequest(new CreateProductCommand(correlationId, model.Name, model.Description, model.Price), x =>
{
    x.HandleTimeout(10.Seconds(), () => { timedOut = true; });
    x.Handle<CreateProductCommand.Response>(response => { productId = response.Id; });
});

消费请求:

public void Consume(IConsumeContext<CreateProductCommand> context)
{
    Console.Out.WriteLine("Consuming Create Product");

    // simulate creating a product
    var productId = "products/1234";

    bus.Publish(new ProductCreatedEvent(productId));

    context.Respond(new CreateProductCommand.Response(context.Message.CorrelationId) { Id = productId});
}

讯息:

public class CreateProductCommand : CorrelatedBy<Guid>
{
    public Guid CorrelationId { get; private set; }
    public string Name { get; set; }
    public string Description { get; set; }
    public decimal Price { get; set; }

    public CreateProductCommand(Guid correlationId, string name, string description, decimal price) 
    {
        CorrelationId = correlationId;
        Name = name;
        Description = description;
        Price = price;
    }

    public class Response : CorrelatedBy<Guid>
    {
        public Guid CorrelationId { get; private set; }

        public string Id { get; set; }

        public Response(Guid correlationId)
        {
            CorrelationId = correlationId;
        }
    }
}
4

3 回答 3

1

感谢 Chris 建议在 IConsumeContext 上使用 Bus。这似乎已经解决了它。

因此,不是在处理程序的构造函数中注入 IServiceBus,而是从上下文中获取总线。

public class CreateProductCommandHandler : Consumes<CreateProductCommand>.Context
{
    public void Consume(IConsumeContext<CreateProductCommand> context)
    {
        Console.Out.WriteLine("Consuming Create Product");

        // simulate creating a product
        var productId = "products/1234";

        context.Bus.Publish(new ProductCreatedEvent(productId));

        context.Respond(new CreateProductCommand.Response(context.Message.CorrelationId) { Id = productId});
    }
}
于 2012-05-23T13:54:26.547 回答
0

您好,我遇到了同样的问题,但这是由 2 个微服务共享相同的响应队列引起的。我只是确保他们有唯一的名字,而且效果很好!

于 2021-11-03T20:22:33.583 回答
-4

我知道你想用 MT 和 RabbitMQ,我放弃了,把我的爱转向了 EasyNetQ,试试看(-:

于 2012-05-22T18:59:54.873 回答