0

在我的场景中,我将有一批事件同时进入,然后很长一段时间 EventHub 将处于空闲状态。在我的处理器客户端中,我想每 N 个事件或 N 分钟检查一次(以先到者为准)。

这是我设置 Azure.Messaging.EventHubs.EventProcessorClient 的方式:

EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;

//Start Stopwatch
_checkpointStopWatch = new Stopwatch();
_checkpointStopWatch.Start();

// Start the processing
await processor.StartProcessingAsync();

while (true)
{
    await Task.Delay(TimeSpan.FromSeconds(10));
    Console.WriteLine($"{eventsProcessed} events have been processed");
}

在我的 ProcessEventHandler 中,我检查 eventsProcessedSinceLastCheckpoint 以及秒表上经过的时间。当任何一个达到最大值时,我都会重置两者并在我的控制台窗口中记下它:

static async Task<Task> ProcessEventHandler(ProcessEventArgs eventArgs)
{
   ++eventsProcessed;
   ++eventsProcessedSinceLastCheckpoint;

   Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));

    // After every 100 events or 2 minutes we add a checkpoint. Whichever occurs first
    if(eventsProcessedSinceLastCheckpoint >= 100 || _checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(2))
    {
        eventsProcessedSinceLastCheckpoint = 0;
        _checkpointStopWatch.Restart();

        await eventArgs.UpdateCheckpointAsync();
        Console.WriteLine("> Checkpoint Set. Count Reset. Stopwatch Reset.");
    }
    return Task.CompletedTask;

}

对 eventsProcessedSinceLastCheckpoint 变量的检查非常有效,因为只要有新事件进入,就会触发 ProcessEventHandler。但是,当 EventHub 空闲时,不会调用 ProcessEventHandler,因此,如果 EventHub 安静了几分钟或几小时,我将永远不会检查经过的时间。

我知道我可以删除计时器,并且如果检查点之间发生崩溃,我的处理器应该能够处理重复事件。但在我的场景中(因为我会有很长的空闲时间)我想利用我拥有的时间并赶上,以避免在可能的时候出现额外的重复。因此,在空闲期间添加计时器作为后备。

我的问题是:如何在ProcessEventHandler之外调用UpdateCheckpointAsync()?该方法似乎只存在于ProcessEventArgs上。我不能直接在 EventProcessorClient 上调用它,这将是理想的,因为我可以将计时器检查移到 ProcessEventHandler 之外并进入我的 while 循环......

4

1 回答 1

0

在创建处理器实例时设置该EventHubProcessorClientOptions.MaximumWaitTime属性将允许在未读取任何事件时调用您的处理程序。当设置为非空时,等待时间基本上意味着“一旦获得事件就给我事件,但如果在此时间间隔内没有读取任何事件,则 ping 我的处理程序”。

关于在这种情况下更新检查点,推荐的方法是缓存分配给处理程序的最后一个事件的参数并使用它来调用UpdateCheckpointAsync此示例演示了该方法,确保它在分区处理停止时创建一个检查点。

于 2020-04-23T18:52:16.740 回答