0

我正在开发一个基于 Azure 服务总线的系统,用于通过 API 快速即发即忘,并通过主题通过后台服务异步处理大量消息。在这个问题的上下文中,该主题有一个订阅,为什么它可能是一个队列。出于其他原因,我想将其保留为一个主题。

我最近将代码从使用包的 .NET 框架应用程序迁移到使用WindowsAzure.ServiceBus包的 .NET CoreMicrosoft.Azure.ServiceBus包。为了处理大量消息,我使用这样的MessageReceiver类:

var connString = "...";
var subscriptionPath = EntityNameHelper.FormatSubscriptionPath("topic", "subscription");
var messageReceiver = new MessageReceiver(connString, subscriptionPath);
while (...)
{
    var messages = await messageReceiver.ReceiveAsync(10, TimeSpan.FromSeconds(5));
    ...
}

为简单起见,我隐藏了一系列细节。就像我的应用程序启动 5 个线程并使用相同messageReceiver实例在每个线程中处理消息的事实一样。

我通常会运行多个此应用程序的实例,以跨线程和进程分布。我相信我们终于找到了我的问题的代码。在迁移到 .NET Core 和新的 NuGet 包后,我注意到只有一个应用程序同时处理消息。当打开两个控制台窗口并在每个窗口中启动一个进程时,我可以看到窗口 1 中的应用程序开始处理。Windows 2 中的应用程序不处理任何内容。几秒钟后,窗口 1 中的应用程序停止处理,窗口 2 中的应用程序开始处理。过了一会儿,它又切换回来。开关中没有真正的模式,但我的所有消息都已成功处理。

是否存在某种限制MessageReceiver,允许处理来自同一订阅或类似订阅的消息的最大线程数?

4

1 回答 1

1

我不知道MessageReceiver线程数有任何限制。不过,新库经过优化以利用并发性而无需线程(异步代码)。所以从技术上讲,您可以使用单个线程运行并有多个并发接收任务。另一种方法是使用由QueueClientand提供的消息处理程序,SubscriptionClient它允许指定并发性以轻松处理多条消息,但那些允许每个并发回调接收一条消息(无批处理)。

代理在对第一个竞争消费者的一次调用中提供尽可能多的消息。如果没有足够的消息,所有的消息将被提供给单个(或前几个)消费者。没有循环和公平分配。它确实按预期工作。

于 2020-01-23T17:02:20.923 回答