我们正在将 Scala Spark 程序从 1.6.3 迁移到 2.2.0。有问题的程序有四个部分:我们称它们为 A、B、C 和 D 部分。A 部分解析输入(parquet 文件),然后缓存 DF 并创建一个表。然后B、C、D段依次对DF进行不同类型的处理。
这个 spark 作业每小时运行大约 50 次,输入文件的数量取决于当前可用的文件。executors、cores、executor memory 和 agg partitions 的数量是根据输入文件的数量和它们的大小来计算的。这些是这样计算的:
- n_execs = min(n_infiles, num_datanodes)
- n_cores = ceil(n_infiles/n_execs)
- exec_mem = 1500 * n_cores (MB)
- shuffle_partitions = n_infiles
- driver_mem = ceil(n_infiles / 200) (GB)
- 最大分区字节数 = 5*1024*1024(常数)
在我们的单元测试环境中,我们目前有 12 个数据节点,平均文件输入大小为 11MB / 630K 记录。所以一些例子是:
- n_infiles (24): driver_mem (1G), n_execs (12), n_cores (2), exec_mem (3000M), shuffle_partitions (24)
- n_infiles (4): driver_mem (1G), n_execs: (4), n_cores (1), exec_mem (1500M), shuffle_partitions (4)
现在的问题是:使用 Spark 2.2 分析 1095 次运行,我们看到 C 部分平均耗时 41.2 秒,而 D 部分平均耗时 47.73 秒。在 Spark 1.6.3 中,超过 425 次运行的 C 部分平均耗时 23 秒,而 D 部分平均耗时 64.12 秒。A 部分和 B 部分在两个版本之间具有几乎相同的平均运行时间。D 部分的改进很大,但 C 部分的问题很大。这在我们的生产集群(314 个数据节点)上根本无法很好地扩展。
C部分的一些细节:
- 我们使用从 Section A 创建的缓存 DF
来自 A 部分的 DF 在一个较小的表(约 1000 行)上连接了四次,它们被联合在一起,基本上像:
SELECT * FROM t1 JOIN t2 WHERE t1.a = t2.x AND t2.z = 'a' UNION ALL SELECT * FROM t1 JOIN t2 WHERE t1.b = t2.x AND t2.z = 'b' UNION ALL SELECT * FROM t1 JOIN t2 WHERE t1.c = t2.x1 AND t1.d = t2.x2 AND t2.z = 'c' UNION ALL SELECT * FROM t1 JOIN t2 WHERE t1.e = t2.y1 AND t1.f = t2.y2 AND t2.z = 'd'
使用稍微不同的参数再次执行此查询。
每个查询的结果都会被缓存,因为它们被过滤了几次,然后被写入 parquet。
有什么可以解释火花 1.6 和 2.2 之间的差异吗?
我是火花调整的新手,所以如果我能提供更多信息,请告诉我。