我们有数据驻留在 Azure blob 存储中的表中,该表充当数据湖。每 30 分钟摄取一次数据,因此在 UTC 中形成如下时间分区
<Container>/<TableName>/y=2020/m=02/d=20/h=01/min=00
<Container>/<TableName>/y=2020/m=02/d=20/h=01/min=30
<Container>/<TableName>/y=2020/m=02/d=20/h=02/min=00 and so on.
用于捕获数据的文件格式是 orc,并且时间分区内的数据分区大小相同。
我们的用例是使用 Spark(V 2.3)在 IST 中捕获日级别的数据进行处理。鉴于数据驻留在 UTC 并且用例是在 IST(+5.30 UTC)中处理数据,从 /h=18/min=30(前一天)到 /h=18/min= 总共 48 个时间分区是必不可少的00(第二天)。我们有两种选择
选项1 为每个时间分区创建数据帧并将其合并
df1 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=20/h=18/min=30)
df2 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=20/h=19/min=00)
..
df48 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=21/h=18/min=00) ..
df = df.union(df1)
df = df.union(df2)
..
df = df.union(df48)
对 48 个分区执行此操作将在 df 中生成一整天的数据。
选项 2 在日级别捕获数据并应用一个小时的过滤条件。
df1 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=20/).filter(h>=19 or (h=18 and min=30))
df2 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=21/).filter(h<=17 or (h=18 and min=00))
df = df1.union(df2)
一旦数据被加载到内存中,处理时间是相同的,即~5 分钟。加载数据所需的时间是瓶颈。选项 1 需要 30 分钟,选项 2 需要 2 分钟才能加载到内存。
在一些博客中,我们看到 Analyzer 每次调用 union 时都会扫描整个先前的数据帧。因此对于 48 个联合,它扫描 1+2+3+47=1128 次。这是指数性能下降的原因吗?Analyzer是做什么的,可以关闭吗?为了使文件存储上的时间分区数据的读取功能通用,是否有任何建议或最佳实践可以采用?