我们有 4 个 Azure 队列,它们通过直接 REST API 或我们提供的 WCF 服务填充。
- 我们希望有一个工作者角色来监控所有这 4 个队列。
- 我正在考虑使用多线程从配置中读取队列名称等并旋转处理方法(从队列中读取消息并进行处理)
有人可以为我提供一个示例或指导如何在 Worker 角色中实现这一点吗?
不太确定是否可以在没有多线程的情况下实现上述目标,因为我对多线程很陌生。
谢谢
我们有 4 个 Azure 队列,它们通过直接 REST API 或我们提供的 WCF 服务填充。
有人可以为我提供一个示例或指导如何在 Worker 角色中实现这一点吗?
不太确定是否可以在没有多线程的情况下实现上述目标,因为我对多线程很陌生。
谢谢
您可以为不同的任务触发不同的线程,但也可以考虑非线程方法(根据您对消息的处理,其性能可能会更好或更差):
while (true)
{
var msg = queue1.GetMessage();
if (msg != null)
{
didSomething = true;
// do something with it
queue1.DeleteMessage(msg);
}
msg = queue2.GetMessage();
if (msg != null)
{
didSomething = true;
// do something with it
queue2.DeleteMessage(msg);
}
// ...
if (!didSomething) Thread.Sleep(TimeSpan.FromSeconds(1)); // so I don't enter a tight loop with nothing to do
}
这是我们当前的实现,可以完全按照您的要求进行,但以更好的方式(或者我们认为)。也就是说,这段代码仍然需要大量清理。不过,这是此功能的 0.1 版本。
public class WorkerRole : RoleEntryPoint
{
public override void Run()
{
var logic = new WorkerAgent();
logic.Go(false);
}
public override bool OnStart()
{
// Initialize our Cloud Storage Configuration.
AzureStorageObject.Initialize(AzureConfigurationLocation.AzureProjectConfiguration);
return base.OnStart();
}
}
public class WorkerAgent
{
private const int _resistance_to_scaling_larger_queues = 9;
private Dictionary<Type, int> _queueWeights = new Dictionary<Type, int>
{
{typeof (Queue1.Processor), 1},
{typeof (Queue2.Processor), 1},
{typeof (Queue3.Processor), 1},
{typeof (Queue4.Processor), 1},
};
private readonly TimeSpan _minDelay = TimeSpan.FromMinutes(Convert.ToDouble(RoleEnvironment.GetConfigurationSettingValue("MinDelay")));
private readonly TimeSpan _maxDelay = TimeSpan.FromMinutes(Convert.ToDouble(RoleEnvironment.GetConfigurationSettingValue("MaxDelay")));
protected TimeSpan CurrentDelay { get; set; }
public Func<string> GetSpecificQueueTypeToProcess { get; set; }
/// <summary>
/// This is a superset collection of all Queues that this WorkerAgent knows how to process, and the weight of focus it should receive.
/// </summary>
public Dictionary<Type, int> QueueWeights
{
get
{
return _queueWeights;
}
set
{
_queueWeights = value;
}
}
public static TimeSpan QueueWeightCalibrationDelay
{
get { return TimeSpan.FromMinutes(15); }
}
protected Dictionary<Type, DateTime> QueueDelays = new Dictionary<Type, DateTime>();
protected Dictionary<Type, AzureQueueMetaData> QueueMetaData { get; set; }
public WorkerAgent(Func<string> getSpecificQueueTypeToProcess = null)
{
CurrentDelay = _minDelay;
GetSpecificQueueTypeToProcess = getSpecificQueueTypeToProcess;
}
protected IProcessQueues CurrentProcessor { get; set; }
/// <summary>
/// Processes queue request(s).
/// </summary>
/// <param name="onlyProcessOnce">True to only process one time. False to process infinitely.</param>
public void Go(bool onlyProcessOnce)
{
if (onlyProcessOnce)
{
ProcessOnce(false);
}
else
{
ProcessContinuously();
}
}
public void ProcessContinuously()
{
while (true)
{
// temporary hack to get this started.
ProcessOnce(true);
}
}
/// <summary>
/// Attempts to fetch and process a single queued request.
/// </summary>
public void ProcessOnce(bool shouldDelay)
{
PopulateQueueMetaData(QueueWeightCalibrationDelay);
if (shouldDelay)
{
Thread.Sleep(CurrentDelay);
}
var typesToPickFrom = new List<Type>();
foreach(var item in QueueWeights)
{
for (var i = 0; i < item.Value; i++)
{
typesToPickFrom.Add(item.Key);
}
}
var randomIndex = (new Random()).Next()%typesToPickFrom.Count;
var typeToTryAndProcess = typesToPickFrom[randomIndex];
CurrentProcessor = ObjectFactory.GetInstance(typeToTryAndProcess) as IProcessQueues;
CleanQueueDelays();
if (CurrentProcessor != null && !QueueDelays.ContainsKey(typeToTryAndProcess))
{
var errors = CurrentProcessor.Go();
var amountToDelay = CurrentProcessor.NumberProcessed == 0 && !errors.Any()
? _maxDelay // the queue was empty
: _minDelay; // else
QueueDelays[CurrentProcessor.GetType()] = DateTime.Now + amountToDelay;
}
else
{
ProcessOnce(true);
}
}
/// <summary>
/// This method populates/refreshes the QueueMetaData collection.
/// </summary>
/// <param name="queueMetaDataCacheLimit">Specifies the length of time to cache the MetaData before refreshing it.</param>
private void PopulateQueueMetaData(TimeSpan queueMetaDataCacheLimit)
{
if (QueueMetaData == null)
{
QueueMetaData = new Dictionary<Type, AzureQueueMetaData>();
}
var queuesWithoutMetaData = QueueWeights.Keys.Except(QueueMetaData.Keys).ToList();
var expiredQueueMetaData = QueueMetaData.Where(qmd => qmd.Value.TimeMetaDataWasPopulated < (DateTime.Now - queueMetaDataCacheLimit)).Select(qmd => qmd.Key).ToList();
var validQueueData = QueueMetaData.Where(x => !expiredQueueMetaData.Contains(x.Key)).ToList();
var results = new Dictionary<Type, AzureQueueMetaData>();
foreach (var queueProcessorType in queuesWithoutMetaData)
{
if (!results.ContainsKey(queueProcessorType))
{
var queueProcessor = ObjectFactory.GetInstance(queueProcessorType) as IProcessQueues;
if (queueProcessor != null)
{
var queue = new AzureQueue(queueProcessor.PrimaryQueueName);
var metaData = queue.GetMetaData();
results.Add(queueProcessorType, metaData);
QueueWeights[queueProcessorType] = (metaData.ApproximateMessageCount) == 0
? 1
: (int)Math.Log(metaData.ApproximateMessageCount, _resistance_to_scaling_larger_queues) + 1;
}
}
}
foreach (var queueProcessorType in expiredQueueMetaData)
{
if (!results.ContainsKey(queueProcessorType))
{
var queueProcessor = ObjectFactory.GetInstance(queueProcessorType) as IProcessQueues;
if (queueProcessor != null)
{
var queue = new AzureQueue(queueProcessor.PrimaryQueueName);
var metaData = queue.GetMetaData();
results.Add(queueProcessorType, metaData);
}
}
}
QueueMetaData = results.Union(validQueueData).ToDictionary(data => data.Key, data => data.Value);
}
private void CleanQueueDelays()
{
QueueDelays = QueueDelays.Except(QueueDelays.Where(x => x.Value < DateTime.Now)).ToDictionary(x => x.Key, x => x.Value);
}
}
有了这个,我们就有了一个知道如何处理每个队列的单独的类,它实现了 IProcessQueues。_queueWeights
我们用我们希望它处理的每种类型加载集合。我们设置_resistance_to_scaling_larger_queues
常数来控制我们希望它如何缩放。请注意,这以对数方式缩放(参见PopulateQueueMetaData
方法)。没有队列的权重小于 1,即使它有 0 个项目。如果您设置PopulateQueueMetaData
为10
,那么每增加 10 个数量级,该类型的“权重”就会增加 1。例如,如果您有 QueueA 有 0 个项目,QueueB 有 0 个项目,而 QueueC 有 10 个项目,那么您各自的权重是1、1 和 2。这意味着 QueueC 有 50% 的机会接下来被处理,而 QueueA 和 QueueB 每个只有 25% 的机会被处理。如果 QueueC 有 100 个项目,那么你的权重是 1、1、3,你被处理的机会是 20%、20%、60%。这可确保您的空队列不会被遗忘。
这样做的另一件事是它具有_minDelay
和_maxDelay
。如果此代码认为队列中至少有 1 个项目,那么它将继续以尽可能快的_minDelay
速度处理它。但是,如果它最后有 0 个项目,那么它将不允许它的处理速度快于_maxDelay
速率。因此,这意味着如果随机数生成器拉起包含 0 个项目的队列(无论权重如何),它将简单地跳过尝试处理它并继续进行下一次迭代。(可以在这部分进行一些额外的优化以获得更好的存储事务效率,但这是一个巧妙的小补充。)
我们在这里有几个自定义类(例如AzureQueue
and AzureQueueMetaData
) - 一个本质上是 a 的包装器,CloudQueue
另一个存储一些信息,例如队列的近似计数 - 那里没有什么有趣的(只是一种简化代码的方法) .
同样,我不称此代码为“漂亮”的代码,但在此代码中实现了一些相当聪明的概念并发挥了作用。出于任何你想要的原因使用它。:)
最后,像这样编写这段代码可以让我们拥有一个可以处理更多队列的项目。如果我们发现这根本跟不上,我们可以轻松地将其扩展到更大数量的实例,并针对所有队列进行扩展。在最小的场景中,您可以部署一个这样的实例来监控 3 个队列。但是,如果第 4 个队列开始影响性能(或者您需要更高的可用性),则将其增加到 2 个实例。一旦你达到 15 个队列,添加第三个。25 个队列,添加第 4 个实例。获得一个新客户,并且需要在整个系统中处理许多队列请求,这很好。将这个角色旋转最多 20 次,直到完成,然后将它们旋转回去。有一个特别讨厌的队列?评论队列中的_queueWeights
集合,部署以管理其余队列,然后将其与所有其他队列一起重新部署,但该队列已从集合中注释掉_queueWeights
,然后再次将其部署到另一组实例并在没有 a) 的情况下进行调试其他 QueueProcessors干扰您的调试和 b) 您的调试干扰您的其他 QueueProcessor。最终,这提供了很大的灵活性和效率。
在 worker 角色的 while 循环内,启动 4 个线程,就像在编写多线程 C# 应用程序一样。当然,您需要定义四个不同的线程函数,并且这些函数应该有单独的 while 循环来轮询队列。在worker的while循环结束时,只需等待线程完成。