1

我们正在使用 IoT 中心来摄取设备数据。我们目前正在使用 10 个分区来处理消息。因此,运行 5 个工作人员将使每个实例处理 2 个分区。

我们发现,如果一个设备连接并卸载了 500 条消息到集线器,所有这些消息只会从一个上下文分区中流出,即使其他消息没有做任何工作。

设计一个利用路由/端点来实现这一目标的系统是唯一的选择吗?

我们在低级设备上使用 MQTT,因此短期内无法更改设备固件,但可以长期更改。

老实说,我认为它以最少的方式发送消息,即使是循环赛也会更好。我们很可能需要将消息输入队列并从那里处理它们以实现更好的规模。目前正在创建多个线程来处理IEnumerable<EventData> messages每个分区及其 MUUUCH 更好的谢天谢地。然而,瓶颈仍然会以某种形式存在,直到我们开始实施进一步的队列并在那里扩展。

更新 - 添加一些示例代码,显示我在做什么 只是提醒一下,我们现在通过多个任务处理每批消息,性能提高了 10 倍。我将在我们的下一个版本中重构代码,但现在这工作得很好。

    async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
       await ProcessEventsBulk(context, messages);
    }

    async Task ProcessEventsBulk(PartitionContext context, IEnumerable<EventData> messages)
    {
        List<Task> TaskList = new List<Task>();
        foreach (EventData message in messages)
        {
            var LastTask = Task.Run(() => GoBoy(context, message));
            TaskList.Add(LastTask);
        }
        await Task.WhenAll(TaskList);
    }

    async Task GoBoy(PartitionContext context, EventData message)
    {
        using (var db = new AppDbContext(_dbContextConnectionString))
        {
            await ProcessEvent(message, context.Lease.PartitionId, new CoreManagerContainer(db), db);
            await db.SaveChangesAsync();
        }
    }

Process Event 将执行以下操作:

  • 解码包
  • 通过 NLOG 登录到 Azure 表存储
  • 更新 SQL 中的值(尤其是设备表)
  • 将消息插入 DocumentDB
  • 将消息转发到 2 个队列
  • 回复单位

我知道我们可以将这些分成不同的工作人员,但我喜欢它的“实时”和更安全,因为在我们确认之前,该单位不会清除信息。

我用一些早期代码创建了一个分支,在 ACKing 之前我在其中批量插入到 DocDB。没有看到重大改进,但应该有助于我假设的 RU。

4

0 回答 0