TLDR:看起来很合理,只需通过 CreateConsumerGroupIfNotExists 使用不同的名称来创建两个消费者组。
消费者组主要是一个概念,因此它们的工作方式取决于您的订阅者的实施方式。如您所知,从概念上讲,它们是一组订阅者一起工作,因此每个组都接收所有消息,并且在理想(不会发生)的情况下,可能会使用每条消息一次。这意味着每个消费者组将“拥有由同一工作者角色的多个实例处理的所有分区”。你要这个。
这可以以不同的方式实现。Microsoft 提供了两种直接使用来自事件中心的消息的方法,以及使用可能构建在两种直接方法之上的 Streaming Analytics 之类的选项。第一种方式是Event Hub Receiver,第二种是更高级别的Event Processor Host。
我没有直接使用Event Hub Receiver,所以这个特定的评论是基于这些系统如何工作的理论和文档中的推测:虽然它们是从EventHubConsumerGroups创建的,但由于这些接收器不相互协调,这几乎没有用处。如果您使用这些,您将需要(并且可以!)自己进行所有协调和提交偏移量,这在某些情况下具有优势,例如在与计算聚合相同的事务中将偏移量写入事务数据库。使用这些低电平接收器,使用相同的 Azure 消费者组具有不同的逻辑消费者组可能不应该(规范而不实用的建议)特别成问题,但您应该使用不同的名称,以防它确实重要或您更改为EventProcessorHosts。
现在了解更多有用的信息,EventProcessorHosts可能构建在EventHubReceivers 之上。它们是更高级别的东西,并且支持使多台机器作为逻辑消费者组一起工作。下面我从我的代码中包含了一个经过轻微编辑的片段,它制作了一个EventProcessorHost,并在解释一些选择时留下了一堆注释。
//We need an identifier for the lease. It must be unique across concurrently
//running instances of the program. There are three main options for this. The
//first is a static value from a config file. The second is the machine's NETBIOS
//name ie System.Environment.MachineName. The third is a random value unique per run which
//we have chosen here, if our VMs have very weak randomness bad things may happen.
string hostName = Guid.NewGuid().ToString();
//It's not clear if we want this here long term or if we prefer that the Consumer
//Groups be created out of band. Nor are there necessarily good tools to discover
//existing consumer groups.
NamespaceManager namespaceManager =
NamespaceManager.CreateFromConnectionString(eventHubConnectionString);
EventHubDescription ehd = namespaceManager.GetEventHub(eventHubPath);
namespaceManager.CreateConsumerGroupIfNotExists(ehd.Path, consumerGroupName);
host = new EventProcessorHost(hostName, eventHubPath, consumerGroupName,
eventHubConnectionString, storageConnectionString, leaseContainerName);
//Call something like this when you want it to start
host.RegisterEventProcessorFactoryAsync(factory)
你会注意到我告诉 Azure 如果它不存在就创建一个新的消费者组,如果它不存在你会收到一个可爱的错误消息。老实说,我不知道这样做的目的是什么,因为它不包括存储连接字符串,该字符串需要跨实例相同才能使 EventProcessorHost 的协调(并且可能是提交)正常工作。
在这里,我提供了一张来自Azure 存储资源管理器的图片,其中包含我在 11 月试验的消费者组的租约和大概抵消。请注意,虽然我有一个 testhub 和一个 testhub-testcg 容器,但这是由于手动命名它们。如果它们在同一个容器中,那就是“$Default/0”与“testcg/0”之类的东西。

如您所见,每个分区有一个 blob。我的假设是这些 blob 用于两件事。其中第一个是用于在实例之间分配分区的 Blob 租约(参见此处),第二个是存储已提交的分区内的偏移量。
消费实例不是将数据推送到消费者组,而是向存储系统询问一个分区中某个偏移量的数据。EventProcessorHosts 是拥有逻辑使用者组的一种很好的高级方式,其中每个分区一次只能被一个使用者读取,并且不会忘记逻辑使用者组在每个分区中取得的进展。
请记住,每个分区的吞吐量是经过测量的,因此如果您要最大化入口,则只能有两个全速运行的逻辑消费者。因此,您需要确保有足够的分区和吞吐量单位,以便您可以:
- 读取您发送的所有数据。
- 如果您因问题落后几个小时,请在 24 小时保留期内赶上。
总之:消费群体是你所需要的。您阅读的使用特定使用者组的示例很好,在每个逻辑使用者组中,Azure 使用者组使用相同的名称,并且不同的逻辑使用者组使用不同的名称。
我还没有使用过 Azure 流分析,但至少在预览版期间,您仅限于使用默认使用者组。因此,不要将默认使用者组用于其他用途,如果您需要两个单独的 Azure 流分析批次,您可能需要做一些讨厌的事情。但是很容易配置!