2

延迟队列是一个队列,其中每条消息都有与其相关联的延迟时间,并且只有在延迟到期时才能获取消息。队列的头部是过去延迟过期最远的消息。如果没有延迟过期,则没有头,出队将返回 null。

实际上,我正在使用 Azure 编写一个云应用程序,在 Azure 中只有 FIFO 队列可用,而不是优先级/延迟队列。所以我来到这里寻找是否有人可以给我一些指示,从那里我可以朝着正确的方向开始。我用谷歌搜索了很多,但只发现了 Java 中的延迟队列实现,没有标准的教程/研究论文讨论一般的延迟队列。

编辑:

我有什么代码?
实际上,我必须首先设计这些东西并将其提交给我的经理,一旦我们完成设计,然后只有我可以开始编码。

有关该场景的更多详细信息
它是一个基于主/从模型的分布式应用程序。主服务器产生消息并将它们放入 Azure 服务总线队列,并且有多个从服务器(在多台机器上运行)从队列中读取并处理它们。如果万一主站宕机,则其中一个从站充当主站并开始生成消息。我不想在主服务器中存储任何状态信息,因为万一主服务器出现故障,所有状态信息也会随之而来。

4

3 回答 3

11

Windows Azure 队列消息具有延迟(以秒为单位),您可以在将消息插入队列时指定该延迟。在达到超时延迟之前,一条消息将不可见。请参阅此 MSDN 文章以查看 API 详细信息。

隐形超时也在各种语言 SDK 实现中实现。由于您使用的是 C#,因此AddMessage()调用如下所示。AddMessage()注意指定隐身超时的第三个参数:

        var acct = CloudStorageAccount.DevelopmentStorageAccount;
        var queueClient = acct.CreateCloudQueueClient();
        var queue = queueClient.GetQueueReference("myqueue");
        queue.CreateIfNotExist();

        var msg = new CloudQueueMessage("test message");
        queue.AddMessage(msg, TimeSpan.FromHours(2), TimeSpan.FromMinutes(30));
于 2013-01-04T15:29:55.920 回答
3

所以首先我们需要一个优先队列的实现。这是我不久前写的一篇。这可能并不理想;它有一个很小的 ​​API,它可能性能更好,但它是一个足够的起点:

public class PriorityQueue<TPriority, TElement>
{
    SortedDictionary<TPriority, Queue<TElement>> dictionary = new SortedDictionary<TPriority, Queue<TElement>>();
    public PriorityQueue()
    {
    }

    public Tuple<TPriority, TElement> Peek()
    {
        var firstPair = dictionary.First();
        return Tuple.Create(firstPair.Key, firstPair.Value.First());
    }

    public TElement Pop()
    {
        var firstPair = dictionary.First();
        TElement output = firstPair.Value.Dequeue();

        if (!firstPair.Value.Any())
            dictionary.Remove(firstPair.Key);

        return output;
    }

    public void Push(TPriority priority, TElement element)
    {
        Queue<TElement> queue;
        if (dictionary.TryGetValue(priority, out queue))
        {
            queue.Enqueue(element);
        }
        else
        {
            var newQueue = new Queue<TElement>();
            newQueue.Enqueue(element);
            dictionary.Add(priority, newQueue);
        }
    }
}

包装时延迟队列很简单:

public class DelayQueue<T>
{
    private PriorityQueue<DateTime, T> queue = new PriorityQueue<DateTime, T>();
    public void Enqueue(T item, int delay)
    {
        queue.Push(DateTime.Now.AddMilliseconds(delay), item);
    }

    public T Dequeue()
    {
        if (queue.Peek().Item1 > DateTime.Now)
            return queue.Pop();
        else
            return default(T);
    }
}
于 2013-01-04T15:30:49.197 回答
0

您如何构建一个包含两步过程的队列以使项目出队。这是高级流程:

  • 出列FIFO队列中的第一项;将其不可见性设置为 N 分钟(无论您决定不可见性应该是什么) - 这允许您使该项目在一段时间内不可见,就好像它不存在于队列中一样。这是我指的NextVisibleTime属性。

  • 检查DequeueCount属性 - 如果出队计数为 0,则这是该项目第一次出队。忽略该项目并继续前进。由于设置了它的隐身性,它不会被再次获取,直到它的时间。如果出队计数为 1 或更大,则它已出队一次,并且必须在所需的时间内设置为不可见。

这应该允许您实现延迟队列。我也可以想到其他方法。例如队列中的每个项目作为一个创建时间;这可用于动态计算项目需要保持不可见的时间。要更改属性的不可见性,请检查此方法:http: //msdn.microsoft.com/en-us/library/microsoft.windowsazure.storageclient.cloudqueue.updatemessage.aspx

于 2013-01-04T15:30:23.923 回答