0

我希望有人可以为我澄清这一点:

我在同一个 ConsumerGroup 中有 2 个消费者,我的理解是他们应该在他们之间进行协调,但我遇到的问题是两个消费者都收到了所有的消息。我的代码很简单:

const connectionString =...";
const eventHubName = "my-hub-dev";
const consumerGroup = "processor"; 

async function main() {
  const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);
  const subscription = consumerClient.subscribe({
      processEvents: async (events, context) => {
        for (const event of events) {
          console.log(`Received event...`, event)
        }        
      },
    }
  );

如果我运行此消费者代码的两个实例并发布一个事件,则两个实例都会收到该事件。

所以我的问题是:

  • 我的理解是否正确,只有 1 个消费者应该收到该消息?
  • 我在这里有什么遗漏吗?
4

1 回答 1

3

EventHubConsumerClient 需要一个 CheckpointStore 来促进多个客户端之间的协调。您可以在实例化它时将其传递给 EventHubConsumerClient 构造函数。

使用@azure/eventhubs-checkpointstore-blobAzure 存储 Blob 来存储元数据,并且需要使用同一使用者组来协调多个使用者。它还存储检查点数据:您可以使用事件调用 context.updateCheckpoint ,如果您停止并启动新的接收器,它将从与事件关联的分区中的最后一个检查点事件继续。

这里有一个完整的示例@azure/eventhubs-checkpointstore-blobhttps ://github.com/Azure/azure-sdk-for-js/blob/master/sdk/eventhub/eventhubs-checkpointstore-blob/samples/javascript/receiveEventsUsingCheckpointStore.js

说明:除非客户端指定了ownerLevel ,否则事件中心服务不会在从使用者组读取时强制分区的单个所有者。最高的所有者级别“获胜”。您可以在传递给subscribe的选项包中设置它,但如果您希望 CheckpointStore 为您处理协调,最好不要设置它。

于 2021-03-18T18:52:09.237 回答