我在 Azure 上运行服务总线,每秒发送大约 10-100 条消息。
最近我已经切换到.net 4.5并且所有人都兴奋地重构了所有代码以在每行中至少有两次“async”和“await ”,以确保它“正确”完成:)
现在我想知道它实际上是好还是坏。如果您可以查看代码片段并让我知道您的想法。我特别担心如果线程上下文切换没有给我带来更多的痛苦而不是好处,从所有的异步中......(看看!dumpheap 这绝对是一个因素)
只是一点描述 - 我将发布 2 种方法 - 一种在 ConcurrentQueue 上执行 while 循环,等待新消息,另一种方法一次发送一条消息。我也完全按照 Azure 博士的规定使用瞬态故障处理模块。
发送循环(从头开始,等待新消息):
private async void SendingLoop()
{
try
{
await this.RecreateMessageFactory();
this.loopSemaphore.Reset();
Buffer<SendMessage> message = null;
while (true)
{
if (this.cancel.Token.IsCancellationRequested)
{
break;
}
this.semaphore.WaitOne();
if (this.cancel.Token.IsCancellationRequested)
{
break;
}
while (this.queue.TryDequeue(out message))
{
try
{
using (message)
{
//only take send the latest message
if (!this.queue.IsEmpty)
{
this.Log.Debug("Skipping qeued message, Topic: " + message.Value.Topic);
continue;
}
else
{
if (this.Topic == null || this.Topic.Path != message.Value.Topic)
await this.EnsureTopicExists(message.Value.Topic, this.cancel.Token);
if (this.cancel.Token.IsCancellationRequested)
break;
await this.SendMessage(message, this.cancel.Token);
}
}
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
ex.LogError();
}
}
}
}
catch (OperationCanceledException)
{ }
catch (Exception ex)
{
ex.LogError();
}
finally
{
if (this.loopSemaphore != null)
this.loopSemaphore.Set();
}
}
发送消息:
private async Task SendMessage(Buffer<SendMessage> message, CancellationToken cancellationToken)
{
//this.Log.Debug("MessageBroadcaster.SendMessage to " + this.GetTopic());
bool entityNotFound = false;
if (this.MessageSender.IsClosed)
{
//this.Log.Debug("MessageBroadcaster.SendMessage MessageSender closed, recreating " + this.GetTopic());
await this.EnsureMessageSender(cancellationToken);
}
try
{
await this.sendMessageRetryPolicy.ExecuteAsync(async () =>
{
message.Value.Body.Seek(0, SeekOrigin.Begin);
using (var msg = new BrokeredMessage(message.Value.Body, false))
{
await Task.Factory.FromAsync(this.MessageSender.BeginSend, this.MessageSender.EndSend, msg, null);
}
}, cancellationToken);
}
catch (MessagingEntityNotFoundException)
{
entityNotFound = true;
}
catch (OperationCanceledException)
{ }
catch (ObjectDisposedException)
{ }
catch (Exception ex)
{
ex.LogError();
}
if (entityNotFound)
{
if (!cancellationToken.IsCancellationRequested)
{
await this.EnsureTopicExists(message.Value.Topic, cancellationToken);
}
}
}
上面的代码来自每秒发送 1 条消息的“Sender”类。在任何给定时间我都有大约 50-100 个实例在运行,所以它可能是相当多的线程。
顺便说一句,不要担心 EnsureMessageSender、RecreateMessageFactory、EnsureTopicExists 太多,它们不会经常被调用。
如果我只需要一次发送一条消息,而不用担心异步内容并避免随之而来的开销,那么让一个后台线程处理消息队列并同步发送消息不是更好吗?
请注意,将一条消息发送到 Azure 服务总线通常只需几毫秒,这并不昂贵。(除了速度很慢、超时或服务总线后端出现问题时,它可能会在尝试发送内容时挂起一段时间)。
感谢和抱歉这么长的帖子,
斯蒂沃
建议的解决方案
这个例子可以解决我的情况吗?
static void Main(string[] args)
{
var broadcaster = new BufferBlock<int>(); //queue
var cancel = new CancellationTokenSource();
var run = Task.Run(async () =>
{
try
{
while (true)
{
//check if we are not finished
if (cancel.IsCancellationRequested)
break;
//async wait until a value is available
var val = await broadcaster.ReceiveAsync(cancel.Token).ConfigureAwait(false);
int next = 0;
//greedy - eat up and ignore all the values but last
while (broadcaster.TryReceive(out next))
{
Console.WriteLine("Skipping " + val);
val = next;
}
//check if we are not finished
if (cancel.IsCancellationRequested)
break;
Console.WriteLine("Sending " + val);
//simulate sending delay
await Task.Delay(1000).ConfigureAwait(false);
Console.WriteLine("Value sent " + val);
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
}, cancel.Token);
//simulate sending messages. One every 200mls
for (int i = 0; i < 20; i++)
{
Console.WriteLine("Broadcasting " + i);
broadcaster.Post(i);
Thread.Sleep(200);
}
cancel.Cancel();
run.Wait();
}