4

我在将大量对象从活动函数返回到协调器函数时遇到问题。我有一个下载 180 MB 文件并解析它的函数。该文件将生成一个包含超过 962K 条目的对象列表。每个对象都有大约 70 个属性,但其中只有大约 20% 被填充。当我运行该函数时,代码成功下载并将文件解析到列表中,但是当返回列表时,会引发异常并显示以下信息:

异常:“执行函数时出现异常:#######” - 来源:“System.Private.CoreLib”

内部异常:“函数返回后处理参数 $return 时出错。” - 来源:“Microsoft.Azure.WebJobs.Host”

内部/内部异常:“引发了‘System.OutOfMemoryException’类型的异常。” - 来源:“System.Private.CoreLib”

最后一个嵌套异常将 NewtonsoftJson 包列为进行调用的包,该调用会生成报告的内存不足错误。我在最后包含了这个异常的完整堆栈跟踪。

我知道我可以序列化对象列表并将它们存储在 Azure blob 条目中,然后在需要处理它的下一个函数中再次获取它,但我认为持久函数背后的想法是避免所有这些和保持更精简的工作流程?此外,我的设计基于“Large Message Support #26”github 帖子,该帖子指出,如果大小超过队列消息限制,持久函数扩展将自动将函数有效负载存储在 blob 中(请参阅:https://github. com/Azure/azure-functions-durable-extension/issues/26)。

我需要做些什么才能使它正常工作吗?代码非常简单:

[FunctionName("GetDataFromSource")]
public static IEnumerable<DataDetail> GetDataFromSource([ActivityTrigger]ISource source, ILogger logger)
{
    try
    {
        string importSettings = Environment.GetEnvironmentVariable(source.SettingsKey);
        if (string.IsNullOrWhiteSpace(importSettings))
        {
            logger.LogError($"No settings key information found for the {source.SourceId} data source");                    }
        else
        {
            List<DataDetail> _Data = source.GetVinData().Distinct().ToList();
            return vinData;
         }
     }
     catch (Exception ex)
     {
         logger.LogCritical($"Error processing the {source.SourceId} Vin data source. *** Exception: {ex}");
     }

      return new List<DataDetail>();
}

这是最内部异常的堆栈跟踪:

at System.Text.StringBuilder.ExpandByABlock(Int32 minBlockCharCount)
   at System.Text.StringBuilder.Append(Char value, Int32 repeatCount)
   at System.Text.StringBuilder.Append(Char value)
   at System.IO.StringWriter.Write(Char value)
   at Newtonsoft.Json.JsonTextWriter.WritePropertyName(String name, Boolean escape)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeObject(JsonWriter writer, Object value, JsonObjectContract contract, JsonProperty member, JsonContainerContract collectionContract, JsonProperty containerProperty)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeValue(JsonWriter writer, Object value, JsonContract valueContract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerProperty)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeList(JsonWriter writer, IEnumerable values, JsonArrayContract contract, JsonProperty member, JsonContainerContract collectionContract, JsonProperty containerProperty)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeValue(JsonWriter writer, Object value, JsonContract valueContract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerProperty)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.Serialize(JsonWriter jsonWriter, Object value, Type objectType)
   at Newtonsoft.Json.JsonSerializer.SerializeInternal(JsonWriter jsonWriter, Object value, Type objectType)
   at DurableTask.Core.Serializing.JsonDataConverter.Serialize(Object value, Boolean formatted)
   at Microsoft.Azure.WebJobs.Extensions.DurableTask.MessagePayloadDataConverter.Serialize(Object value, Int32 maxSizeInKB) in C:\projects\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask\MessagePayloadDataConverter.cs:line 55
   at Microsoft.Azure.WebJobs.Extensions.DurableTask.MessagePayloadDataConverter.Serialize(Object value) in C:\projects\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask\MessagePayloadDataConverter.cs:line 43
   at Microsoft.Azure.WebJobs.DurableActivityContext.SetOutput(Object output) in C:\projects\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask\DurableActivityContext.cs:line 136
   at Microsoft.Azure.WebJobs.Extensions.DurableTask.ActivityTriggerAttributeBindingProvider.ActivityTriggerBinding.ActivityTriggerReturnValueBinder.SetValueAsync(Object value, CancellationToken cancellationToken) in C:\projects\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask\Bindings\ActivityTriggerAttributeBindingProvider.cs:line 213
   at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.ParameterHelper.ProcessOutputParameters(CancellationToken cancellationToken) in C:\projects\azure-webjobs-sdk-rqm4t\src\Microsoft.Azure.WebJobs.Host\Executors\FunctionExecutor.cs:line 972
4

2 回答 2

1

我在使用 Durable Functions 时遇到了类似的问题。

有几个解决方案/解决方法:正如您所说,您可以将函数有效负载存储在 blob 存储中,并在需要时检索它们。这可行,但会影响性能,并且可能需要一段时间才能检索,具体取决于文件的大小。

另一种选择是批量调用。我不完全确定你的GetVinData()方法是做什么的,但你可以修改它,这样你一次只能检索 50,000 个(或x个)项目。您的编排器可以多次调用您的活动函数并在编排器中建立您的列表。

[FunctionName(nameof(OrchestratorAsync))]
public async Task OrchestratorAsync([OrchestrationTrigger] IDurableOrchestrationContext context) 
{
    var dataDetailList = new List<DataDetail>();
    var batches = BuildBatchesHere();

    foreach (var batch in batches) 
    {
        dataDetailList.AddRange(
             await context.CallActivityAsync<List<DataDetail>>(
                  nameof(GetDataFromSource), batch);
    }
    // Do whatever you need with dataDetailList
}
于 2021-10-06T10:18:44.723 回答
0

当大型消息不适合队列和表时,Durable Functions 扩展将自动负责将大型消息存储在 Blob 中。但是,此支持假定有足够的内存可用于序列化有效负载,以便可以将它们上传到 blob。不幸的是,Durable Task Framework 的设计要求在上传到 blob 之前先将 payload 序列化为字符串,这意味着会有很大的内存压力。

您可以尝试一些方法来缓解此问题:

  1. 确保您的函数应用在 64 位模式下运行。默认情况下,函数应用程序在 32 位模式下创建,该模式具有较低的内存限制。我们已经看到了几个简单地切换到 64 位解决内存不足问题的案例。

  2. 尝试增加特定计划的内存限制。如果你在 Azure Functions 消耗计划中运行,则最大内存是固定的。但是,如果您在 Elastic Premium 或 App Service 计划中运行,则可以选择使用具有更多内存的更大 VM。

  3. 正如@stephyness 建议的那样,考虑限制从函数返回的数据量。这可能会返回完整列表的子集,也可能是完整列表但有效负载较小(例如,source.GetVinData().Distinct().Select(x => x.VinNumber)(您甚至可以通过简单地删除来获得更好的结果.ToList(),这可能会创建不必要的数据副本)。本质上,只返回编排器绝对需要取得进展的数据。返回编排器不需要的数据是不必要的开销。

另请注意,使用大型消息支持时会产生不小的性能影响。如果您可以避免依赖它,您的编排将运行得更快。

控制内存使用的其他技巧可以在性能和规模文档中找到。

于 2022-02-11T17:57:15.290 回答