问题标签 [event-processor-host]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
3760 浏览

c# - 如何将参数传递给 IEventProcessor 的实现

我正忙于EventProcessorHost为 azure EventBus 客户端实现客户端。

我有一个实现IEventProcessor如下的类:

然后我将其称为如下:

我需要将参数传递给创建的MyEventProcessor实例EventProcessorHost。我该怎么做呢?

0 投票
1 回答
263 浏览

azure - 辅助角色 - EventProcessorHost CheckPoint 似乎导致 409 冲突 Azure 存储

我有一个云服务和一个带有 2 个分区的事件中心。我的云服务是写入 Azure 存储的辅助角色。它写入从事件中心接收的所有消息。

使用 Azure 模拟器,我的工作角色工作正常,它写入生产环境中的 Azure 存储(不是开发,所以它是相同的天蓝色存储)。

当我将工作人员角色推送到云服务时,我从 Application Insight 收到此类错误(100% 成功调用:false)。使用 IntelliTrace 日志,我得到了 409 冲突。

我尝试进行远程调试,但它太慢了,以至于与等待“下一步”相比,我需要更少的时间来重写我的代码......

我删除了代码中的所有租赁管理,但没有任何改变......

我坚信这与检查点问题有关。

_host = new EventProcessorHost(Environment.MachineName, eventHubName, consumerGroupName, eventHubConnectionString, checkpointConnectionString);

我在这个方法中使用我的检查点(在我的 HandledEventProcessor 中)

Application Insight 日志

23/4/2016 10:35:01 - 依赖 Azure blob:myblobStorage/myContainer 依赖持续时间:2.84 毫秒成功调用:错误 URL:https://****.blob.core.windows.net:443/myContainer/ myConsumerGroupName/partition1

23/4/2016 10:35:01 - 依赖 Azure blob:myblobStorage/myContainer 依赖持续时间:2.84 毫秒成功调用:错误 URL:https://****.blob.core.windows.net:443/myContainer/ myConsumerGroupName/partition0

23/4/2016 10:34:59 - 依赖 Azure blob:myblobStorage/myContainer 依赖持续时间:4.4 毫秒成功调用:falseURL:https://****.blob.core.windows.net:443/myContainer/myConsumerGroupName/ 1?comp=lease&timeout=10

23/4/2016 10:34:59 - 依赖 Azure blob:myblobStorage/myContainer 依赖持续时间:4.4 毫秒成功调用:falseURL:https://****.blob.core.windows.net:443/myContainer/myConsumerGroupName/ 0?comp=lease&timeout=10

我没抓到这个...如果有人有想法,非常欢迎...

任何帮助,将不胜感激。

0 投票
2 回答
206 浏览

signalr - Azure 云服务工作者角色是运行 EventHub EventProcessor 的唯一 Azure 托管选项吗?

我目前正在通过事件中心和 EventProcessorHost 进行斗争。到目前为止,我发现的所有指南都建议在 Azure 云服务辅助角色中运行 EventProcessor。由于这些部署和更新非常缓慢,我想知道是否有任何 Azure 服务可以让我在更敏捷的环境中运行 EventProcessor?

到目前为止,我的粗略架构看起来像这样

设备 > IoT 中心 > 流分析作业 > 事件中心 > [MyEventProcessor] > SignalR > 客户端...

或者也许还有另一种从 Steam Analytics 获取触发 SignalR 消息的方法?

任何建议都受到高度赞赏。

谢谢,菲利普

0 投票
1 回答
297 浏览

azure - EventProcessorHost 在具有多个实例的 webjob 中使用 - 给出异常 Microsoft.ServiceBus.Messaging.LeaseLostException

我在具有多个实例的 webjob 中使用 EventProcessorHost - 给出异常 Microsoft.ServiceBus.Messaging.LeaseLostException。特别是只有一个实例给出了这个例外。

当我将它作为单个实例运行时,它没有给出任何异常

Microsoft.ServiceBus.Messaging.LeaseLostException:引发了“Microsoft.ServiceBus.Messaging.LeaseLostException”类型的异常。---> Microsoft.WindowsAzure.Storage.StorageException:远程服务器返回错误:(409)冲突。---> System.Net.WebException:远程服务器返回错误:(409)冲突。在 Microsoft.WindowsAzure.Storage.Shared.Protocol.HttpResponseParsers.ProcessExpectedStatusCodeNoException[T](HttpStatusCode expectedStatusCode、HttpStatusCode actualStatusCode、T retVal、StorageCommandBase1 cmd, Exception ex) in c:\Program Files (x86)\Jenkins\workspace\release_dotnet_master\Lib\Common\Shared\Protocol\HttpResponseParsers.Common.cs:line 50 at Microsoft.WindowsAzure.Storage.Blob.CloudBlob.<>c__DisplayClass33.<RenewLeaseImpl>b__32(RESTCommand1 cmd, HttpWebResponse resp, Exception ex, OperationContext ctx) 在 c:\Program Files (x86)\Jenkins\workspace\release_dotnet_master\Lib\ClassLibraryCommon\Blob\CloudBlob.cs:line 3186 at Microsoft.WindowsAzure.Storage.Core.Executor .Executor.EndGetResponse[T](IAsyncResult getResponseResult) 在 c:\Program Files (x86)\Jenkins\workspace\release_dotnet_master\Lib\ClassLibraryCommon\Core\Executor\Executor.cs:line 299 --- 内部异常堆栈跟踪结束--- 在 c:\Program Files (x86)\Jenkins\workspace\release_dotnet_master\Lib\ClassLibraryCommon\Core\Executor\Executor 中的 Microsoft.WindowsAzure.Storage.Core.Executor.Executor.EndExecuteAsync[T](IAsyncResult 结果)。 c: 第 50 行 Microsoft.WindowsAzure.Storage.Blob.CloudBlob.EndRenewLease(IAsyncResult asyncResult) 在 c:\Program Files (x86)\Jenkins\workspace\release_dotnet_master\Lib\ClassLibraryCommon\Blob\CloudBlob.cs:Microsoft.WindowsAzure.Storage.Core.Util.AsyncExtensions 的第 1982 行。<>c__DisplayClass4.b__3(IAsyncResult ar) in c: \Program Files (x86)\Jenkins\workspace\release_dotnet_master\Lib\ClassLibraryCommon\Core\Util\AsyncExtensions.cs:line 114 --- 在 System.Runtime.CompilerServices 处从先前引发异常的位置结束堆栈跟踪.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at Microsoft.ServiceBus.Messaging.BlobLeaseManager.d__23.MoveNext() --- End of internal exception stack trace --- at Microsoft .ServiceBus.Messaging.BlobLeaseManager.d__23。MoveNext() --- 从先前抛出异常的位置结束堆栈跟踪 --- 在 System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) 在 Microsoft .ServiceBus.Messaging.BlobLeaseManager.d__24.MoveNext() --- 在 System.Runtime.CompilerServices 的 System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) 处从先前引发异常的位置结束堆栈跟踪。 TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at RoutingServiceWebJob.DataProcessorFactory.EventHubDataProcessor.d__37.MoveNext() in d:\a\1\s\RoutingServiceWebJob\DataProcessorFactory\EventHubDataProcessor.cs:line 163ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at Microsoft.ServiceBus.Messaging.BlobLeaseManager.d__24.MoveNext() --- 从之前抛出异常的位置结束堆栈跟踪---在 System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) 在 System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) 在 RoutingServiceWebJob.DataProcessorFactory.EventHubDataProcessor.d__37.MoveNext() 在 d:\a\1\s\ RoutingServiceWebJob\DataProcessorFactory\EventHubDataProcessor.cs:第 163 行ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at Microsoft.ServiceBus.Messaging.BlobLeaseManager.d__24.MoveNext() --- 从之前抛出异常的位置结束堆栈跟踪---在 System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) 在 System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) 在 RoutingServiceWebJob.DataProcessorFactory.EventHubDataProcessor.d__37.MoveNext() 在 d:\a\1\s\ RoutingServiceWebJob\DataProcessorFactory\EventHubDataProcessor.cs:第 163 行MoveNext() --- 从先前引发异常的位置结束堆栈跟踪 --- 在 System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) 在 RoutingServiceWebJob 的 System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) .DataProcessorFactory.EventHubDataProcessor.d__37.MoveNext() 在 d:\a\1\s\RoutingServiceWebJob\DataProcessorFactory\EventHubDataProcessor.cs:line 163MoveNext() --- 从先前引发异常的位置结束堆栈跟踪 --- 在 System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) 在 RoutingServiceWebJob 的 System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) .DataProcessorFactory.EventHubDataProcessor.d__37.MoveNext() 在 d:\a\1\s\RoutingServiceWebJob\DataProcessorFactory\EventHubDataProcessor.cs:line 163

我一次读一条消息。请建议。

0 投票
2 回答
523 浏览

azure - 从 eventthub EventProcessorHost 与 EventHubReceiver 读取事件

我尝试通过带有 EventHubReceiver 的控制台应用程序 1 和带有 EventProcessorHost 的另一个应用程序从 eventthub 读取事件,我注意到 EventProcessorHost 的读取速度比 EventHubReceiver 快。这是真的吗?

0 投票
1 回答
569 浏览

c# - 从 EventProcessorHost 中获取数据

我对使用 EventProcessorHost 和 IEventProcessor 非常陌生,我正在尝试弄清楚如何从 EventProcessorClass 中获取我的数据。如果我只想将新消息记录到控制台,我目前已经完成了所有工作。

我当前的实现(我什至不确定它是否可以接受甚至是好的做法)创建一个静态变量,然后将数据存储在其中,以便另一个处理器可以收集它。这样做可以吗,还是有更好的清洁方法来访问数据?

这是我到目前为止所拥有的(锁定机制非常基本,当我让其余代码正常工作时将被修复):

更新:

根据要求提供更多信息:

基本上,我从管道的 2 个不同端提取数据,以验证所有消息是否通过并跟踪它们的吞吐量,一端是 eventthub,另一端来自 lwm2m 服务器作为 HTTP 请求。所以我有一个控制器进程正在运行,它需要从两端获取数据以清理/分析数据。就像我说的,我是事件处理器的新手,但让 EventProcessorHost 处理收集两组数据然后清理/分析它对我来说没有意义。我绝对可以改变以这种方式做事,但它看起来很笨重。

0 投票
0 回答
412 浏览

c# - 事件中心消息分发

我们正在使用 IoT 中心来摄取设备数据。我们目前正在使用 10 个分区来处理消息。因此,运行 5 个工作人员将使每个实例处理 2 个分区。

我们发现,如果一个设备连接并卸载了 500 条消息到集线器,所有这些消息只会从一个上下文分区中流出,即使其他消息没有做任何工作。

设计一个利用路由/端点来实现这一目标的系统是唯一的选择吗?

我们在低级设备上使用 MQTT,因此短期内无法更改设备固件,但可以长期更改。

老实说,我认为它以最少的方式发送消息,即使是循环赛也会更好。我们很可能需要将消息输入队列并从那里处理它们以实现更好的规模。目前正在创建多个线程来处理IEnumerable<EventData> messages每个分区及其 MUUUCH 更好的谢天谢地。然而,瓶颈仍然会以某种形式存在,直到我们开始实施进一步的队列并在那里扩展。

更新 - 添加一些示例代码,显示我在做什么 只是提醒一下,我们现在通过多个任务处理每批消息,性能提高了 10 倍。我将在我们的下一个版本中重构代码,但现在这工作得很好。

Process Event 将执行以下操作:

  • 解码包
  • 通过 NLOG 登录到 Azure 表存储
  • 更新 SQL 中的值(尤其是设备表)
  • 将消息插入 DocumentDB
  • 将消息转发到 2 个队列
  • 回复单位

我知道我们可以将这些分成不同的工作人员,但我喜欢它的“实时”和更安全,因为在我们确认之前,该单位不会清除信息。

我用一些早期代码创建了一个分支,在 ACKing 之前我在其中批量插入到 DocDB。没有看到重大改进,但应该有助于我假设的 RU。

0 投票
1 回答
1876 浏览

java - Azure 事件中心偏移

我已经实现了以下链接中的代码,用于从事件中心接收事件。但是假设有 10 个事件,并且每 5 个事件完成一次检查点。现在程序在读取第 7 个事件时异常退出,如果我再次重新启动事件处理器主机,则重新读取事件 (1,2,3,4,6)。请建议我将如何再次避免从第 7 次活动中重新阅读和阅读?任何示例将不胜感激。谢谢。

https://github.com/Azure/azure-event-hubs/blob/master/samples/Java/src/main/java/com/microsoft/azure/eventhubs/samples/Basic/EventProcessorSample.java

0 投票
1 回答
228 浏览

azure - 如何将 Azure ServiceBus EventProcessorHost 库与短期 SAS 令牌一起使用?

我调用了一个 REST 服务,它会分发短期(大约 20 分钟)的 SAS 令牌,以便从 Azure 事件中心读取数据。我想避免一个解决方案,我必须在令牌过期时每 20 分钟拆除所有侦听器并重新创建它们。库是否支持接口或回调方法,我的代码可以在库需要或令牌过期时提供令牌?

0 投票
0 回答
236 浏览

azure - IOT Hub 的 EventProcessorHost 单个实例中的 LeaseLostException

我创建了一个简单的事件处理器主机,它从配置了租赁管理默认选项的 Azure IOT 集线器读取事件。
但是,即使我正在运行 EventProcessorHost 的单个实例,我也会定期出现以下异常:
ReceiverDisconnectedException:创建了具有更高纪元的新接收器,因此当前具有纪元的接收器正在断开连接。原因:LeaseLost
客户端重新获取租约并进一步处理消息。

这里是 EventProcessorHost 的初始化

这种行为是正常的还是我需要为单个实例做一些事情来避免它?

下面是我的事件处理器: