19

我有一个负责检查 4 个服务总线队列的 Azure 辅助角色。目前,我只是用一个循环手动逐个检查这些队列。

// Polling approach: spin forever, checking every queue on each pass.
while(true)
{
    // loop through my queues to check for messages
}

Azure SDK 2.0 提供了侦听消息而不是轮询消息的能力。但是我看到的每个示例都是使用 Console.ReadKey() 来保持运行的控制台应用程序。有没有办法让辅助角色也保持运行并等待消息?

我试过了:

// Registers the Process callback on every queue client.
// NOTE: nothing here blocks after registration, so Run() falls straight
// through and returns as soon as the last callback has been registered.
public override void Run()
{
    _queueProcessors.ForEach(x => x.OnMessage(Process));
}

其中 _queueProcessors 是 QueueClient 的列表,而 Process 是处理消息的私有方法。但是,辅助角色在注册完这些回调之后就会重新启动。

那么,有人知道如何让队列客户端保持运行并等待消息吗?

4

1 回答 1

39

以下是一个代码示例:

using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure.ServiceRuntime;
using System.Diagnostics;
using System.Net;
using System.Threading;

namespace WorkerRoleWithSBQueue1
{
    /// <summary>
    /// Worker role that receives Service Bus queue messages through the
    /// callback-based message pump (<c>QueueClient.OnMessage</c>) instead of
    /// polling. <see cref="Run"/> blocks on an event that <see cref="OnStop"/>
    /// signals, so the method does not return while the pump is active.
    /// </summary>
    public class WorkerRole : RoleEntryPoint
    {
        // The name of your queue
        const string QueueName = "demoapp";

        // Signalled by OnStop to release the thread blocked in Run.
        readonly ManualResetEvent CompletedEvent = new ManualResetEvent(false);

        // QueueClient is thread-safe. Recommended that you cache
        // rather than recreating it on every request
        QueueClient Client;

        /// <summary>
        /// Starts the message pump and then blocks until <see cref="OnStop"/>
        /// sets <c>CompletedEvent</c>.
        /// </summary>
        public override void Run()
        {
            OnMessageOptions options = new OnMessageOptions();
            options.AutoComplete = true; // Indicates if the message-pump should call complete on messages after the callback has completed processing.
            options.MaxConcurrentCalls = 1; // Indicates the maximum number of concurrent calls to the callback the pump should initiate
            options.ExceptionReceived += LogErrors; // Allows users to get notified of any errors encountered by the message pump

            Trace.WriteLine("Starting processing of messages");
            // Start receiving messages
            Client.OnMessage((receivedMessage) => // Initiates the message pump and callback is invoked for each message that is received, calling close on the client will stop the pump.
                {
                    try
                    {
                        // Process the message
                        Trace.WriteLine("Processing Service Bus message: " + receivedMessage.SequenceNumber.ToString());
                    }
                    catch (System.Exception ex)
                    {
                        // Handle any message processing specific exceptions here.
                        // Do not swallow the failure: with AutoComplete enabled, returning
                        // normally tells the pump the message was processed. Rethrowing lets
                        // the pump see the failure (expected to surface via ExceptionReceived
                        // and leave the message uncompleted -- confirm against SDK docs).
                        Trace.WriteLine("Error processing Service Bus message: " + ex.Message);
                        throw;
                    }
                }, options);

            // Keep Run from returning while the pump is active.
            CompletedEvent.WaitOne();
        }

        /// <summary>
        /// Handler for <c>OnMessageOptions.ExceptionReceived</c>; traces the
        /// exception message when one is present.
        /// </summary>
        private void LogErrors(object sender, ExceptionReceivedEventArgs e)
        {
            if (e.Exception != null)
            {
                Trace.WriteLine("Error: " + e.Exception.Message);
            }
        }

        /// <summary>
        /// Ensures the queue exists, creates the cached <c>QueueClient</c>, and
        /// enqueues ten empty demo messages.
        /// </summary>
        /// <returns>The result of <c>base.OnStart()</c>.</returns>
        public override bool OnStart()
        {
            // Set the maximum number of concurrent connections
            ServicePointManager.DefaultConnectionLimit = 12;

            // Create the queue if it does not exist already
            Trace.WriteLine("Creating Queue");
            // NOTE(review): placeholder value; a real connection string should come
            // from configuration rather than being embedded in source.
            string connectionString = "*** provide your connection string here***";
            var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
            if (!namespaceManager.QueueExists(QueueName))
            {
                namespaceManager.CreateQueue(QueueName);
            }

            // Initialize the connection to Service Bus Queue
            Client = QueueClient.CreateFromConnectionString(connectionString, QueueName);

            Trace.WriteLine("Sending messages...");
            // populate some messages
            for (int ctr = 0; ctr < 10; ctr++)
            {
                Client.Send(new BrokeredMessage());
            }

            return base.OnStart();
        }

        /// <summary>
        /// Closes the queue client (which stops the message pump) and releases
        /// the thread blocked in <see cref="Run"/>.
        /// </summary>
        public override void OnStop()
        {
            try
            {
                // Close the connection to Service Bus Queue.
                // Client is only assigned in OnStart, so it may still be null here.
                if (Client != null)
                {
                    Client.Close();
                }
            }
            finally
            {
                // Always release Run, even if Close throws; otherwise Run would
                // stay blocked on WaitOne indefinitely.
                CompletedEvent.Set(); // complete the Run function
                base.OnStop();
            }
        }
    }
}
于 2013-05-24T16:32:39.953 回答