在我的场景中,我将有一批事件同时进入,然后很长一段时间 EventHub 将处于空闲状态。在我的处理器客户端中,我想每 N 个事件或 N 分钟检查一次(以先到者为准)。
这是我设置 Azure.Messaging.EventHubs.EventProcessorClient 的方式:
EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;
//Start Stopwatch
_checkpointStopWatch = new Stopwatch();
_checkpointStopWatch.Start();
// Start the processing
await processor.StartProcessingAsync();
while (true)
{
await Task.Delay(TimeSpan.FromSeconds(10));
Console.WriteLine($"{eventsProcessed} events have been processed");
}
在我的 ProcessEventHandler 中,我检查 eventsProcessedSinceLastCheckpoint 以及秒表上经过的时间。当任何一个达到最大值时,我都会重置两者并在我的控制台窗口中记下它:
static async Task<Task> ProcessEventHandler(ProcessEventArgs eventArgs)
{
++eventsProcessed;
++eventsProcessedSinceLastCheckpoint;
Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
// After every 100 events or 2 minutes we add a checkpoint. Whichever occurs first
if(eventsProcessedSinceLastCheckpoint >= 100 || _checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(2))
{
eventsProcessedSinceLastCheckpoint = 0;
_checkpointStopWatch.Restart();
await eventArgs.UpdateCheckpointAsync();
Console.WriteLine("> Checkpoint Set. Count Reset. Stopwatch Reset.");
}
return Task.CompletedTask;
}
对 eventsProcessedSinceLastCheckpoint 变量的检查非常有效,因为只要有新事件进入,就会触发 ProcessEventHandler。但是,当 EventHub 空闲时,不会调用 ProcessEventHandler,因此,如果 EventHub 安静了几分钟或几小时,我将永远不会检查经过的时间。
我知道我可以删除计时器,并且如果检查点之间发生崩溃,我的处理器应该能够处理重复事件。但在我的场景中(因为我会有很长的空闲时间)我想利用我拥有的时间并赶上,以避免在可能的时候出现额外的重复。因此,在空闲期间添加计时器作为后备。
我的问题是:如何在ProcessEventHandler之外调用UpdateCheckpointAsync()?该方法似乎只存在于ProcessEventArgs上。我不能直接在 EventProcessorClient 上调用它,这将是理想的,因为我可以将计时器检查移到 ProcessEventHandler 之外并进入我的 while 循环......