1

我有一个 ASP.NET MVC 操作,它验证连接字符串是否可以正常工作以使用 RabbitMQ 连接到服务器。我如何做到这一点是通过简单地创建一个队列,订阅它,然后立即向它发布一条消息。我希望该消息最多在几秒钟后显示在我的订阅者身上,但事实并非如此。订阅者仅在我第一次调用该操作时收到通知(在我使用其基于 Web 的管理器从 RabbitMQ 删除队列之后),但是一旦我发布任何已创建队列的后续消息,则无法调用订阅者。请看一下我的代码,如果您看到我没有看到的东西,请告诉我。提前致谢。

    //This is just the action called on a POST request.
    //Here is where the test is done.
    [HttpPost]
    public void DoConnectionVerificationAsync()
    {
        const string paramName = "queueSuccessful";
        try
        {
            //This is my routing key
            const string routingKey = "management.verify";

            //Here I declare the key and the exchange and bind them.
            var queue = Queue.DeclareDurable(routingKey);
            var exchange = Exchange.DeclareDirect("Orchard");
            queue.BindTo(exchange, routingKey);

            //Here I just generate a random int to send in the test message to the queue
            Random random = new Random();
            int randomInt = random.Next();
            //Instantiate the actual message with the random integer.
            var message = new Message<VerifyMessage>(new VerifyMessage
            {
                Content = randomInt.ToString(CultureInfo.InvariantCulture)
            });
            message.Properties.AppId = "CRM";

            //Because this is an asynchronous action, here I hint the beginning of the async operation.
            AsyncManager.OutstandingOperations.Increment();

            //Here I have my subscriber. The subscriber gets called only the first time, when the queue hasn't been created yet. That's a problem, it should be called every time I publish, which is in the following lines of code.
            _bus.Subscribe<VerifyMessage>(queue, (response, messageReceivedInfo) => Task.Factory.StartNew(() =>
            {
                VerifyMessage receivedMessage = response.Body;
                string content = receivedMessage.Content;
                int integer = int.Parse(content);
                //I expect the int received from the queue is the same as the one I sent
                bool success = integer == randomInt;
                AsyncManager.Parameters[paramName] = success;
                AsyncManager.OutstandingOperations.Decrement();
            }));

            //And here I publish the message. This always works, I can see the message stored in the queue using the web based RabbitMQ Manager
            //The problem is that the message gets stuck in the queue and never gets sent to the subscriber defined above
            using (var publishChannel = _bus.OpenPublishChannel(x => x.WithPublisherConfirms()))
            {
                publishChannel.Publish(exchange, routingKey, message, t =>
                    t.OnSuccess(() =>
                        {
                            // If we successfully publish, then there's nothing we really need to do. So this function stays empty.
                        })
                        .OnFailure(() =>
                            {
                                AsyncManager.Parameters[paramName] = false; AsyncManager.OutstandingOperations.Decrement();
                            }));
            }
        }
        catch (EasyNetQException)
        {
            AsyncManager.Parameters[paramName] = false;
            AsyncManager.OutstandingOperations.Decrement();
        }
    }

    //These functions down here don't really matter, but I'm including them just in case so that you can see it all.
    public ActionResult DoConnectionVerificationCompleted(bool queueSuccessful)
    {
        return RedirectToAction("VerifyQueueConnectionResult", new { queueSuccessful });
    }

    public ActionResult VerifyQueueConnectionResult(bool queueSuccessful)
    {
        VerifyQueueConnectionResultModel model = new VerifyQueueConnectionResultModel();
        model.Succeded = queueSuccessful;
        return View(model);
    }
4

3 回答 3

2

每次调用 bus.Subscribe 都会创建一个新的消费者。因此,它第一次工作时,您的队列中只有一个消费者,当您向其发布消息时,它会被路由到该消费者。

第二次调用 bus.Subscribe 时,第二个消费者绑定到同一个队列。当 RabbitMQ 在单个队列上有多个消费者时,它会循环向消费者发送消息。这是一个特点。这就是开箱即用的工作共享方式。您的消息被路由到第一个消费者,而不是您刚刚声明的消费者。第 3、4 等消费者也是如此,因此消息似乎尚未到达。

于 2013-04-30T15:47:20.420 回答
0

您可以为每个不同的订阅者使用不同的队列,如下所示:

bus.Subscribe<MyMessage>("my_subscription_1"...
bus.Subscribe<MyMessage>("my_subscription_2"...

这样,您发布的 MyMessage 类型的任何消息都将到达所有这些队列。

于 2013-05-22T12:09:15.463 回答
0

好吧,我找到了解决问题的简单方法。我最终搜索了 EasyNetQ API 中的可用方法并找到了Queue.SetAsSingleUse(). 我只是在将队列绑定到交换器后调用了该方法,并且几乎做到了。不知道是做什么queue.SetAsSingleUse()的,但从它的名字来看,它听起来像是在第一条消息发布并传递给订阅者之后处理队列。在我的情况下,每次测试只使用一次队列是有意义的,但是可能出于任何原因想要保留队列的人可能会遇到麻烦。

于 2013-04-29T20:07:02.067 回答