1

我正在为 azure 开发两个 WebJobs:一个将使用一个主题将消息放入服务总线队列,另一个使用相同的主题订阅 ServiceBusTrigger。

消息已正确发送到服务总线队列,但在运行订阅了 ServiceBusTrigger 的 WebJob 时,这些消息不会以 FIFO 为基础进行处理。

将消息放入服务总线队列的 WebJob 的代码如下:

NamespaceManager namespaceManager = NamespaceManager.Create();

// Delete if exists
if (namespaceManager.TopicExists("SampleTopic"))
{
    namespaceManager.DeleteTopic("SampleTopic");
}

TopicDescription td = new TopicDescription("SampleTopic");
td.SupportOrdering = true;
TopicDescription myTopic = namespaceManager.CreateTopic(td);

SubscriptionDescription myAuditSubscription = namespaceManager.CreateSubscription(myTopic.Path, "ImporterSubscription");

TopicClient topicClient = TopicClient.Create("SampleTopic");
for(int i = 1; i <= 10; i++)
{
    var message = new BrokeredMessage("message"+i);                   
    topicClient.Send(message);
}
topicClient.Close();

订阅服务总线触发器的 WebJob 具有以下代码:

namespace HO.Importer.Azure.WebJob.TGZProcessor
{
    public class Program
    {
        static void Main(string[] args)
        {
            JobHostConfiguration config = new JobHostConfiguration();
            config.UseServiceBus();
            JobHost host = new JobHost(config);
            host.RunAndBlock();
        }

        public static void WriteLog([ServiceBusTrigger("SampleTopic", "ImporterSubscription")] string message,
            TextWriter logger)
        {                
            Console.WriteLine(message));
        }
    }
}

如何实现将队列中的消息作为 FIFO 处理?

提前致谢!

4

2 回答 2

5

使用 SessionId 或 PartitionKey,这将确保消息由同一个消息代理处理。

请参阅:https ://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-partitioning

"SessionId:如果一条消息设置了 BrokeredMessage.SessionId 属性,那么 Service Bus 使用该属性作为分区键。这样,属于同一个会话的所有消息都由同一个消息代理处理。这使 Service Bus 能够保证消息顺序以及会话状态的一致性。”

于 2017-05-05T10:20:16.867 回答
4

虽然 Azure 服务总线提供 FIFO 功能(会话),但最好不要在基于代理的排队系统中假设这种行为。Ben Morris 有一篇很好的帖子不要假设 Azure 服务总线中的消息排序,因为假设使用异步消息进行排序几乎是一个谬误,也是其原因所在。

于 2016-07-20T06:50:19.620 回答