2

我们有数据驻留在 Azure blob 存储中的表中,该表充当数据湖。每 30 分钟摄取一次数据,因此在 UTC 中形成如下时间分区

<Container>/<TableName>/y=2020/m=02/d=20/h=01/min=00
<Container>/<TableName>/y=2020/m=02/d=20/h=01/min=30
<Container>/<TableName>/y=2020/m=02/d=20/h=02/min=00 and so on. 

用于捕获数据的文件格式是 orc,并且时间分区内的数据分区大小相同。

我们的用例是使用 Spark(V 2.3)在 IST 中捕获日级别的数据进行处理。鉴于数据驻留在 UTC 并且用例是在 IST(+5.30 UTC)中处理数据,从 /h=18/min=30(前一天)到 /h=18/min= 总共 48 个时间分区是必不可少的00(第二天)。我们有两种选择

选项1 为每个时间分区创建数据帧并将其合并

df1 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=20/h=18/min=30)
df2 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=20/h=19/min=00) 
..
df48 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=21/h=18/min=00) ..
df = df.union(df1) 
df = df.union(df2) 
..
df = df.union(df48)

对 48 个分区执行此操作将在 df 中生成一整天的数据。

选项 2 在日级别捕获数据并应用一个小时的过滤条件。

df1 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=20/).filter(h>=19 or (h=18 and min=30))
df2 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=21/).filter(h<=17 or (h=18 and min=00))
df = df1.union(df2)

一旦数据被加载到内存中,处理时间是相同的,即~5 分钟。加载数据所需的时间是瓶颈。选项 1 需要 30 分钟,选项 2 需要 2 分钟才能加载到内存。

在一些博客中,我们看到 Analyzer 每次调用 union 时都会扫描整个先前的数据帧。因此对于 48 个联合,它扫描 1+2+3+47=1128 次。这是指数性能下降的原因吗?Analyzer是做什么的,可以关闭吗?为了使文件存储上的时间分区数据的读取功能通用,是否有任何建议或最佳实践可以采用?

4

2 回答 2

0

等一下……这些文件不是有某种命名约定吗?我的意思是,如果文件的名称基本相同,除了小时和分钟。

像这样: filter(h>=19 or (h=18 and min=30))

只需使用通配符遍历文件并将所有文件合并到一个数据帧中。

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .option("sep", "|")
    .load("mnt/<Container>/<TableName>/y=2020/m=02/d=20/h*.gz")
    .withColumn("file_name", input_file_name())

如果架构不在文件本身中,或者由于某种原因它不完整,您可以创建它并覆盖文件中的内容。

val customSchema = StructType(Array(
    StructField("field1", StringType, true),
    StructField("field2", StringType, true),
    StructField("field3", StringType, true),
    etc.

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .option("sep", "|")
    .load("mnt/<Container>/<TableName>/y=2020/m=02/d=20/h*.gz")
    .withColumn("file_name", input_file_name())

试试看,看看你们相处得如何。

于 2020-03-04T13:34:38.313 回答
0

数据帧的联合导致分析器遍历所有前面的数据帧。这主要是从 orc 文件中推断模式,如果不匹配则抛出错误。我们观察到的是每个联合期间的大量文件操作。

选项 1 由于每个时间分区中有 > 200 个文件分区,因此分析器的总通过次数为 1+2+..+47=1128。乘以 200 是文件打开分析模式关闭操作的数量 = 225,600。这是选项 1 导致 30 分钟的主要原因。

选项 2 选项 2 执行相同的操作,但在两个大型数据帧上。一个前一天(从 18.30 到 23.30)和另一个第二天(从 00.00 到 18.00)。这导致了 22+26=48x200=9,600 文件打开-分析模式-关闭操作。

为了缓解这种情况,我们指定了模式,而不是依赖于 Spark 的模式推断机制。选项 1 和选项 2 都在指定架构后的 2 分钟内完成。

学习:如果涉及大量数据集的联合/合并,则依赖 spark 的模式推理机制是昂贵的。主要是因为大量的文件操作。如果在之前的操作中已经推断出架构,这可以是 Spark 中的优化以避免再次遍历数据帧。请指定架构以减轻这种情况。

于 2020-02-26T09:35:24.473 回答