5

我使用事件中心处理器主机来接收和处理来自事件中心的事件。为了获得更好的性能,我每 3 分钟调用一次检查点,而不是每次接收事件时调用一次:

public async Task ProcessEventAsync(context, messages)
{
 foreach (var eventData in messages)
 {
    // do something
 }

 if (checkpointStopWatth.Elapsed > TimeSpan.FromMinutes(3);
 {
     await context.CheckpointAsync();
 }
}

但问题是,如果没有新事件发送到事件中心,则可能有一些事件永远不会成为检查点,因为如果没有新消息,则不会调用 ProcessEventAsync。

有什么建议可以确保所有已处理的事件都是检查点,但仍然每隔几分钟检查一次?

更新:根据 Sreeram 的建议,我将代码更新如下:

public async Task ProcessEventAsync(context, messages)
{
    foreach (var eventData in messages)
    {
     // do something    
    }

    this.lastProcessedEventsCount += messages.Count();

    if (this.checkpointStopWatth.Elapsed > TimeSpan.FromMinutes(3);
    {
        this.checkpointStopWatch.Restart();
        if (this.lastProcessedEventsCount > 0)
        {
            await context.CheckpointAsync();
            this.lastProcessedEventsCount = 0;
        }
    }
}
4

1 回答 1

5

Great case - you are covering!

You could experience loss of event checkpoints (and as a result event replay) in the below 2 cases:

  1. when you have sparse data flow (for ex: a batch of messages every 5 mins and your checkpoint interval is 3 mins) and EventProcessorHost instance closes for some reason - you could see 2 min of EventData - re-processing. To handle that case, Keep track of the lastProcessedEvent after completing IEventProcessor.onEvents/IEventProcessor.ProcessEventsAsync & checkpoint when you get notified on close - IEventProcessor.onClose/IEventProcessor.CloseAsync.

  2. There might just be a case when - there are no more events to a specific EventHubs partition. In this case, you would never see the last event being checkpointed - with your Checkpointing strategy. However, this is uncommon, when you have continuous flow of EventData and you are not sending to specific EventHubs partition (EventHubClient.send(EventData_Without_PartitionKey)). If you think - you could run into this situation, use the:

    EventProcessorOptions.setInvokeProcessorAfterReceiveTimeout(true); // in java or EventProcessorOptions.InvokeProcessorAfterReceiveTimeout = true; // in C#

flag to wake up the processEventsAsync every so often. Then, keep track of, LastProcessedEventData and LastCheckpointedEventData and make a judgement whether to checkpoint when no Events are received, based on EventData.SequenceNumber property on those events.

于 2018-08-15T21:24:39.453 回答