问题标签 [apache-spark-2.2]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
pyspark - read textfile in pyspark2
I am trying to read a text file in spark 2.3 using python,but I get this error. This is the format textFile is in:
Code:
Error:
While I am trying to read a textFile into an RDD, its being collected as a single column.
Should the data file should be changed or shoud I change my code?
java - Spark2 Kafka结构化流Java不知道from_json函数
我有一个关于 Kafka 流上的 Spark 结构化流的问题。
我有一个类型的模式:
我从 Kafka 主题引导我的流,例如:
接下来转换为字符串,字符串类型:
现在我想将值字段(它是一个 JSON)转换为之前转换的模式,这应该使 SQL 查询更容易:
Spark 2.3.1 好像不知道这个from_json
功能?
这是我的进口:
关于如何解决这个问题的任何想法?请注意,我不是在寻找 Scala 解决方案,而是基于纯 Java 的解决方案!
scala - 如何使用scala对spark中rdd的每一行进行排序?
我的文本文件有以下数据:
我想按降序对每一行进行排序。我试过下面的代码
我得到了以下输出
上面的结果表明整个列表已按降序排序。但我希望按降序对每个值进行排序
例如
应该
请抽出时间来回答这个问题。在此先感谢。
scala - 如何获取与spark scala数据框中某些列的最小值对应的行
我有以下代码。df3 是使用以下代码创建的。我想获得 distance_n 的最小值以及包含该最小值的整行。
为了获取整行,我将此 df3 转换为用于执行 spark.sql 的表
如果我喜欢这个 spark.sql("select latitude,longitude,speed,min(distance_n) from table1").show()
如果 spark.sql("select latitude,longitude,speed,min(distance_nd) from table180").show()
// 通过将 distance_n 替换为 distance_nd 它会抛出错误
如何解决此问题以获取对应于最小值的整行
apache-spark - Apache Spark orc 读取大量小文件时的读取性能
当从目录下的 HDFS 读取大量 orc 文件时,spark 直到一段时间后才会启动任何任务,并且在此期间我看不到任何任务在运行。我正在使用下面的命令来读取 orc 和 spark.sql 配置。
发出 spark.read.orc 时,火花在做什么?
另外,我没有直接读取 orc 文件,而是尝试在同一数据集上运行 Hive 查询。但我无法推送过滤谓词。我应该在哪里设置以下配置
"hive.optimize.ppd":"true",
"hive.optimize.ppd.storage":"true"
建议从 HDFS 读取 orc 文件和调整参数的最佳方法是什么?
apache-spark - Spark - 不允许操作:alter table replace columns
看起来 hivereplace columns
不适用于 spark 2.2.1 和 2.3.1
看起来它已在ADD COLUMNS
2.2 版本以后修复。它也适用于我,但replace columns
仍然失败。
不允许的操作:alter table add columns(line 1, pos 0)
以下文档说应该支持它。不知道为什么它对我来说失败了。 https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#supported-hive-features
apache-spark - 在纱线上使用火花流进行动态分配,而不是缩小执行程序
我在纱线集群上使用火花流(火花版本 2.2),并试图为我的应用程序启用动态核心分配。
执行器的数量根据需要增加,但是一旦分配了执行器,即使流量减少,它们也不会被缩减,即一旦分配的执行器不会被释放。我还在纱线上启用了外部洗牌服务,如此处所述: https ://spark.apache.org/docs/latest/running-on-yarn.html#configuring-the-external-shuffle-service
我在 spark-submit 命令中设置的配置是:
如果我缺少任何特定的配置,有人可以帮忙吗?
谢谢
scala - Spark 2.2 数据框 [scala]
上面是输入数据集的表格,下面是预期的输出。这里的问题是我们应该根据订单号而不是状态出现次数来计算。我们可以在使用 scala 的 spark 数据帧的帮助下做到这一点吗?提前感谢您的帮助。
scala - Spark 2.2 中的 Spark 程序比 Spark 1.6 慢两倍
我们正在将 Scala Spark 程序从 1.6.3 迁移到 2.2.0。有问题的程序有四个部分:我们称它们为 A、B、C 和 D 部分。A 部分解析输入(parquet 文件),然后缓存 DF 并创建一个表。然后B、C、D段依次对DF进行不同类型的处理。
这个 spark 作业每小时运行大约 50 次,输入文件的数量取决于当前可用的文件。executors、cores、executor memory 和 agg partitions 的数量是根据输入文件的数量和它们的大小来计算的。这些是这样计算的:
- n_execs = min(n_infiles, num_datanodes)
- n_cores = ceil(n_infiles/n_execs)
- exec_mem = 1500 * n_cores (MB)
- shuffle_partitions = n_infiles
- driver_mem = ceil(n_infiles / 200) (GB)
- 最大分区字节数 = 5*1024*1024(常数)
在我们的单元测试环境中,我们目前有 12 个数据节点,平均文件输入大小为 11MB / 630K 记录。所以一些例子是:
- n_infiles (24): driver_mem (1G), n_execs (12), n_cores (2), exec_mem (3000M), shuffle_partitions (24)
- n_infiles (4): driver_mem (1G), n_execs: (4), n_cores (1), exec_mem (1500M), shuffle_partitions (4)
现在的问题是:使用 Spark 2.2 分析 1095 次运行,我们看到 C 部分平均耗时 41.2 秒,而 D 部分平均耗时 47.73 秒。在 Spark 1.6.3 中,超过 425 次运行的 C 部分平均耗时 23 秒,而 D 部分平均耗时 64.12 秒。A 部分和 B 部分在两个版本之间具有几乎相同的平均运行时间。D 部分的改进很大,但 C 部分的问题很大。这在我们的生产集群(314 个数据节点)上根本无法很好地扩展。
C部分的一些细节:
- 我们使用从 Section A 创建的缓存 DF
来自 A 部分的 DF 在一个较小的表(约 1000 行)上连接了四次,它们被联合在一起,基本上像:
SELECT * FROM t1 JOIN t2 WHERE t1.a = t2.x AND t2.z = 'a' UNION ALL SELECT * FROM t1 JOIN t2 WHERE t1.b = t2.x AND t2.z = 'b' UNION ALL SELECT * FROM t1 JOIN t2 WHERE t1.c = t2.x1 AND t1.d = t2.x2 AND t2.z = 'c' UNION ALL SELECT * FROM t1 JOIN t2 WHERE t1.e = t2.y1 AND t1.f = t2.y2 AND t2.z = 'd'
使用稍微不同的参数再次执行此查询。
每个查询的结果都会被缓存,因为它们被过滤了几次,然后被写入 parquet。
有什么可以解释火花 1.6 和 2.2 之间的差异吗?
我是火花调整的新手,所以如果我能提供更多信息,请告诉我。
classnotfoundexception - ClassNotFound 与 Ozzie、Azure HDInsight 和 Spark2
经过1周的研究,不得不提出这个要求:
- 环境:Azure HDInsight
- Oozie 版本:“Oozie 客户端构建版本:4.2.0.2.6.5.3004-13”
- 火花:火花2
- 我的程序:简单的 Scala 程序读取文件 i.csv,并将其写入 o.csv
- 使用 Spark-Submit 测试:是
工作属性
工作流.xml:
我得到以下异常:
我总结了这些:
- 一些它是如何指向 < Spark 2 的,因为 Spark 会话是在 Spark 的更高版本中引入的
- 此外,oozie 可以提交作业,因为我使用“yarn logs -applicationId appid”提取了这个错误,我从 oozie 日志中获取了 appid。
现在,如果我在 job.properties 中添加这一行
我得到以下异常:
我总结了这些:
- Oozie 无法提交作业,因为我在 oozie 日志本身上发现了错误。
我不明白为什么要这么复杂,如果 Microsoft Azure 正在用 spark2 打包 HDInsight,oozie ......这东西应该运行顺利或稍作改动,应该在某处提供干净的文档。