0

我正在处理 Azure Synapse Spark 笔记本中大约 19,710 个包含 IIS 日志文件的目录。每个目录中有 3 个 IIS 日志文件。笔记本读取目录中的 3 个文件,并将它们从分隔的文本转换为 Parquet。没有分区。但偶尔我会无缘无故地收到以下两个错误。

在此处输入图像描述

{
    "errorCode": "2011",
    "message": "An error occurred while sending the request.",
    "failureType": "UserError",
    "target": "Call Convert IIS To Raw Data Parquet",
    "details": []
}

当我收到上述错误时,所有数据都已成功写入 Azure Data Lake Storage Gen2 中的相应文件夹。

有时我得到 在此处输入图像描述

{
    "errorCode": "6002",
    "message": "(3,17): error CS0234: The type or namespace name 'Spark' does not exist in the namespace 'Microsoft' (are you missing an assembly reference?)\n(4,17): error CS0234: The type or namespace name 'Spark' does not exist in the namespace 'Microsoft' (are you missing an assembly reference?)\n(12,13): error CS0103: The name 'spark' does not exist in the current context",
    "failureType": "UserError",
    "target": "Call Convert IIS To Raw Data Parquet",
    "details": []
}

当我收到上述错误时,没有任何数据被成功写入 Azure Data Lake Storage Gen2 中的相应文件夹。

在这两种情况下,您都可以看到笔记本确实运行了一段时间。我在 spark 笔记本上启用了 1 次重试,它是一个 pyspark 笔记本,它使用 C# %%csharp 为参数和其余逻辑执行 python。火花池很小(4 核/32GB),有 5 个节点。

笔记本中进行的唯一转换是将字符串列转换为时间戳。

var dfConverted = dfparquetTemp.WithColumn("Timestamp",Col("Timestamp").Cast("timestamp"));

当我说这是随机的时,管道当前正在运行,并且在处理 215 个目录后,有 2 个第一个失败和一个第二个失败。

任何想法或建议将不胜感激。

4

1 回答 1

0

好吧,我认为这是问题的一部分。请记住,我正在用 C# 编写逻辑的主要部分,因此您使用另一种语言的情况可能会有所不同。这些也是以空格分隔的 IIS 日志文件,它们的大小可以是数兆字节,例如一个文件可以是 30MB。

我的新代码已经运行了 17 个小时,没有出现任何错误。我所做的所有更改都是为了确保我处理了会消耗内存的资源。示例如下:

将文本分隔文件作为二进制文件读取时

    var df = spark.Read().Format("binaryFile").Option("inferSchema", false).Load(sourceFile) ;            
    byte[] rawData = df.First().GetAs<byte[]>("content");

byte[] 中的数据最终被加载到 aList<GenericRow>但我从未将变量 rawData 设置为 null。

从上面的数据框中填充字节 [] 后,我添加了

    df.Unpersist() ;

在将所有数据List<GenericRow> rows从 byte[] 完全放入并使用下面的代码将其添加到数据框中后,我清除了 rows 变量。

    var dfparquetTemp = spark.CreateDataFrame(rows,inputSchema);
    rows.Clear() ;

最后,在更改列类型并写出数据后,我对数据框进行了非持久化。

    var dfConverted = dfparquetTemp.WithColumn("Timestamp",Col("Timestamp").Cast("timestamp"));
    if(overwrite) {
        dfConverted.Write().Mode(SaveMode.Overwrite).Parquet(targetFile) ;
    }
    else {
        dfConverted.Write().Mode(SaveMode.Append).Parquet(targetFile) ;
    }
    dfConverted.Unpersist() ; 

最后,我的大部分逻辑都包含在一个 C# 方法中,该方法在 foreach 循环中被调用,希望 CLR 能够处理我错过的任何其他内容。

最后但并非最不重要的一个教训。

  • 在读取包含多个 parquet 文件的目录时,spark 似乎会将所有文件读入数据框中。
  • 当读取包含多个文本分隔文件的目录时,您将其视为二进制文件,spark 仅将其中一个文件读取到数据框中。

因此,为了处理文件夹中的多个文本分隔文件,我必须传入多个文件的名称,并使用 SaveMode.Overwrite 处理第一个文件,并将其他文件作为 SaveMode.Append 处理。尝试使用任何类型的通配符并指定目录名称的每种方法都只会导致将一个文件读入数据框中。(相信我,经过几个小时的 GoogleFu 我尝试了我能找到的所有方法。)

再次处理 17 小时后没有一个错误,因此重要的一课似乎是尽可能降低内存使用率。

于 2022-03-03T15:39:55.713 回答