12

我是 MassTransit 的新手,在我的理解中我错过了一些东西。

假设我有一个服务器场,所有节点都可以做同样的工作。应用程序框架是 CQRS 的风格。这意味着我有两种基本的消息要发布:

  • 命令:必须由其中一个服务器处理,其中任何一个(第一个没有工作槽的)
  • 事件:必须由所有服务器处理

我已经构建了一个非常简单的 MassTransit 原型(一个每隔 X 秒发送一次问候的控制台应用程序)。

在 API 中,我可以看到有一个“发布”方法。我如何指定它是什么类型的消息(一个与所有服务器)?

如果我查看“处理程序”配置,我可以指定队列 uri。如果我为所有主机指定相同的队列,所有主机都会收到消息,但我不能将执行限制在一台服务器上。

如果我从主机专用队列中侦听,则只有一个服务器会处理消息,但我不知道如何广播另一种消息。

请帮助我了解我所缺少的。

PS:如果它在乎,我的消息系统是rabbitmq。

为了测试,我用这些类创建了一个通用类库:

public static class ActualProgram
{
    private static readonly CancellationTokenSource g_Shutdown = new CancellationTokenSource();

    private static readonly Random g_Random = new Random();

    public static void ActualMain(int delay, int instanceName)
    {
        Thread.Sleep(delay);
        SetupBus(instanceName);

        Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token);

        Console.WriteLine("Press enter at any time to exit");
        Console.ReadLine();
        g_Shutdown.Cancel();

        Bus.Shutdown();
    }

    private static void PublishRandomMessage()
    {
        Bus.Instance.Publish(new Message
        {
            Id = g_Random.Next(),
            Body = "Some message",
            Sender = Assembly.GetEntryAssembly().GetName().Name
        });

        if (!g_Shutdown.IsCancellationRequested)
        {
            Thread.Sleep(g_Random.Next(500, 10000));
            Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token);
        }
    }

    private static void SetupBus(int instanceName)
    {
        Bus.Initialize(sbc =>
        {
            sbc.UseRabbitMqRouting();
            sbc.ReceiveFrom("rabbitmq://localhost/simple" + instanceName);
            sbc.Subscribe(subs =>
            {
                subs.Handler<Message>(MessageHandled);
            });
        });
    }

    private static void MessageHandled(Message msg)
    {
        ConsoleColor color = ConsoleColor.Red;
        switch (msg.Sender)
        {
            case "test_app1":
                color = ConsoleColor.Green;
                break;

            case "test_app2":
                color = ConsoleColor.Blue;
                break;

            case "test_app3":
                color = ConsoleColor.Yellow;
                break;
        }
        Console.ForegroundColor = color;
        Console.WriteLine(msg.ToString());
        Console.ResetColor();
    }

    private static void MessageConsumed(Message msg)
    {
        Console.WriteLine(msg.ToString());
    }
}

public class Message
{
    public long Id { get; set; }

    public string Sender { get; set; }

    public string Body { get; set; }

    public override string ToString()
    {
        return string.Format("[{0}] {1} : {2}" + Environment.NewLine, Id, Sender, Body);
    }
}

我还有 3 个只运行 ActualMain 方法的控制台应用程序:

internal class Program
{
    private static void Main(string[] args)
    {
        ActualProgram.ActualMain(0, 1);
    }
}
4

1 回答 1

10

您想要的是竞争消费者(搜索 SO,您会发现更多信息)使用 RabbitMQ 让生活变得轻松,您需要做的就是为您启动的每个消费者指定相同的队列名称,消息将仅由处理其中之一。而不是每次都生成一个唯一的队列。

private static void SetupBus(int instanceName)
{
    Bus.Initialize(sbc =>
    {
        sbc.UseRabbitMqRouting();
        sbc.ReceiveFrom("rabbitmq://localhost/Commands);
        sbc.Subscribe(subs =>
        {
            subs.Handler<Message>(MessageHandled);
        });
    });
}

AFAIK,你需要有一个单独的命令处理程序而不是事件处理程序。所有命令处理程序都将ReceiveFrom在同一个队列中,所有事件处理程序都将ReceiveFrom拥有自己唯一的队列。

另一个难题是如何将消息发送到总线。您仍然可以将发布用于命令,但如果您错误地配置了消费者,您可能会获得多次执行,因为消息将发送给所有消费者,如果您想保证消息最终在一个队列中,您可以使用Send而不是Publish.

     Bus.Instance
         .GetEndpoint(new Uri("rabbitmq://localhost/Commands"))
        .Send(new Message
        {
            Id = g_Random.Next(),
            Body = "Some message",
            Sender = Assembly.GetEntryAssembly().GetName().Name
        });
于 2012-08-10T12:01:17.077 回答