问题标签 [catalyst-optimizer]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-spark - 具有 10K+ 宽列的 Spark-sql 表 - 如何加快执行速度
您对如何在具有 10K+ 列的表上加速 Spark-SQL 有什么建议吗?
TL;TR: . 我必须解决 spark-SQL 处理具有来自 ORC 文件的 10K+ 列的宽表的性能问题。我注意到了这种改进 /CR:https://issues.apache.org/jira/browse/SPARK-25643
Spark 架构旨在处理长而窄的数据(<1K cols),这与我们这里的 = 10K 列 - 宽且相对较短的输入相反。
建议的解决方案:
简单:而不是 10K+ 列上的一个大 SQL - 创建多个 SQL - 在临时表中输出 - 然后从临时表连接/联合到最后一个。
SPARK:“解决者/计划构建者”——不确定我是否正确命名,需要进一步调查。我运行这个测试:
输出:
列 | 预期线性 [ms] | 真实 [毫秒] | 真实 [分钟] |
---|---|---|---|
100 | 200 | 183 | 0.003 |
250 | 500 | 306 | 0.005 |
500 | 1000 | 524 | 0.009 |
1000 | 2000 | 1514 | 0.025 |
2500 | 5000 | 7922 | 0.132 |
5000 | 10000 | 31131 | 0.519 |
10000 | 20000 | 134429 | 2.240 |
12500 | 25000 | 200568 | 3.343 |
15000 | 30000 | 348068 | 5.801 |
20000 | 40000 | 595804 | 9.930 |
尝试从 O(N^2) => O(N) ..
SPARK:自动分离器 - 在火花管道中引入 2 个新阶段:
- 语法拆分器 - 在开始时,即:更改“联合”行为 - 拆分为单独的 SQL,或更高级 - 处理 SQL,将列分成组:A - 参与转换(数学等的一部分),B - 不转换,然后创建多个列数较少的 SQL
- 最后从那些较小的 SQL 中输出混合器(连接/联合)。
TODO:寻找开销:过于复杂的计算/结构,计划中的新阶段等。
- 框外命题 [3rd party],例如:H2O with ORC parser
H2O 有自己的 DF 表示 - 由 Cliff Click 完全重写。H2O 支持 xxK+ 列。H2O 计算 - 低级 - 无 SQL。期望:相当高 - H2O 被设计为使用 WIDE 列,并且列转换非常高效 - 它们有自己的索引机制等。
开箱即用的命题:DB 和使用 DBA 解决问题
开箱即用:noSQL,因为它是为宽列设计的
Spark 命题相当大/影响现有的 SQL 功能——不能保证这在将来会起作用。
注意:这个 PoC 可能是火花分离器解决方案的一个足够好的指标。期望:假设当前时间是指数 => 更改为时间的总和,一切都将取决于最终的连接(不应该有任何其他操作)。
否则,基本上,我在泡菜:请任何建议如何解决这个问题。
apache-spark - DF自连接和额外条件的Spark Physical Plan解释
我在这里回答了一个关于 SO 的问题,作为一个勤奋的人,我查看了物理计划。
编码:
物理计划:
以我的思维方式:
- 应该只复制或重复使用 DF
- 并且应该在同一分区内检查 rowid -1。
不知道如何解释这部分:
好的,不是大规模的,但似乎已经为(rowid#1251L - 1)
恕我直言决定了一些不必要的聪明才智。除非排序以某种方式补偿,但我并不相信。谁能说出优化器想法的真正含义?我想我可以猜到,但这似乎是多余的。
dataframe - 通过 RDD 和缓存角色修剪 Apache Spark 数据帧沿袭
有以下技巧如何修剪 Apache Spark 数据帧沿袭,特别是对于迭代计算:
它看起来像是某种纯粹的魔法,但现在我想知道为什么我们需要cache()
在 RDD 上调用方法?在此沿袭修剪逻辑中具有缓存的目的是什么?
scala - Apache Spark requiredChildDistribution 和 outputPartitioning 有什么区别?
在 Apache Spark 中,物理计划中的每个物理算子都有 4 个属性:
- 输出分区
- 输出排序</li>
- requiredChildDistribution
- requiredChildOrdering
但是 outputPartioning 和 requiredChildDistribution 不一样吗?它们有何不同,它们从根本上代表什么?outputOrdering 和 requiredChildOrdering 一样吗?
apache-spark - 当 spark 使用 joinWith 连接两个数据集时,是否可以避免第二次交换?
对于以下代码片段:
Spark 为数据集生成以下计划:
对于数据框:
使用joinWith连接两个数据集时是否可以避免另一个相同的交换?
scala - 如何在 Spark 中使用 MapType 作为催化剂函数的返回类型
我正在尝试在 spark 中创建自己的自定义催化剂功能。这是一个实现,我希望函数的返回数据类型是 MapType。Literal(0)
当我为地图中的所有键返回值时,我可以使用该函数。但我无法理解如何通过键从 mapType 属性参考(用于存储结果的缓冲区)中获取值以更新地图值。
我想通过在我的更新表达式函数中执行类似 customCount.get(nullKey) 的操作来获取值
apache-spark - pyspark 中 bucketBy 的用处是什么?
我正在读取我保存到磁盘的两个数据集,这些数据集bucketBy
在具有相同分区数的相同键上具有选项。当我读回它们并加入它们时,它们不应该导致洗牌。
但是,我看到的情况并非如此。以下代码演示了所谓的行为:
我得到的输出是
这显然有 hashpartitioning goiong 。请帮我澄清bucketBy