8

我正忙于EventProcessorHost为 azure EventBus 客户端实现客户端。

我有一个实现IEventProcessor如下的类:

 public class MyEventProcessor : IEventProcessor
    {
        Stopwatch checkpointStopWatch;            

        //TODO: get provider id from parent class     


        public async Task CloseAsync(PartitionContext context, CloseReason reason)
        {
            Debug.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
            if (reason == CloseReason.Shutdown)
            {
                await context.CheckpointAsync();
            }
        }

        public Task OpenAsync(PartitionContext context)
        {
            Debug.WriteLine("SimpleEventProcessor initialized.  Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
            eventHandler = new MyEventHandler();
            this.checkpointStopWatch = new Stopwatch();
            this.checkpointStopWatch.Start();
            return Task.FromResult<object>(null);
        }

        async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            foreach (EventData eventData in messages)
            {
                string data = Encoding.UTF8.GetString(eventData.GetBytes());              
                Debug.WriteLine(data);       
            }
            //Call checkpoint every 5 minutes, so that worker can resume processing from the 5 minutes back if it restarts.
            if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
            {
                await context.CheckpointAsync();
                this.checkpointStopWatch.Restart();
            }
        }
    }

然后我将其称为如下:

 EventProcessorHost _eventProcessorHost = new EventProcessorHost(eventProcessorHostName, EndpointName, EventHubConsumerGroup.DefaultGroupName, ConnectionString, storageConnectionString, "messages-events");
 await _eventProcessorHost.RegisterEventProcessorAsync<MyEventProcessor>();

我需要将参数传递给创建的MyEventProcessor实例EventProcessorHost。我该怎么做呢?

4

2 回答 2

22

您只需要使用RegisterEventProcessorFactoryAsync来传入工厂实例。该工厂类可以传入工厂方法中适当的任何参数,可能首先将它们传递给工厂,或者让工厂改变行为。在下面草绘的代码中,您可以看到两个参数被传递到IEventProcessor中。其中一个来自工厂的参数,另一个是工厂被调用次数的计数器。

class AzureStreamProcessor : IEventProcessor
{
     ....
}

class AzureStreamProcessorFactory : IEventProcessorFactory
{
    public AzureStreamProcessorFactory(string str)
    {
         this.randomString = str;
    }

    private string randomString;
    private int numCreated = 0;
    IEventProcessor IEventProcessorFactory.CreateEventProcessor(PartitionContext context)
    {
        return new AzureStreamProcessor(context, randomString, Interlocked.Increment(ref numCreated));
    }
}

host.RegisterEventProcessorFactoryAsync(new AzureStreamProcessorFactory("a parameter"), options);
于 2015-11-20T09:40:16.417 回答
1

可以尝试使用类似下面的参数对MyEventProcessor 类进行构造函数依赖注入。

     public class MyEventProcessor : IEventProcessor
    {
        Stopwatch checkpointStopWatch;            

        //TODO: get provider id from parent class     
    IParameters _parameter;
    public MyEventProcessor (IParameters param)
    {
      this._parameter  = param;
     }

        public async Task CloseAsync(PartitionContext context, CloseReason reason)
        {
            Debug.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
            if (reason == CloseReason.Shutdown)
            {
                await context.CheckpointAsync();
            }
        }.....

使用_parameter 检索您需要的内容。

下面是如何注册IParameters的依赖项

这里我使用 Ninject 依赖解析器。

//Bind the class that implements IParameter.
 var parameters = new Parameter();
paramters.Property = "my data"

 kernel.Bind<IParameters>().ToConstant(parameters);

希望有帮助

于 2015-10-29T08:02:12.967 回答