我创建了一个简单的事件处理器主机,它从配置了租赁管理默认选项的 Azure IOT 集线器读取事件。
但是,即使我正在运行 EventProcessorHost 的单个实例,我也会定期出现以下异常:
ReceiverDisconnectedException:创建了具有更高纪元的新接收器,因此当前具有纪元的接收器正在断开连接。原因:LeaseLost
客户端重新获取租约并进一步处理消息。
这里是 EventProcessorHost 的初始化
EventProcessorHost eventProcessorHost = new EventProcessorHost(hostName, eventHubName, EventHubConsumerGroup.DefaultGroupName,
eventHubConnectionString, storageAccountConnectionString);
var options = new EventProcessorOptions();
options.ExceptionReceived += (sender, e) =>
{
Console.WriteLine(e.Exception);
};
eventProcessorHost.PartitionManagerOptions = PartitionManagerOptions.DefaultOptions;
eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(options).Wait();
这种行为是正常的还是我需要为单个实例做一些事情来避免它?
下面是我的事件处理器:
public class SimpleEventProcessor : IEventProcessor
{
Stopwatch checkpointStopWatch;
public async Task CloseAsync(PartitionContext context, CloseReason reason)
{
Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
await context.CheckpointAsync();
}
public Task OpenAsync(PartitionContext context)
{
Console.WriteLine("SimpleEventProcessor initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
this.checkpointStopWatch = new Stopwatch();
this.checkpointStopWatch.Start();
this.rawDataService = new RawDataService();
return Task.FromResult<object>(null);
}
public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (EventData eventData in messages)
{
string data = Encoding.UTF8.GetString(eventData.GetBytes());
Console.WriteLine(string.Format("Message received. Partition: '{0}', Data: '{1}'", context.Lease.PartitionId, data));
}
}
}