4

问题:我有大量的电子邮件要发送,目前,在任何时间点队列中平均有 10 封电子邮件。我一次处理一个队列的代码;也就是说,接收消息,处理它并最终发送电子邮件。这会导致在用户注册服务时向他们发送电子邮件时出现相当大的延迟。

我已经开始考虑将代码修改为process the messages in parrallel异步说 5。我正在想象编写一个方法并使用CTP并行调用此方法,例如 5 次。

我对如何实现这一点有点迷茫。犯错的成本非常高,因为如果出现问题,用户会感到失望。

Request:在编写并行处理 Azure 服务总线中的消息的代码时,我需要帮助。谢谢。

My code in a nutshell.

Public .. Run()
{
   _myQueueClient.BeginReceive(ProcessUrgentEmails, _myQueueClient);
}

void ProcessUrgentEmails(IAsyncResult result)
{
   //casted the `result` as a QueueClient
   //Used EndReceive on an object of BrokeredMessage
   //I processed the message, then called
   sendEmail.BeginComplete(ProcessEndComplete, sendEmail);
 }


 //This method is never called despite having it as callback function above.
 void ProcessEndComplete(IAsyncResult result)
 {
     Trace.WriteLine("ENTERED ProcessEndComplete method...");
     var bm = result.AsyncState as BrokeredMessage;
     bm.EndComplete(result); 
 }
4

1 回答 1

5

本页为您提供使用 Windows Azure 服务总线时的性能提示

关于并行处理,您可以有一个线程池进行处理,每次收到消息时,您只需获取该池中的一个并为其分配一条消息。您需要管理该池。

或者,您可以一次检索多条消息并使用 TPL 处理它们......例如,BeginReceiveBatch / EndReceiveBatch方法允许您从队列(异步)中检索多个“项目”,然后使用“AsParallel”转换返回的 IEnumerable前面的方法并在多个线程中处理消息。

非常简单和 BARE BONES 样本:

var messages = await Task.Factory.FromAsync<IEnumerable<BrokeredMessage>>(Client.BeginReceiveBatch(3, null, null), Client.EndReceiveBatch);

messages.AsParallel().WithDegreeOfParallelism(3).ForAll(item =>
{
    ProcessMessage(item);
});

该代码从队列和进程中检索 3 条消息,然后在“3 个线程”中处理(注意:不保证它会使用 3 个线程,.NET 将分析系统资源,如有必要最多使用 3 个线程)

您还可以删除“ WithDegreeOfParallelism ”部分,.NET 将使用它需要的任何线程。

归根结底,有多种方法可以做到这一点,您必须决定哪一种更适合您。

更新:不使用 ASYNC/AWAIT 的示例

这是一个使用常规 Begin/End Async 模式的基本(没有错误检查)示例。

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Threading;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.ServiceRuntime;

namespace WorkerRoleWithSBQueue1
{
    public class WorkerRole : RoleEntryPoint
    {
        // The name of your queue
        const string QueueName = "QUEUE_NAME";
        const int MaxThreads = 3;

        // QueueClient is thread-safe. Recommended that you cache 
        // rather than recreating it on every request
        QueueClient Client;
        bool IsStopped;
        int dequeueRequests = 0;

        public override void Run()
        {
            while (!IsStopped)
            {
                // Increment Request Counter
                Interlocked.Increment(ref dequeueRequests);

                Trace.WriteLine(dequeueRequests + " request(s) in progress");

                Client.BeginReceive(new TimeSpan(0, 0, 10), ProcessUrgentEmails, Client);

                // If we have made too many requests, wait for them to finish before requesting again.
                while (dequeueRequests >= MaxThreads && !IsStopped)
                {
                    System.Diagnostics.Trace.WriteLine(dequeueRequests + " requests in progress, waiting before requesting more work");
                    Thread.Sleep(2000);
                }

            }
        }


        void ProcessUrgentEmails(IAsyncResult result)
        {
            var qc = result.AsyncState as QueueClient;
            var sendEmail = qc.EndReceive(result);
            // We have received a message or has timeout... either way we decrease our counter
            Interlocked.Decrement(ref dequeueRequests);

            // If we have a message, process it
            if (sendEmail != null)
            {
                var r = new Random();
                // Process the message
                Trace.WriteLine("Processing message: " + sendEmail.MessageId);
                System.Threading.Thread.Sleep(r.Next(10000));

                // Mark it as completed
                sendEmail.BeginComplete(ProcessEndComplete, sendEmail);
            }

        }


        void ProcessEndComplete(IAsyncResult result)
        {
            var bm = result.AsyncState as BrokeredMessage;
            bm.EndComplete(result);
            Trace.WriteLine("Completed message: " + bm.MessageId);
        }


        public override bool OnStart()
        {
            // Set the maximum number of concurrent connections 
            ServicePointManager.DefaultConnectionLimit = 12;

            // Create the queue if it does not exist already
            string connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
            var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
            if (!namespaceManager.QueueExists(QueueName))
            {
                namespaceManager.CreateQueue(QueueName);
            }

            // Initialize the connection to Service Bus Queue
            Client = QueueClient.CreateFromConnectionString(connectionString, QueueName);
            IsStopped = false;
            return base.OnStart();
        }

        public override void OnStop()
        {
            // Waiting for all requestes to finish (or timeout) before closing
            while (dequeueRequests > 0)
            {
                System.Diagnostics.Trace.WriteLine(dequeueRequests + " request(s), waiting before stopping");
                Thread.Sleep(2000);
            }
            // Close the connection to Service Bus Queue
            IsStopped = true;
            Client.Close();
            base.OnStop();
        }
    }
}

希望能帮助到你。

于 2013-05-28T00:35:08.287 回答