1


我尝试使用“EventHubReceiver”(Device2Cloud)从设备接收消息。每个设备都应该有自己的单个接收器。

为所有设备创建一个 EventHubReceiver(每个分区)不是问题:

string iotHubconnectionString = CloudConfigurationManager.GetSetting("Microsoft.IotHub.ConnectionStringPayed");
string iotHubD2cEndpoint = "messages/events";
EventHubClient eventHubClient = EventHubClient.CreateFromConnectionString(iotHubconnectionString, iotHubD2cEndpoint);
EventHubRuntimeInformation runtimeInformation = eventHubClient.GetRuntimeInformation();

然后,如果我想从客户端接收消息,请执行以下步骤:

EventHubReceiver eventHubReceiver = eventHubClient2.GetDefaultConsumerGroup().CreateReceiver(partition);  //Get the partitions via the runtimeInformation: string[] partitions = runtimeInformation.PartitionIds;
var incommingMessage = eventHubReceiver.ReceiveAsync();    //Wait here for incomming messages

这很好,但是来自所有“设备”的所有消息都到达了这个“EventHubReceiver”。我想要多个接收器,只接收来自单个设备的消息。
我尝试更改以下代码行:

string iotHubD2cEndpoint = "messages/events";

string iotHubD2cEndpoint = "devices/{deviceID}/messages/events";

但这行不通。我收到以下错误:

The link address 'devices/secondDevice/messages/events/$management' did not match any of the expected formats. Supported formats: '/$cbs', '/devices/{deviceid}/messages/events', '/devices/{deviceid}/messages/deviceBound', '/messages/deviceBound', '/messages/serviceBound/feedback', '/messages/events/*'.

所以问题是我不知道是不是不可能为每个设备创建一个'devices/secondDevice/messages/events/$management'
EventHubReceiver'devices/secondDevice/messages/events/'
或者我的代码或思维有错误。

4

2 回答 2

2

当设备将遥测数据发送到 IoT 中心时,这些事件可供云中“事件中心兼容”的相关 D2C 终结点使用。它具有“类似事件中心”的行为,因此我们能够使用 EventHubReceiver 获取消息。但是,事件中心在分区中工作,传入消息可以以循环方式或散列分区键分配给分区。在 IoT 中心架构中,我不知道它是否使用循环法,但它可以用来散列设备 ID(作为分区键),因此来自设备的所有消息都进入一个分区。这并不意味着该分区仅包含该设备的消息!不可能为每个设备都有一个分区:-) 所以...... 一个分区包含来自不同设备的混合消息(但来自特定设备的消息总是在同一个分区中)。事件中心接收器可以从分区读取,因此它可以从更多设备获取所有消息。您需要根据设备 ID 区分它们。

于 2015-11-17T10:47:32.810 回答
0

我忘了说,我已经有了解决方案。但这不是很好:

  • 所有 deviceThreads(每个分区和设备一个)在某一点等待(“WaitOne”)(使用 AutoResetEvent 锁定)
  • 我用一个接收器接收所有消息
  • 我将消息放入每个设备的自己的队列中(字典包含作为键的设备对象(id 和分区)和作为值的列表
  • 我用“set”命令设置了一个线程。
  • 线程在其队列中查找并:
    • 如果队列中有消息,它会继续并以“yield return”返回结果 -> 再次等待新消息
    • 否则它会设置不同的线程费用并返回冻结状态

所有线程都在 while(true) 中循环以等待消息。该解决方案正在运行,但它看起来不是很高效(有很多线程)并且有点复杂。

于 2015-11-17T11:16:12.093 回答