问题标签 [apache-spark-2.2]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
5261 浏览

pyspark - read textfile in pyspark2

I am trying to read a text file in spark 2.3 using python,but I get this error. This is the format textFile is in:

Code:

Error:

While I am trying to read a textFile into an RDD, its being collected as a single column.

Should the data file should be changed or shoud I change my code?

0 投票
1 回答
535 浏览

java - Spark2 Kafka结构化流Java不知道from_json函数

我有一个关于 Kafka 流上的 Spark 结构化流的问题。

我有一个类型的模式:

我从 Kafka 主题引导我的流,例如:

接下来转换为字符串,字符串类型:

现在我想将值字段(它是一个 JSON)转换为之前转换的模式,这应该使 SQL 查询更容易:

Spark 2.3.1 好像不知道这个from_json功能?

这是我的进口:

关于如何解决这个问题的任何想法?请注意,我不是在寻找 Scala 解决方案,而是基于纯 Java 的解决方案!

0 投票
3 回答
925 浏览

scala - 如何使用scala对spark中rdd的每一行进行排序?

我的文本文件有以下数据:

我想按降序对每一行进行排序。我试过下面的代码

我得到了以下输出

上面的结果表明整个列表已按降序排序。但我希望按降序对每个值进行排序

例如

应该

请抽出时间来回答这个问题。在此先感谢。

0 投票
1 回答
932 浏览

scala - 如何获取与spark scala数据框中某些列的最小值对应的行

我有以下代码。df3 是使用以下代码创建的。我想获得 distance_n 的最小值以及包含该最小值的整行。

在此处输入图像描述

在此处输入图像描述

为了获取整行,我将此 df3 转换为用于执行 spark.sql 的表

如果我喜欢这个 spark.sql("select latitude,longitude,speed,min(distance_n) from table1").show()

//它抛出错误 在此处输入图像描述

如果 spark.sql("select latitude,longitude,speed,min(distance_nd) from table180").show()

// 通过将 distance_n 替换为 distance_nd 它会抛出错误

在此处输入图像描述

如何解决此问题以获取对应于最小值的整行

0 投票
0 回答
940 浏览

apache-spark - Apache Spark orc 读取大量小文件时的读取性能

当从目录下的 HDFS 读取大量 orc 文件时,spark 直到一段时间后才会启动任何任务,并且在此期间我看不到任何任务在运行。我正在使用下面的命令来读取 orc 和 spark.sql 配置。

发出 spark.read.orc 时,火花在做什么?

另外,我没有直接读取 orc 文件,而是尝试在同一数据集上运行 Hive 查询。但我无法推送过滤谓词。我应该在哪里设置以下配置 "hive.optimize.ppd":"true", "hive.optimize.ppd.storage":"true"

建议从 HDFS 读取 orc 文件和调整参数的最佳方法是什么?

0 投票
1 回答
2865 浏览

apache-spark - Spark - 不允许操作:alter table replace columns

看起来 hivereplace columns不适用于 spark 2.2.1 和 2.3.1

看起来它已在ADD COLUMNS2.2 版本以后修复。它也适用于我,但replace columns仍然失败。

不允许的操作:alter table add columns(line 1, pos 0)

以下文档说应该支持它。不知道为什么它对我来说失败了。 https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#supported-hive-features

https://docs.databricks.com/spark/latest/spark-sql/language-manual/alter-table-or-view.html#replace-columns

0 投票
1 回答
1717 浏览

apache-spark - 在纱线上使用火花流进行动态分配,而不是缩小执行程序

我在纱线集群上使用火花流(火花版本 2.2),并试图为我的应用程序启用动态核心分配。

执行器的数量根据需要增加,但是一旦分配了执行器,即使流量减少,它们也不会被缩减,即一旦分配的执行器不会被释放。我还在纱线上启用了外部洗牌服务,如此处所述: https ://spark.apache.org/docs/latest/running-on-yarn.html#configuring-the-external-shuffle-service

我在 spark-submit 命令中设置的配置是:

如果我缺少任何特定的配置,有人可以帮忙吗?

谢谢

0 投票
1 回答
51 浏览

scala - Spark 2.2 数据框 [scala]

上面是输入数据集的表格,下面是预期的输出。这里的问题是我们应该根据订单号而不是状态出现次数来计算。我们可以在使用 scala 的 spark 数据帧的帮助下做到这一点吗?提前感谢您的帮助。

0 投票
0 回答
143 浏览

scala - Spark 2.2 中的 Spark 程序比 Spark 1.6 慢两倍

我们正在将 Scala Spark 程序从 1.6.3 迁移到 2.2.0。有问题的程序有四个部分:我们称它们为 A、B、C 和 D 部分。A 部分解析输入(parquet 文件),然后缓存 DF 并创建一个表。然后B、C、D段依次对DF进行不同类型的处理。

这个 spark 作业每小时运行大约 50 次,输入文件的数量取决于当前可用的文件。executors、cores、executor memory 和 agg partitions 的数量是根据输入文件的数量和它们的大小来计算的。这些是这样计算的:

  • n_execs = min(n_infiles, num_datanodes)
  • n_cores = ceil(n_infiles/n_execs)
  • exec_mem = 1500 * n_cores (MB)
  • shuffle_partitions = n_infiles
  • driver_mem = ceil(n_infiles / 200) (GB)
  • 最大分区字节数 = 5*1024*1024(常数)

在我们的单元测试环境中,我们目前有 12 个数据节点,平均文件输入大小为 11MB / 630K 记录。所以一些例子是:

  • n_infiles (24): driver_mem (1G), n_execs (12), n_cores (2), exec_mem (3000M), shuffle_partitions (24)
  • n_infiles (4): driver_mem (1G), n_execs: (4), n_cores (1), exec_mem (1500M), shuffle_partitions (4)

现在的问题是:使用 Spark 2.2 分析 1095 次运行,我们看到 C 部分平均耗时 41.2 秒,而 D 部分平均耗时 47.73 秒。在 Spark 1.6.3 中,超过 425 次运行的 C 部分平均耗时 23 秒,而 D 部分平均耗时 64.12 秒。A 部分和 B 部分在两个版本之间具有几乎相同的平均运行时间。D 部分的改进很大,但 C 部分的问题很大。这在我们的生产集群(314 个数据节点)上根本无法很好地扩展。

C部分的一些细节:

  • 我们使用从 Section A 创建的缓存 DF
  • 来自 A 部分的 DF 在一个较小的表(约 1000 行)上连接了四次,它们被联合在一起,基本上像:

    SELECT * FROM t1 JOIN t2 WHERE t1.a = t2.x AND t2.z = 'a' UNION ALL SELECT * FROM t1 JOIN t2 WHERE t1.b = t2.x AND t2.z = 'b' UNION ALL SELECT * FROM t1 JOIN t2 WHERE t1.c = t2.x1 AND t1.d = t2.x2 AND t2.z = 'c' UNION ALL SELECT * FROM t1 JOIN t2 WHERE t1.e = t2.y1 AND t1.f = t2.y2 AND t2.z = 'd'

  • 使用稍微不同的参数再次执行此查询。

  • 每个查询的结果都会被缓存,因为它们被过滤了几次,然后被写入 parquet。

有什么可以解释火花 1.6 和 2.2 之间的差异吗?

我是火花调整的新手,所以如果我能提供更多信息,请告诉我。

0 投票
2 回答
295 浏览

classnotfoundexception - ClassNotFound 与 Ozzie、Azure HDInsight 和 Spark2

经过1周的研究,不得不提出这个要求:

  • 环境:Azure HDInsight
  • Oozie 版本:“Oozie 客户端构建版本:4.2.0.2.6.5.3004-13”
  • 火花:火花2
  • 我的程序:简单的 Scala 程序读取文件 i.csv,并将其写入 o.csv
  • 使用 Spark-Submit 测试:是

工作属性

工作流.xml:

我得到以下异常:

我总结了这些:

  • 一些它是如何指向 < Spark 2 的,因为 Spark 会话是在 Spark 的更高版本中引入的
  • 此外,oozie 可以提交作业,因为我使用“yarn logs -applicationId appid”提取了这个错误,我从 oozie 日志中获取了 appid。

现在,如果我在 job.properties 中添加这一行

我得到以下异常:

我总结了这些:

  • Oozie 无法提交作业,因为我在 oozie 日志本身上发现了错误。

我不明白为什么要这么复杂,如果 Microsoft Azure 正在用 spark2 打包 HDInsight,oozie ......这东西应该运行顺利或稍作改动,应该在某处提供干净的文档。