1

我有一个 IoT 设备 (ESP32),它每 x 秒向 IoT Hub(免费层)发送数据包。每个数据包都包含一个整数属性 PacketID,每次发送新数据包时都会递增,从 1 开始,依此类推。当 IoT 中心接收到数据时,将调用 Azure IoT 中心触发器(托管在门户上)并进一步处理数据 - 首先插入到 CosmosDB(来自 StorageQueue 触发器),然后作为消息发送到 SignalR 服务(来自另一个 StorageQueue 触发器) 和客户端 Web 应用程序。

IoT 中心触发器如下所示:

    [FunctionName("FramesPacketAdd_IoTHubTrigger")]
    public async Task Run(
    [IoTHubTrigger("messages/events", Connection = "IoTHubConnectionString")] EventData message, ILogger _logger,
    [Queue("dbinsert-frames-packet-queue", Connection = "AzureStorageAccountConnectionString")] IAsyncCollector<FramesPacket> dbInsertFramesPacketQueue,
    [Queue("eventdata-parse-error-queue", Connection = "AzureStorageAccountConnectionString")] IAsyncCollector<string> eventDataParseErrorQueue)
    {        
        try
        {
            _logger.LogInformation($"C# IoT Hub trigger function FramesPacketAdd processed a message: {Encoding.UTF8.GetString(message.Body.Array)}");

            string messageBody = Encoding.UTF8.GetString(message.Body.Array);
            var jsonObj = JsonConvert.DeserializeObject<FramesPacket>(messageBody);

            var framesPacket = new FramesPacket
            {
                PacketID = jsonObj.PacketID,
                SessionID = jsonObj.SessionID,                    
                DeviceID = jsonObj.DeviceID,
                Frames = jsonObj.Frames
            };

            await dbInsertFramesPacketQueue.AddAsync(framesPacket);
        }

        catch (Exception ex)
        {
            await eventDataParseErrorQueue.AddAsync($"[IoTHubTrigger] function [FramesPacketAdd] caught exception: {ex.Message} \nStackTrace: {ex.StackTrace} \nJSON: {Encoding.UTF8.GetString(message.Body.Array)}");

            _logger.LogError($"[IoTHubTrigger] function [FramesPacketAdd_IoTHubTrigger] caught exception: {ex.Message} \nStackTrace: {ex.StackTrace} \nJSON: {Encoding.UTF8.GetString(message.Body.Array)}");
        }
    }

问题是,当数据包快速发送时,比如每个数据包间隔 200 毫秒,IoT 中心接收到的数据是乱序的,这意味着有时数据包 #7 和 #8 在数据包 #1、#2 和 # 之前被接收和处理3,这对我的客户端应用程序来说是一个问题,因为它依赖于以与从 ESP32 芯片发送的相同顺序接收数据包。我尝试以 1 秒的延迟发送每个数据包,但问题仍然存在,但它似乎只影响前三个数据包 - 有时它们按 3、1、2 的顺序接收,其余的顺序正确。更长的延迟似乎完全消除了这个问题。我确实相信这是因为函数/触发器的异步性质?最后,我希望能够以相对较快的速度发送它们,每个之间有 200-500 毫秒的延迟。

我对 Azure IoT Hub 很陌生,我的问题是,是否可以在门户端做一些事情来确保以正确的顺序接收数据包,或者这是这种方法的限制,这种情况需要在收到数据后处理,可能在调用 IoT 中心触发器后使用存储队列或服务总线?

我希望我的问题是有道理的,否则我很乐意提供更多细节。

提前致谢。

4

2 回答 2

1

以下是两种可能的路径:

1. 使用 EventHub 触发的 Azure Function 处理来自 IoT Hub 的 Event Hub 兼容端点的消息。

可以保证来自特定设备的每条消息都将发送到同一个分区。

分区中的消息是有序的,因此您需要的是每个分区的事件处理器。

您可以为此使用 Azure 函数,在这种情况下,Azure 函数所做的是为每个分区创建一个租约,每个分区只生成一个 Azure 函数实例。这意味着,如果您有 10 个分区,则将有 10 个 Azure 函数实例处理来自每个分区的消息。现在您要做的是确保在 Azure 函数中按顺序处理消息批处理,例如:

[FunctionName("EventHubTrigger")]
public static async Task RunAsync([EventHubTrigger("ordered", Connection = "EventHub")] EventData[] eventDataSet, TraceWriter log)
{
    log.Info($"Triggered batch of size {eventDataSet.Length}");
    
    //processing event by event
    foreach (var eventData in eventDataSet) {
        try
        {
            // process the event here
        }
        catch
        {
            // handle event exception
        }
    }
}

在这里,在 foreach 循环之前,除了本机行为之外,您还可以eventDataSet根据sequenceNumber情况进行排序(如果您正在发送它)。

这是博客

2. 第二种方式很相似,都是使用Service Bus队列和会话,如下代码示例:

public async Task Run(
    [ServiceBusTrigger("queue", Connection = "ServiceBusConnectionString", IsSessionsEnabled = true)]Message message, 
    ILogger log)
{
    log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(message.Body)}");
    // process the message here
}

这里也是博客

需要注意的重要一点是,如果任何 Azure 函数实例(在第一种情况下)失败,则租约将被续订,并且它可能会处理某些消息两次。如果这是一个问题,那么您应该选择第二种情况。

于 2020-11-19T20:10:10.137 回答
0

IoT Hub 按 deviceId 对数据进行分区(因此来自单个设备的所有消息都会按顺序写入相同的底层事件中心分区)..因此假设您在 ESP32 应用程序中仅使用一个 deviceid,它们应该按照以下顺序排列物联网中心收到了它们。

您是否将消息转储到 azure 函数日志以查看它们是否以正确的顺序读取(即,数据库插入可能让它们出现故障)

否则,我不确定 azure 函数侦听器(应该在幕后使用 eventprocessorhost)的管道是否乱序读取它们

于 2020-11-19T19:57:35.637 回答