这是通过调用一个函数来解决的,该函数使用多线程在指定的时间间隔后轮询天蓝色队列并获取消息(可能具有设置的指数回退时间)。
方法 1:
在 webapp 中实现这一点有点棘手,我不得不使用 hack - 从构造函数中调用一个函数来开始轮询。
在 startup.cs(在配置函数中),注册你的服务,
app.ApplicationServices.GetService<IQueueConsumer>();
在 ConfigureServices 函数中,配置和创建轮询队列类的对象,
services.TryAddTransient<IQueueConsumer>(sp => this.GetQueueProcessor(sp));
然后,当调用构造函数来创建对象时,开始轮询另一个线程中的队列。
public QueuePollingFunction(
IOptions<QueueOptions> queueOptions,
CloudQueue queue)
{
this.isEnabled = queueOptions.Value.IsEnabled;
this.StartPollingQueue(queue);
}
public override async Task<bool> ProcessMessageAsync(string message)
{
bool result = false;
try
{
var messageContent = JsonConvert.DeserializeObject<QueueEntity>(message);
result = true;
}
catch (Exception e)
{
Trace.TraceError(e.ToString());
}
return result;
}
private async Task StartPollingQueue(CloudQueue queue)
{
if (this.isEnabled)
{
Task pollQueue = Task.Factory.StartNew(() => Parallel.For(0, this.numberOfParallelTasks, work =>
{
this.Start(queue);
}));
}
}
private async Task Start(CloudQueue queue)
{
while (true)
{
try
{
CloudQueueMessage retrievedMessage = await queue.GetMessageAsync();
if (retrievedMessage != null)
{
// Fail Logic
if (retrievedMessage.DequeueCount > this.maxRetryLimit)
{
await queue.DeleteMessageAsync(retrievedMessage);
}
bool isPass = await this.ProcessMessageAsync(newChannelSettings);
if (isPass)
{
await queue.DeleteMessageAsync(retrievedMessage);
}
}
else
{
// If queue is empty, then the Task can sleep for sleepTime duration
await Task.Delay(this.sleepTime);
}
}
catch (Exception e)
{
Trace.TraceError(e.ToString());
}
}
}
方法2:
但是,后来不得不转向最佳方法,即使用worker-roles,然后使用Tasks运行后台线程来执行此任务。