我是 MassTransit 的新手,在我的理解中我错过了一些东西。
假设我有一个服务器场,所有节点都可以做同样的工作。应用程序框架是 CQRS 的风格。这意味着我有两种基本的消息要发布:
- 命令:必须由其中一个服务器处理,其中任何一个(第一个没有工作槽的)
- 事件:必须由所有服务器处理
我已经构建了一个非常简单的 MassTransit 原型(一个每隔 X 秒发送一次问候的控制台应用程序)。
在 API 中,我可以看到有一个“发布”方法。我如何指定它是什么类型的消息(一个与所有服务器)?
如果我查看“处理程序”配置,我可以指定队列 uri。如果我为所有主机指定相同的队列,所有主机都会收到消息,但我不能将执行限制在一台服务器上。
如果我从主机专用队列中侦听,则只有一个服务器会处理消息,但我不知道如何广播另一种消息。
请帮助我了解我所缺少的。
PS:如果它在乎,我的消息系统是rabbitmq。
为了测试,我用这些类创建了一个通用类库:
public static class ActualProgram
{
private static readonly CancellationTokenSource g_Shutdown = new CancellationTokenSource();
private static readonly Random g_Random = new Random();
public static void ActualMain(int delay, int instanceName)
{
Thread.Sleep(delay);
SetupBus(instanceName);
Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token);
Console.WriteLine("Press enter at any time to exit");
Console.ReadLine();
g_Shutdown.Cancel();
Bus.Shutdown();
}
private static void PublishRandomMessage()
{
Bus.Instance.Publish(new Message
{
Id = g_Random.Next(),
Body = "Some message",
Sender = Assembly.GetEntryAssembly().GetName().Name
});
if (!g_Shutdown.IsCancellationRequested)
{
Thread.Sleep(g_Random.Next(500, 10000));
Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token);
}
}
private static void SetupBus(int instanceName)
{
Bus.Initialize(sbc =>
{
sbc.UseRabbitMqRouting();
sbc.ReceiveFrom("rabbitmq://localhost/simple" + instanceName);
sbc.Subscribe(subs =>
{
subs.Handler<Message>(MessageHandled);
});
});
}
private static void MessageHandled(Message msg)
{
ConsoleColor color = ConsoleColor.Red;
switch (msg.Sender)
{
case "test_app1":
color = ConsoleColor.Green;
break;
case "test_app2":
color = ConsoleColor.Blue;
break;
case "test_app3":
color = ConsoleColor.Yellow;
break;
}
Console.ForegroundColor = color;
Console.WriteLine(msg.ToString());
Console.ResetColor();
}
private static void MessageConsumed(Message msg)
{
Console.WriteLine(msg.ToString());
}
}
public class Message
{
public long Id { get; set; }
public string Sender { get; set; }
public string Body { get; set; }
public override string ToString()
{
return string.Format("[{0}] {1} : {2}" + Environment.NewLine, Id, Sender, Body);
}
}
我还有 3 个只运行 ActualMain 方法的控制台应用程序:
internal class Program
{
private static void Main(string[] args)
{
ActualProgram.ActualMain(0, 1);
}
}