我有一个 ServiceFabric 无状态服务,我想使用 EventProcessorHost 从特定的 EventHub 分区中读取数据。我想读取两个事件中心分区,每个分区映射到一个 EventProcessorHost
第一个 EventHub 分区 => 第一个 EventProcessorHost
第二个 EventHub 分区 => 第二个 EventProcessorHost
var eventHubClient = EventHubClient.CreateFromConnectionString(serviceBusConnectionString, eventHubName);
// Get the default Consumer Group
eventProcessorHost = new EventProcessorHost(Guid.NewGuid().ToString(),
eventHubClient.Path.ToLower(),
consumerGroupName.ToLower(),
serviceBusConnectionString,
storageAccountConnectionString)
{
PartitionManagerOptions = new PartitionManagerOptions
{
AcquireInterval = TimeSpan.FromSeconds(10), // Default is 10 seconds
RenewInterval = TimeSpan.FromSeconds(10), // Default is 10 seconds
LeaseInterval = TimeSpan.FromSeconds(30) // Default value is 30 seconds
}
};
ServiceEventSource.Current.Message(RegisteringEventProcessor);
var eventProcessorOptions = new EventProcessorOptions
{
InvokeProcessorAfterReceiveTimeout = true,
MaxBatchSize = 100,
PrefetchCount = 100,
ReceiveTimeOut = TimeSpan.FromSeconds(30),
};
eventProcessorOptions.ExceptionReceived += EventProcessorOptions_ExceptionReceived;
await eventProcessorHost.RegisterEventProcessorFactoryAsync(new EventProcessorFactory<EventProcessor>(deviceActorServiceUri),
eventProcessorOptions);