7

我想创建一个使用 MSMQ 绑定的 WCF 服务,因为我有大量的通知要处理。重要的是客户端不会被服务拖住,并且通知按照它们被引发的顺序进行处理,因此是队列实现。

另一个考虑因素是弹性。我知道我可以集群 M​​SMQ 本身以使队列更加健壮,但我希望能够在不同的服务器上运行我的服务实例,因此如果服务器崩溃通知不会在队列中建立但另一台服务器继续处理.

我已经对 MSMQ 绑定进行了试验,发现您可以让多个服务实例在同一个队列上进行侦听,并且它们最终会进行一种循环,负载分布在可用服务中。这很好,但我最终会丢失队列的顺序,因为不同的实例需要不同的时间来处理请求。

我一直在使用一个简单的控制台应用程序进行实验,这是下面的史诗代码转储。当它运行时,我得到如下输出:

host1 open
host2 open
S1: 01
S1: 03
S1: 05
S2: 02
S1: 06
S1: 08
S1: 09
S2: 04
S1: 10
host1 closed
S2: 07
host2 closed

我想要发生的是:

host1 open
host2 open
S1: 01
<pause while S2 completes>
S2: 02
S1: 03
<pause while S2 completes>
S2: 04
S1: 05
S1: 06
etc.

我会认为由于 S2 尚未完成,它可能仍然会失败并将正在处理的消息返回到队列中。因此,不应允许 S1 从队列中拉出另一条消息。我的队列是事务性的,我尝试设置TransactionScopeRequired = true服务但无济于事。

这甚至可能吗?我是不是走错了路?在没有某种中央同步机制的情况下,是否有其他方法可以构建故障转移服务?

class WcfMsmqProgram
{
    private const string QueueName = "testq1";

    static void Main()
    {
        // Create a transactional queue
        string qPath = ".\\private$\\" + QueueName;
        if (!MessageQueue.Exists(qPath))
            MessageQueue.Create(qPath, true);
        else
            new MessageQueue(qPath).Purge();

        // S1 processes as fast as it can
        IService s1 = new ServiceImpl("S1");
        // S2 is slow
        IService s2 = new ServiceImpl("S2", 2000);

        // MSMQ binding
        NetMsmqBinding binding = new NetMsmqBinding(NetMsmqSecurityMode.None);

        // Host S1
        ServiceHost host1 = new ServiceHost(s1, new Uri("net.msmq://localhost/private"));
        ConfigureService(host1, binding);
        host1.Open();
        Console.WriteLine("host1 open");

        // Host S2
        ServiceHost host2 = new ServiceHost(s2, new Uri("net.msmq://localhost/private"));
        ConfigureService(host2, binding);
        host2.Open();
        Console.WriteLine("host2 open");

        // Create a client 
        ChannelFactory<IService> factory = new ChannelFactory<IService>(binding, new EndpointAddress("net.msmq://localhost/private/" + QueueName));
        IService client = factory.CreateChannel();

        // Periodically call the service with a new number
        int counter = 1;
        using (Timer t = new Timer(o => client.EchoNumber(counter++), null, 0, 500))
        {
            // Enter to stop
            Console.ReadLine();
        }

        host1.Close();
        Console.WriteLine("host1 closed");
        host2.Close();
        Console.WriteLine("host2 closed");

        // Wait for exit
        Console.ReadLine();
    }

    static void ConfigureService(ServiceHost host, NetMsmqBinding binding)
    {
        var endpoint = host.AddServiceEndpoint(typeof(IService), binding, QueueName);
    }

    [ServiceContract]
    interface IService
    {
        [OperationContract(IsOneWay = true)]
        void EchoNumber(int number);
    }

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
    class ServiceImpl : IService
    {
        public ServiceImpl(string name, int sleep = 0)
        {
            this.name = name;
            this.sleep = sleep;
        }

        private string name;
        private int sleep;

        public void EchoNumber(int number)
        {
            Thread.Sleep(this.sleep);
            Console.WriteLine("{0}: {1:00}", this.name, number);
        }
    }
}
4

2 回答 2

10

巴特瓦德,

您正在尝试手动创建服务总线。你为什么不尝试使用现有的?

NServiceBus、MassTransit、ServiceStack

其中至少有 2 个与 MSMQ 一起工作。

此外,如果您绝对需要订购它实际上可能是出于另一个原因 - 您希望能够发送消息并且您不希望在第一条消息之前处理相关消息。您正在寻找 Saga 模式。NServiceBus 和 MassTransit 都可以让你轻松管理 Sagas,它们都可以让你简单地触发初始消息,然后根据条件触发剩余的消息。它将允许您快速实现分布式应用程序的填充。

然后,您甚至可以扩展到数千个客户端、队列服务器和消息处理器,而无需编写任何代码或任何问题。

我们试图在这里通过 msmq 实现我们自己的服务总线,但我们放弃了,因为另一个问题不断出现。我们选择了 NServiceBus,但 MassTransit 也是一款出色的产品(它是 100% 开源的,而 NServiceBus 不是)。ServiceStack 在制作 API 和使用消息队列方面非常出色——我相信您可以使用它在几分钟内制作充当队列前端的服务。

哦,我有没有提到 NSB 和 MT 都只需要不到 10 行代码就可以完全实现队列、发送者和处理程序?

- - - 添加 - - -

Udi Dahan(NServiceBus 的主要贡献者之一)在: Udi Dahan 的“按顺序消息传递神话” “消息排序:是否具有成本效益?”中谈到了这一点。与乌迪·达汉

Chris Patterson(Mass Transit 的主要贡献者之一) “使用 Sagas 确保正确的消息顺序”问题

StackOverflow 问题/答案: “在 WCF 应用程序中使用 MSMQ 消息时保留消息顺序”

- - - 问题 - - -

我必须说我很困惑为什么你需要保证消息顺序——如果你使用的是 HTTP/SOAP 协议,你会处于同样的位置吗?我的猜测是否定的,那为什么在 MSMQ 中会出现问题?

祝你好运,希望这会有帮助,

于 2012-12-13T18:29:59.007 回答
1

确保消息的有序传递是大容量消息传递的实际棘手问题之一。

在理想情况下,您的消息目的地应该能够处理乱序消息。这可以通过确保您的消息源包含某种排序信息来实现。再次理想情况下,这采用某种形式的 x-of-n 批戳(消息 1 of 10、2 of 10 等)。然后,您的消息目的地需要在数据交付后将数据组装成顺序。

然而,在现实世界中,通常没有改变下游系统来处理乱序到达的消息的余地。在这种情况下,您有两个选择:

  1. 完全使用单线程 - 实际上您通常可以找到某种“分组 ID”,这意味着您可以在每个组的意义上使用单线程,这意味着您仍然可以跨不同的消息组进行并发。
  2. 在您希望按顺序接收消息的每个消费者系统周围实施一个重新排序器包装器。

这两种解决方案都不是很好,但这是我认为您可以进行并发和按顺序消息传递的唯一方法。

于 2012-12-10T14:25:57.880 回答