0

嗨,我正在使用事件中心以 1 秒的频率摄取数据。

我不断地将模拟数据从控制台应用程序推送到事件中心,然后存储到 SQL 数据库中。

现在已经超过 5 天了,我发现我的接收器每天都会处理两次数据,这就是我将重复记录输入数据库的原因。

由于它一天只发生一两次,所以我什至无法追踪。

到目前为止,有人会遇到这种情况吗?或者是否有可能主机可以两次处理相同的消息?还是接收器的异步行为问题?

期待帮助......

代码片段:

 public class SimpleEventProcessor : IEventProcessor
{  
    Stopwatch checkpointStopWatch;

    async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }

    Task IEventProcessor.OpenAsync(PartitionContext context)
    {
        Console.WriteLine("SimpleEventProcessor initialized.  Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
        this.checkpointStopWatch = new Stopwatch();
        this.checkpointStopWatch.Start();
        return Task.FromResult<object>(null);
    }

    async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {

        foreach (EventData eventData in messages)
        {
            string data = Encoding.UTF8.GetString(eventData.GetBytes());

          // store data into SQL database / database call.

        }           


        // Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts.
        if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(0))
        {
            await context.CheckpointAsync();
            this.checkpointStopWatch.Restart();
        }

        if (messages.Count() > 0)
            await context.CheckpointAsync();
    } 
}
4

1 回答 1

0

事件中心保证至少一次交付

它具有以下特点:

  • 低延迟
  • 每秒能够接收和处理数百万个事件
  • 至少一次交货

所以你可以期待这会发生。

还要考虑到检查点刚刚发生的情况,然后处理更多消息(让它们称为 A 和 B)然后该过程失败。下一次失败消息消费后再次启动读取过程,会从最后一个checkpointed消息开始,也就是说,消息A和B会被再次处理。

于 2018-07-11T17:25:36.617 回答