我正忙于EventProcessorHost
为 azure EventBus 客户端实现客户端。
我有一个实现IEventProcessor
如下的类:
public class MyEventProcessor : IEventProcessor
{
Stopwatch checkpointStopWatch;
//TODO: get provider id from parent class
public async Task CloseAsync(PartitionContext context, CloseReason reason)
{
Debug.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
if (reason == CloseReason.Shutdown)
{
await context.CheckpointAsync();
}
}
public Task OpenAsync(PartitionContext context)
{
Debug.WriteLine("SimpleEventProcessor initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
eventHandler = new MyEventHandler();
this.checkpointStopWatch = new Stopwatch();
this.checkpointStopWatch.Start();
return Task.FromResult<object>(null);
}
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (EventData eventData in messages)
{
string data = Encoding.UTF8.GetString(eventData.GetBytes());
Debug.WriteLine(data);
}
//Call checkpoint every 5 minutes, so that worker can resume processing from the 5 minutes back if it restarts.
if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
{
await context.CheckpointAsync();
this.checkpointStopWatch.Restart();
}
}
}
然后我将其称为如下:
EventProcessorHost _eventProcessorHost = new EventProcessorHost(eventProcessorHostName, EndpointName, EventHubConsumerGroup.DefaultGroupName, ConnectionString, storageConnectionString, "messages-events");
await _eventProcessorHost.RegisterEventProcessorAsync<MyEventProcessor>();
我需要将参数传递给创建的MyEventProcessor
实例EventProcessorHost
。我该怎么做呢?