问题标签 [spark3]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
353 浏览

elasticsearch - PySpark 3.1.1 的 Elasticsearch 插件

我成功地使用了 Elasticsearch Spark 7.12.0 和 PySpark 2.4.5。读和写都很完美。现在,我正在测试升级到 Spark 3.1.1,这种集成不再起作用。PySpark 在 2.4.5 和 3.1.1 之间没有代码更改。

有兼容的插件吗?有没有人让它与 PySpark 3.1.1 一起使用?

错误:

在此处输入图像描述

0 投票
1 回答
329 浏览

apache-spark - spark3 因 py4j.protocol.Py4JJavaError 而崩溃

我正在尝试从 emr-5.28.0(spark 2.4.4) 迁移到 emr-6.2.0(spark 3.0.1),无论我做什么,spark 的最基本用法都会崩溃。

这是我的 test_pyspark.py 文件:

我将它上传到 S3 并使用

它崩溃了。

这是容器日志:

找不到任何关于

因此,任何帮助将不胜感激。

0 投票
0 回答
58 浏览

java - Spark3 Streaming 中的 Phoenix Driver ClassNotFound

我正在将现有的 spark 流应用程序从 spark2.3 迁移到 spark3.1.1。我已经更新了下面提到的 spark 依赖项

并添加了凤凰依赖

在运行火花作业时,我得到了java.lang.ClassNotFoundException: org.apache.phoenix.jdbc.PhoenixDriver。以前使用 spark2.3 运行相同的代码,但现在不使用 spark3.1.1。

有人可以帮我知道我在这里想念什么吗?

0 投票
0 回答
33 浏览

apache-spark - 为什么没有显示 AQE?

我的代码就像

最终计划显示 2 个广播连接和 1 个 SortMergeJoin。SortMergeJoin 是 100+ 到 200+ 百万行表之间的 LEFT JOIN。而且它有偏差。我的问题是我启用了 AQE,并使用了一些配置(例如使用 spark.sql.shuffle.partitions=40000、spark.default.parallelism=400),但我没有看到 AQE 合并,也没有看到 AdaptiveSparkPlan 节点。我看到很多 AQE 的例子都是使用 GROUP BY。AQE 是否仅适用于 GROUP BY?为什么我的查询没有显示 AdaptiveSparkPlan 节点?

谢谢

0 投票
0 回答
898 浏览

apache-spark - SPARK SQL 抛出 AssertionError: assertion failed: Found duplicate rewrite attributes (Spark 3.0.2)

在 Spark 3.0.2 中执行上述操作会 在线程“main”java.lang.AssertionError 中产生异常:断言失败:发现重复的重写属性。 它在 Spark 2.4.3 中工作。

view_1的标题:

| name |id|second_id|local_timestamp|utc_timestamp|alias_1_column_1|alias_1_column_2 | alias_1_column_3|alias_1_column_4|alias_1_column_5|alias_1_column_6|alias_1_column_7 |

view_2的标题:

| name | id |second_id |local_timestamp |utc_timestamp|alias_2_coumn_1|

有趣的是,我们有几个这样的查询,但只有一个特别给出了问题。我看到了这个问题,但不确定如何将其翻译为 spark sql。

0 投票
0 回答
107 浏览

apache-spark - 我们可以在单个系统中同时设置 Spark2.4 和 Spark3.0 吗?

我在我的 Windows 中安装了 Spark 2.4。这是我的生产环境所必需的。使用 Spark2.4 。现在,我还想测试 Spark3.0 功能。那么我可以在同一台 Windows 机器上安装 Spark-3.0 二进制文件而不会干扰 Spark-2.4 的安装吗?我不想使用 Linux 子系统。或虚拟机。

0 投票
0 回答
65 浏览

scala - 将 Spark2.2 的 UDAF 转换为 3.0 聚合器

我已经使用 Spark2.4 在 scala 中编写了 UDAF。由于我们的 Databricks 集群处于不再支持的 6.4 运行时,我们需要迁移到具有长期支持并使用 Spark3 的 7.3 LTS。UDAF 在 Spark3 中已弃用,将来会被删除(很可能)。所以我正在尝试将 UDAF 转换为聚合器函数

使用后,输出为:

{“id”:1282,“名称”:“麦考密克圣诞节”}

{“id”:1305,“名称”:“麦考密克完美捏”}

{"id": 1677, "name": "Viking Cruises Viking Cruises"}

0 投票
1 回答
295 浏览

apache-zeppelin - Apache Livy 0.7.0 无法创建交互式会话

在使用 apache Livy 0.7.0 创建新会话时,我遇到了错误。我还使用 zeppelin notebook(livy 解释器)来创建会话。

使用 Scala 版本 2.12.10、Java HotSpot(TM) 64 位服务器 VM、11.0.11
Spark 3.0.2 zeppelin 0.9.0

知道为什么我会收到错误吗?

我已经检查了我们在类路径中有 livy-repl_2.11-0.7.1-incubating.jar 并且 JAR 已经有它无法找到的类。

错误日志

0 投票
0 回答
26 浏览

python - Spark UDF:将 np.sum 应用于数据帧中的值列表并根据阈值过滤值

非常了解使用 spark 进行数据操作和 UDF。我有一个样本 df 具有不同的测试分数。像这样的有 50 个不同的列。我正在尝试定义一个自定义应用函数来过滤大于 80 的值(每行中的总计数)。

考试分数

[65, 92, 96, 72, 70, 85, 72, 74, 79, 10, 82]

[59、81、91、69、66、75、65、61、71、85、69]

以下是我正在尝试的:

收到以下错误:

0 投票
0 回答
182 浏览

apache-spark - Spark AQE Post-Shuffle partitions coalesce 无法按预期工作,甚至在某些分区中造成数据倾斜。为什么?

我在我的 spark DF 上使用全局排序,当我启用 AQE 和 post-shuffle 合并时,排序操作后的分区变得比以前更糟糕。

我的查询,在高层次上,看起来:

  1. 可能导致倾斜的列->是的,我的数据分布不均,这就是我使用盐的原因。
  2. 我从 Kafka 读取数据,所以我使用 Kafka 分区 + 偏移列作为盐。
  3. 为什么在后台使用 reprtitoinByRange 的排序对我没有帮助,我想启用 AQE?-> 现在我看到我的 Kafka 消息的大小差异可能太大。因此,我看到范围重新分区后的分区具有几乎相同数量的记录,但字节数仍然非常不均匀。
  4. 为什么我认为 AQE 必须帮助我?-> 我想创建许多小范围,即使我的数据偏差不会超过~50mb,所以后洗牌合并将能够将它们合并到目标大小(256mb)。在我的情况下,最高 320mb 是可以的。

我的第一个假设是,即使范围很小,峰值也会太大。但我检查并确认按范围重新分区给了我良好的记录分布,但不好的是大小。我有近 200 个分区,记录数量几乎相同,大小差异高达 9 倍,从 ~100Mb 到 ~900mb。但是通过 AEQ 和重新分区到 18000 个小范围,最小的分区是 18mib,最大的分区是 1.8Gib。这种情况比没有 AEQ 的情况要糟糕得多。需要强调的是,我使用 Spark UI -> Details for Stage 选项卡中的指标来确定分区大小(以字节为单位),并且我有自己的记录日志。

所以我开始调试问题,但是 AQE 的输入和输出没有足够的日志 ShufflePartitionsUtil.coalescePartitions。这就是我将查询重写为 repartitionByRange.sortWithingPartitoins 的原因。并通过额外的日志记录进行物理计划优化。我的日志告诉我,我最初的想法是正确的。

  • map 和 write shuffle 阶段之后的输入分区被拆分为足够小
  • Coalesce 算法将它们收集到一个正确的数量,并且分布在字节分区中。

最小大小是如此不同,因为最后一个分区的大小,这是预期的。TRACE 日志级别显示 99% 的分区接近 290mib。

  • 但是为什么 spark UI 显示出如此不同的结果呢?->

  • spark UI 可能有问题吗?->

  • 也许吧,但除了任务大小,一个任务的持续时间也太大了,这让我觉得 spark UI 还可以。

  • 所以我的假设是问题出MapOutputStatistics在我的阶段。但它总是坏掉还是只在我的情况下?->

  • 仅在我的情况下?-> 我做了一些检查以确认它。

    • 我从 s3(块大小为 120mb 的镶木地板文件)-> 中读取了相同的数据集,并且 AQE 按预期工作。洗牌后合并返回给我 188,按大小、分区很好地分布。重要的是要注意 s3 上的数据分布不均,但在读取过程中的 spark 将其拆分为 259 个接近 120mb 大小的分区,主要是因为 parquet 块大小为 120mb。
    • 我从 Kafka 中读取了相同的数据集,但从分区函数中排除了具有偏斜的列 -> 并且 AQE 按预期工作。洗牌后合并返回给我 203,按大小、分区很好地分布。
    • 我尝试禁用缓存-> 这没有任何结果。我使用缓存,只是为了避免从 kafka 重复读取。因为按范围重新分区使用采样。
    • 我尝试禁用 AQE 并将 18000 个分区写入 s3 -> 结果是预期的,与我的合并输入日志显示的相同:17999 个文件,最小的接近 8mib,最大的 56mib。
  • 所有这些检查让我认为这MapOutputStatistics仅对我的情况是错误的。可能是如何与 Kafka 源相关联或我的 Kafka 输入数据分布非常不均匀的问题。

问题:

  • 那么有人知道我做错了什么吗?
  • 在我的情况下,我可以用输入数据做些什么来使后洗牌合并工作?
  • 如果你认为我是对的,请发表评论。

PS 我还想提一下,我的输入 Kafka 数据帧是 2160,甚至不是分布式分区 -> 某些分区可以比其他分区大 2 倍。minPartitions从具有 720 个分区和选项 * 3的 Kafka 主题中读取。