嗨,我正在使用事件中心以 1 秒的频率摄取数据。
我不断地将模拟数据从控制台应用程序推送到事件中心,然后存储到 SQL 数据库中。
现在已经超过 5 天了,我发现我的接收器每天都会处理两次数据,这就是我将重复记录输入数据库的原因。
由于它一天只发生一两次,所以我什至无法追踪。
到目前为止,有人会遇到这种情况吗?或者是否有可能主机可以两次处理相同的消息?还是接收器的异步行为问题?
期待帮助......
代码片段:
public class SimpleEventProcessor : IEventProcessor
{
Stopwatch checkpointStopWatch;
async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
{
Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
if (reason == CloseReason.Shutdown)
{
await context.CheckpointAsync();
}
}
Task IEventProcessor.OpenAsync(PartitionContext context)
{
Console.WriteLine("SimpleEventProcessor initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
this.checkpointStopWatch = new Stopwatch();
this.checkpointStopWatch.Start();
return Task.FromResult<object>(null);
}
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (EventData eventData in messages)
{
string data = Encoding.UTF8.GetString(eventData.GetBytes());
// store data into SQL database / database call.
}
// Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts.
if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(0))
{
await context.CheckpointAsync();
this.checkpointStopWatch.Restart();
}
if (messages.Count() > 0)
await context.CheckpointAsync();
}
}