下面我有一个使用 Azure 服务总线队列的相当简单的 .NET 控制台应用程序。
正如您将看到的,我正在使用 Task.Factory 启动 25 个接收器任务,然后调用我的 APM 样式的 BeginMessageReceive 方法。然后,在 EndMessageReceive 结束时,我再次调用 BeginMessageReceive 以保持循环继续进行。
我的问题是我如何才能实现同样的事情,但使用递归任务并可能利用 C# 5.0 async/await 从 APM 样式的 BeginMessageReceive/EndMessageReceive 切换到 TPL/TAP 方法?
using System;
using System.Configuration;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;
namespace ServiceBusConsumer
{
class Program
{
private static QueueClient _queueClient;
private static void Main(string[] args)
{
var connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
_queueClient = QueueClient.CreateFromConnectionString(connectionString, "MyQueue");
for (var i = 0; i < 25; i++ )
{
Task.Factory.StartNew(BeginMessageReceive);
}
Console.WriteLine("Waiting for messages...");
Console.ReadKey();
_queueClient.Close();
} //end private static void Main(string[] args)
private static void BeginMessageReceive()
{
_queueClient.BeginReceive(TimeSpan.FromMinutes(5), EndMessageReceive, null);
}
private static void EndMessageReceive(IAsyncResult iar)
{
var message = _queueClient.EndReceive(iar);
try
{
if (message != null)
{
var msg = message.GetBody<string>();
Console.WriteLine("Message: " + msg);
if (_queueClient.Mode == ReceiveMode.PeekLock)
{
// Mark brokered message as completed at which point it's removed from the queue.
message.Complete();
}
}
}
catch (Exception ex)
{
if (_queueClient.Mode == ReceiveMode.PeekLock)
{
// unlock the message and make it available
message.Abandon();
}
Console.WriteLine("Exception was thrown: " + ex.GetBaseException().Message);
}
finally
{
if (message != null)
{
message.Dispose();
}
}
BeginMessageReceive();
}
}
}
如果 MessageReceive 超时过期,用于递归调用自身的新修改代码:
private static async Task MessageReceiveAsync()
{
while (true)
{
using (var message = await _queueClient.ReceiveAsync(TimeSpan.FromMinutes(5)))try
{
if (message != null)
{
try
{
var msg = message.GetBody<string>();
Console.WriteLine("Message: " + msg);
if (_queueClient.Mode == ReceiveMode.PeekLock)
{
// Mark brokered message as completed at which point it's removed from the queue.
await message.CompleteAsync();
}
}
catch (Exception ex)
{
message.AbandonAsync();
Console.WriteLine("Exception was thrown: " + ex.GetBaseException().Message);
}
}
}
}
}