4

如果您使用相同的端点名称创建发布者和消费者,我们遇到了 MassTransit 丢失消息的情况。

注意下面的代码;如果我为消费者或发布者使用不同的端点名称(例如,发布者为“rabbitmq://localhost/mtlossPublised”),那么消息将同时计算发布和消费匹配;如果我使用相同的端点名称(如示例中),那么我收到的消息比发布的消息少。

这是预期的行为吗?还是我做错了什么,下面的工作示例代码。

using MassTransit;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MTMessageLoss
{
    class Program
    {
        static void Main(string[] args)
        {
            var consumerBus = ServiceBusFactory.New(b =>
            {
                b.UseRabbitMq();
                b.UseRabbitMqRouting();
                b.ReceiveFrom("rabbitmq://localhost/mtloss");
            });
            var publisherBus = ServiceBusFactory.New(b =>
            {
                b.UseRabbitMq();
                b.UseRabbitMqRouting();
                b.ReceiveFrom("rabbitmq://localhost/mtloss");
            });
            consumerBus.SubscribeConsumer(() => new MessageConsumer());
            for (int i = 0; i < 10; i++)
                publisherBus.Publish(new SimpleMessage() { CorrelationId = Guid.NewGuid(), Message = string.Format("This is message {0}", i) });
            Console.WriteLine("Press ENTER Key to see how many you consumed");
            Console.ReadLine();
            Console.WriteLine("We consumed {0} simple messages. Press Enter to terminate the applicaion.", MessageConsumer.Count);
            Console.ReadLine();
            consumerBus.Dispose();
            publisherBus.Dispose();
        }
    }
    public interface ISimpleMessage : CorrelatedBy<Guid>
    {
        string Message { get; }
    }
    public class SimpleMessage : ISimpleMessage
    {
        public Guid CorrelationId { get; set; }
        public string Message { get; set; }
    }
    public class MessageConsumer : Consumes<ISimpleMessage>.All
    {
        public static int Count = 0;
        public void Consume(ISimpleMessage message)
        {
            System.Threading.Interlocked.Increment(ref Count);
        }
    }
}
4

2 回答 2

4

最重要的是,总线的每个实例都需要它自己的队列来读取。即使总线只是为了发布消息而存在。这只是 MassTransit 工作方式的一个要求。

http://masstransit.readthedocs.org/en/master/configuration/config_api.html#basic-options - 查看警告。

当两个总线实例共享同一个队列时,我们将行为保留为未定义。无论如何,这不是我们支持的条件。每个总线实例都可以将元数据发送到其他总线实例,并且需要它自己的端点。这对 MSMQ 来说是一个更大的交易,所以也许我们可以让这个案例在 RabbitMQ 上工作——但这​​不是我们在这一点上花太多心思的事情。

于 2012-09-17T23:24:46.750 回答
1

发生的情况是,在提供相同的 Receiver Uri 时,您是在告诉 MT 在两条总线上对消耗进行负载平衡,但是您只有一条总线在收听消息。

如果你让它跟踪收到了哪些消息,你会看到它(几乎)每秒都有一个。

调整了您的示例代码后,我得到了

We consumed 6 simple messages. Press Enter to terminate the applicaion.
Received 0
Received 3
Received 5
Received 6
Received 7
Received 8

在另一辆公共汽车上启动消费者,您将获得所有消费者

We consumed 10 simple messages. Press Enter to terminate the applicaion.
Received 0
Received 1
Received 2
Received 3
Received 4
Received 5
Received 6
Received 7
Received 8
Received 9

所以是的,我会说这是预期的行为。

这是两个订阅者的调整示例代码

using MassTransit;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MTMessageLoss
{
    class Program
    {
        internal static bool[] msgReceived = new bool[10];
        static void Main(string[] args)
        {
            var consumerBus = ServiceBusFactory.New(b =>
                {
                    b.UseRabbitMq();
                    b.UseRabbitMqRouting();
                    b.ReceiveFrom("rabbitmq://localhost/mtloss");
                });
            var publisherBus = ServiceBusFactory.New(b =>
                {
                    b.UseRabbitMq();
                    b.UseRabbitMqRouting();
                    b.ReceiveFrom("rabbitmq://localhost/mtloss");
                });
            publisherBus.SubscribeConsumer(() => new MessageConsumer());
            consumerBus.SubscribeConsumer(() => new MessageConsumer());
            for (int i = 0; i < 10; i++)
                consumerBus.Publish(new SimpleMessage()
                    {CorrelationId = Guid.NewGuid(), MsgId = i});
            Console.WriteLine("Press ENTER Key to see how many you consumed");
            Console.ReadLine();
            Console.WriteLine("We consumed {0} simple messages. Press Enter to terminate the applicaion.",
                              MessageConsumer.Count);
            for (int i = 0; i < 10; i++)
                if (msgReceived[i])
                    Console.WriteLine("Received {0}", i);
            Console.ReadLine();
            consumerBus.Dispose();
            publisherBus.Dispose();

        }
    }
    public interface ISimpleMessage : CorrelatedBy<Guid>
    {
        int MsgId { get; }
    }
    public class SimpleMessage : ISimpleMessage
    {
        public Guid CorrelationId { get; set; }
        public int MsgId { get; set; }
    }
    public class MessageConsumer : Consumes<ISimpleMessage>.All
    {
        public static int Count = 0;
        public void Consume(ISimpleMessage message)
        {
            Program.msgReceived[message.MsgId] = true;
            System.Threading.Interlocked.Increment(ref Count);
        }
    }
}
于 2012-09-17T12:05:50.720 回答