4

我有一个带有事件中心触发器的 Azure 函数。此集线器从设备接收消息并将它们存储在 Blob 中。最近,我注意到重复的消息存储在 blob 中。Blob 存储中的文件按上次修改日期排序,如果您查看屏幕截图,您会发现情况并非如此。有没有人见过这个问题?

我还有一个 Azure 函数正在写入 cosmos DB,对于 blob 中的重复消息,cosmos 中没有相应的重复消息。

我还连接了时间序列洞察力,它也没有任何重复的消息。

我打开了事件中心捕获,那里也没有重复的消息。

这是屏幕截图。

在此处输入图像描述

第一列是事件中心排队时间的 unix 时间戳。如果我没有与文件名关联的 guid,它将引发异常。这是一个将数据存储在 blob 中的片段。

dynamic msg = JObject.Parse(myEventHubMessage);
string deviceId = msg.deviceId;
if (deviceId == "5Y.....")
{
           var filename = "_" + ((DateTimeOffset)enqueuedTimeUtc).ToUnixTimeSeconds() + "_" + Guid.NewGuid().ToString() + ".json";
        
           var containerName = "containerName/";
        
           var path = containerName + deviceId + "/" + filename;
        
           using (var writer = binder.Bind<TextWriter>(new BlobAttribute(path)))
           {
                writer.Write(myEventHubMessage);
           }
 }

这里的逻辑非常简单。如果事件到达事件中心,则会触发该函数并将数据存储在 Azure Blob 中。

4

1 回答 1

5

一个重要的提示是事件中心具有至少一次交付保证;强烈建议确保您的处理以适合您的应用程序场景的任何方式对事件重复具有弹性。

关于您在本例中看到的重复项,Azure Functions 的绑定使用EventProcessorHost来读取事件并触发函数代码的执行。由于 Azure 函数会自动向上和向下扩展,因此实例EventProcessorHost将加入和离开负责处理已配置事件中心的使用者组。

当一个处理器启动时,它将尝试平衡处理工作与为同一消费者组活动的其他处理器。如果处理器无法通过声明未拥有的分区来达到其公平份额的工作,它将尝试从其他处理器窃取分区的​​所有权。在此期间,新所有者将从最后记录的检查点开始读取。同时,旧的所有者可能正在将它最后读取的事件分派给处理程序进行处理;在尝试从事件中心服务读取下一组事件之前,它不会理解所有权已更改。当处理器关闭并放弃其分区所有权时,会发生类似的模式。

因此,您将看到在处理器启动或停止时正在处理一些重复的事件,当处理器达到负载平衡的稳定状态时,这些重复事件将消退。该窗口的持续时间应该很短,但会根据处理器的配置和使用的检查点策略而有所不同。

于 2020-08-13T19:58:52.297 回答