1

我正在寻找验证超过 6 亿行和最多 30 列的 csv 文件的每一行(该解决方案必须处理该范围的几个大型 csv 文件)。

列可以是文本、日期或金额。csv 必须使用 40 条规则进行验证,一些规则会检查金额的正确性,其中一些会检查日期(格式)等......</p>

每个验证规则的结果必须保存并在之后显示。

验证数据后,将应用第二阶段的验证规则,此时间基于总和、平均值……每个规则的结果也必须保存。

我正在使用 Spark 加载文件。和

session.read().format("com.databricks.spark.csv").option("delimiter",
         "|").option("header", "false").csv(csvPath)

或者

session.read().option("header", "true").text(csvPath);

要迭代每一行,我看到有两个选项:

  • 使用dataset.map( row -> { something }); “Something”应该验证每一行并将结果保存在某处

但是由于“某事”块将在执行程序中执行,我不知道如何将其返回给驱动程序或将其存储在可以从驱动程序进程中检索到的某个位置。

  • 第二个选项是使用dataset.collect: 但它会导致内存不足,因为所有数据都将加载到驱动程序中。我们可以使用“take”方法,然后从数据集中删除子集(使用过滤器)并重复操作,但我不喜欢这种方法

我想知道是否有人可以建议我一种解决此类问题的可靠方法。基本上保留 Spark 用于验证规则的第二阶段,并使用 Spark 或其他框架来摄取文件并执行并生成第一组验证规则

在此先感谢您的帮助

4

2 回答 2

0

您可以简单地将带有检查结果的列附加到原始数据框中,并使用一堆规则 UDF 来执行实际验证,如下所示:

    object Rules {
      val rule1UDF = udf(
        (col1: String, col2: String) => {
         // your validation code goes here
         true // the result of validation
      }
    }
    // ...
    val nonAggregatedChecksDf = df
       .withColumn("rule1_result", Rules.rule1UDF("col1", "col2"))
       .withColumn("rule2_result", Rules.rule2UDF("col1", "col3"))
       .select("id", "rule1_result", "rule2_result", <all the columns relevant for the aggregation checks>)

    val aggregatedChecksDf = nonAggregatedChecksDf
       .agg(<...>)
       .withColumn("rule3_result", Rules.rule3UDF("sum1", "avg2"))
       .withColumn("rule4_result", Rules.rule4UDF("count1", "count3"))
       .select("id", "rule1_result", "rule2_result", "rule3_result", "rule4_result")

第二个选项是使用 dataset.collect

我建议不要这样做,而是从原始数据框中选择一个关键字段以及所有检查结果列,并将它们以列格式保存为镶木地板。

aggregatedChecksDf
    .select("id", "rule1_result", "rule2_result", "rule3_result", "rule4_result")
    .write
    .mode(saveMode)
    .parquet(path)

这将快得多,因为写入是由所有执行程序并行完成的,并且驱动程序不会成为瓶颈。它也很可能有助于避免 OOM 问题,因为内存使用分布在所有执行程序中。

于 2018-04-23T18:24:20.707 回答
0

您可以使用SparkSession读取 CSV 文件,然后按列对数据进行分区并批量处理数据。例如,您正在将数据写入不需要太多处理的外部数据库。

dataFrame
    .write
    .mode(saveMode)
    .option("batchsize", 100)
    .jdbc(url, "tablename", new java.util.Properties())

如果您的业务逻辑要求您处理数据集/数据框的每一行,您可以使用df.map(). 如果您的逻辑可以同时在多个 RDD 上运行,则可以使用 df.mapPartition(). 每条记录开销较高的任务使用 amapPartition比使用map转换执行得更好。

考虑初始化数据库的情况。如果我们使用map()or foreach(),我们需要初始化的次数将等于 RDD 中元素的数量。而如果我们使用mapPartitions(),我们需要初始化的次数将等于分区数

于 2018-04-23T17:38:23.930 回答