1

这是取自此处的 IEventProcessor 实现的一部分:

public class SimpleEventProcessor : IEventProcessor
{
    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
    {
        foreach (EventData eventData in events)
        {

        }
    }
}

将新事件添加到 EventHub 时,会调用 ProcessEventsAsync 方法,并且可以使用 foreach 循环来处理这些事件。我现在想使用例如此处讨论的 ObserverRegistry 将观察者添加到 SimpleEventProcessor 。建议的 ObserverRegistry 如下所示:

public class ObserverRegistry : IObserverRegistry<IProjectionWriterFactory>
{
    IEnumerable<object> GetObservers(IProjectionWriterFactory factory)
    {
        yield return new LoanApplicationObserver();
        yield return new OfferObserver();
        // more observers...
    }
}

不幸的是,缺少一些东西。如何使用 SimpleEventProcessor 注册多个观察者,以便将事件从 ProcessEventsAsync 传递给所有观察者并最终传递给它们的 When 方法?

4

1 回答 1

2

完整的源代码在这里。概要如下:

您可以在 SimpleEventProcessor 上定义一个静态事件:

public class SimpleEventProcessor : IEventProcessor 
{
    public static event EventHandler<MessageReceivedEventArgs> OnMessageReceived;        

    public SimpleEventProcessor()
    { }
}

然后在 ProcessEventsAsync 中引发 OnMessageReceived 事件:

public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
    foreach (EventData message in messages)
    {
        OnMessageReceived(this, new MessageReceivedEventArgs() { ReceivedOn = DateTimeOffset.UtcNow, Message = message });
    }
}

非常重要:确保在处理器关闭时删除所有订阅者。这非常重要,因为缺少取消订阅的静态事件可能会导致内存泄漏文章对此进行了解释

public async Task CloseAsync(PartitionContext context, CloseReason reason)
{
    if (OnMessageReceived != null)
    {
        foreach (EventHandler<MessageReceivedEventArgs> subscriber in OnMessageReceived.GetInvocationList())
        {
            OnMessageReceived -= subscriber;
        }
    }
}

最后,您可以连接观察者作为初始化逻辑的一部分:

ObserverRegistry registry = new ObserverRegistry();
foreach (IObserver observer in registry.GetObservers())
{
    SimpleEventProcessor.OnMessageReceived += new EventHandler<MessageReceivedEventArgs>(
    (sender, e) => observer.When(e));
}

控制台应用程序的示例输出:

SimpleEventProcessor: bc84f0b9-6e2d-4c4e-9169-c1bd277d3c18
Observer1: bc84f0b9-6e2d-4c4e-9169-c1bd277d3c18
Observer2: bc84f0b9-6e2d-4c4e-9169-c1bd277d3c18
SimpleEventProcessor: a29d5875-7c53-4a7c-8113-ef7c24c2851f
Observer1: a29d5875-7c53-4a7c-8113-ef7c24c2851f
Observer2: a29d5875-7c53-4a7c-8113-ef7c24c2851f

我想强调以下几点:

  1. 在您的用例中,注册 aIEventProcessorFactory可能会更有效,因为您可以更好地控制处理器的实例化和处置。
  2. 建议尽可能保持 ProcessEventsAsync 方法的轻便和快速。在您的情况下,创建单独的消费者组可能是更好的选择吗?

希望以上回答你的问题。

于 2016-02-05T23:22:56.770 回答