我对使用 EventProcessorHost 和 IEventProcessor 非常陌生,我正在尝试弄清楚如何从 EventProcessorClass 中获取我的数据。如果我只想将新消息记录到控制台,我目前已经完成了所有工作。
我当前的实现(我什至不确定它是否可以接受甚至是好的做法)创建一个静态变量,然后将数据存储在其中,以便另一个处理器可以收集它。这样做可以吗,还是有更好的清洁方法来访问数据?
这是我到目前为止所拥有的(锁定机制非常基本,当我让其余代码正常工作时将被修复):
internal class Receiver
{
public static List<string> incommingMessagesList = new List<string>();
public static bool fIsDataListLocked = false;
private EventProcessorHost m_EPHClient;
...
Console.WriteLine( "Registering EventProcessor..." );
await m_EPHClient.RegisterEventProcessorAsync<SimpleEventProcessor>();
}
public class SimpleEventProcessor : IEventProcessor
{
...
public Task ProcessEventsAsync( PartitionContext context, IEnumerable<EventData> messages )
{
foreach( var eventData in messages )
{
while( !Receiver.fIsDataListLocked )
{
Receiver.fIsDataListLocked = true ;
Receiver.incommingMessagesList.Add( Encoding.UTF8.GetString( eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count ) );
Receiver.fIsDataListLocked = false ;
}
}
return context.CheckpointAsync();
}
}
更新:
根据要求提供更多信息:
基本上,我从管道的 2 个不同端提取数据,以验证所有消息是否通过并跟踪它们的吞吐量,一端是 eventthub,另一端来自 lwm2m 服务器作为 HTTP 请求。所以我有一个控制器进程正在运行,它需要从两端获取数据以清理/分析数据。就像我说的,我是事件处理器的新手,但让 EventProcessorHost 处理收集两组数据然后清理/分析它对我来说没有意义。我绝对可以改变以这种方式做事,但它看起来很笨重。