1

我对使用 EventProcessorHost 和 IEventProcessor 非常陌生,我正在尝试弄清楚如何从 EventProcessorClass 中获取我的数据。如果我只想将新消息记录到控制台,我目前已经完成了所有工作。

我当前的实现(我什至不确定它是否可以接受甚至是好的做法)创建一个静态变量,然后将数据存储在其中,以便另一个处理器可以收集它。这样做可以吗,还是有更好的清洁方法来访问数据?

这是我到目前为止所拥有的(锁定机制非常基本,当我让其余代码正常工作时将被修复):

internal class Receiver
{
    public static List<string> incommingMessagesList = new List<string>();
    public static bool fIsDataListLocked = false;

    private EventProcessorHost m_EPHClient;

   ...

    Console.WriteLine( "Registering EventProcessor..." );
    await m_EPHClient.RegisterEventProcessorAsync<SimpleEventProcessor>();
 }
 public class SimpleEventProcessor : IEventProcessor
 {
   ...

    public Task ProcessEventsAsync( PartitionContext context, IEnumerable<EventData> messages )
    {
        foreach( var eventData in messages )
        {
            while( !Receiver.fIsDataListLocked )
            {
                Receiver.fIsDataListLocked = true ;
                Receiver.incommingMessagesList.Add( Encoding.UTF8.GetString( eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count ) );
                Receiver.fIsDataListLocked = false ;
            }

        }
        return context.CheckpointAsync();
    }
}

更新:

根据要求提供更多信息:

基本上,我从管道的 2 个不同端提取数据,以验证所有消息是否通过并跟踪它们的吞吐量,一端是 eventthub,另一端来自 lwm2m 服务器作为 HTTP 请求。所以我有一个控制器进程正在运行,它需要从两端获取数据以清理/分析数据。就像我说的,我是事件处理器的新手,但让 EventProcessorHost 处理收集两组数据然后清理/分析它对我来说没有意义。我绝对可以改变以这种方式做事,但它看起来很笨重。

4

1 回答 1

1

在典型场景中,事件处理器以尽可能最快的方式接收和保存数据。多个事件处理器实例将从不同的 EventHub 分区读取数据。

在您的情况下,您希望将数据发送到其他地方,并与该数据的另一个流一起处理它。像 List 这样的内存集合可能不是最好的方法:

  • 它需要是线程安全的
  • 在崩溃数据将丢失
  • 您将需要手动删除已处理的数据以防止收集不断增长

您将需要某种生产者/消费者实现。

一种可能的解决方案是将两个数据流写入单个目标,例如 Azure 存储队列。这样做的主要优点是,当发生故障时,所有数据仍然保持不变并且不会丢失。您的最终处理器可以以自己的速度从队列中读取。

于 2017-06-17T12:46:12.053 回答