10

我正在尝试了解 MassTransit 和 RabbitMQ 并排队(第 1 天)

我的问题是“是否需要消费者才能在 MT 中排队工作。我问的原因是因为我首先创建了域和生产者,但我在 RabbitMQ 管理窗口中没有看到任何排队的项目。”

创建消费者队列后,我可以看到消息正在排队。

根据我的理解,Producer 从不知道消费者,那为什么 MassTransit 需要消费者队列来启动消息发布呢?

制片人

using MassTransit;

namespace Producer
{
    class Program
    {
        static void Main(string[] args)
        {
            Bus.Initialize(sbc =>
            {
                sbc.UseRabbitMq();         //1
                sbc.UseControlBus();
                sbc.EnableMessageTracing();
                sbc.EnableRemoteIntrospection();
                sbc.ReceiveFrom("rabbitmq://localhost/MT.Producer");
                sbc.UseControlBus();
            });

            Bus.Instance.Publish(new NewOrderMessage { OrderName = "Hello World" });
        }
    }
}

应用程序

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using MassTransit;
using Topshelf;

namespace Consumer
{
    class Program
    {
        static void Main(string[] args)
        {
            Bus.Initialize(sbc =>
            {
                sbc.UseRabbitMq();
                sbc.UseRabbitMqRouting();
                sbc.ReceiveFrom("rabbitmq://localhost/MT.ConsumerService");
            });

            var cfg = HostFactory.New(c =>
            {
                c.SetServiceName("MT.ConsumerService");
                c.SetDisplayName("MT.ConsumerService");
                c.SetDescription("MT.ConsumerService");

                //c.BeforeStartingServices(s => {});

                c.Service<ConsumerService>(a =>
                {
                    a.ConstructUsing(service => new ConsumerService());
                    a.WhenStarted(o => o.Start());
                    a.WhenStopped(o => o.Stop());
                });

            });

            try
            {
                cfg.Run();
            }
            catch (Exception e)
            {
                Console.WriteLine(e.Message);
                throw;
            }
        }
    }
}

讯息

namespace Domain
{
    public class NewOrderMessage
    {
        public NewOrderMessage()
        {
            OrderId = Guid.NewGuid();
        }
        public Guid OrderId { get; set; }
        public string OrderName { get; set; }
    }
}

消费者服务

namespace Consumer
{
    class ConsumerService
    {
        readonly IServiceBus _bus;

        public ConsumerService()
        {
            _bus = Bus.Instance;
        }

        public void Start()
        {
            _bus.SubscribeHandler<NewOrderMessage>(CreateOrder);

            Console.WriteLine("Starting....");
        }

        public void Stop()
        {    
            Console.WriteLine("Stopping....");
        }

        public void CreateOrder(NewOrderMessage command)
        {
            Console.WriteLine("Creating Order: {0}  with Id: {1}", command.OrderName, command.OrderId);
        }
    }
}

代码是使用网络上的示例创建的。

编辑还想补充一点,所有命名空间都是不同的项目域生产者消费者

问候,

三月

4

1 回答 1

8

以下关于masstransit-discus的答案确实对我有所帮助。

来自 Google Group masstransit-discuss

... MassTransit 的问题是,您实际上并没有发布到队列,而是发布到交换器,然后设置该交换器将该消息传递到已订阅该交换器消息的其他队列。由于您没有任何消费者,因此没有人对您的消息表示任何兴趣,因此它将被忽略。

所以只需设置一个消费者并让它听“rabbitmq://localhost/B”。第一次运行它时,它将在它们之间创建必要的交换和链接,并且您的消息将被传递到名为 B 的队列。

安德斯

总线是所有交换、队列和服务的集合。当您在总线上发布时,在该总线上注册的所有消费者都会收到它。

交换是实现这项工作的 RabbitMQ 实现。

特拉维斯

于 2013-03-18T22:20:48.873 回答