问题标签 [apache-spark-1.6]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
900 浏览

apache-spark - 如何对 Spark Streaming 代码进行单元测试?

我使用最新的 Spark 1.6.0。

看了另一个 stackoverflow 帖子How can I make Spark Streaming count the words in a file in a unit test?

我正在尝试使用示例 @ https://gist.github.com/emres/67b4eae86fa92df69f61 来编写 Spark 的示例单元测试。之后我打算为我的实际 Spark 应用程序编写一个单元测试。但是,我无法使用示例 @ https://gist.github.com/emres/67b4eae86fa92df69f61。这给了我错误

java.lang.IllegalStateException:oracle.security.ti.reportgenerator.test.StarterAppTest.testCountWords(StarterAppTest.java:62) 不支持在启动上下文后添加新的输入、转换和输出操作

有什么想法可以解决这个问题吗?

0 投票
3 回答
13814 浏览

python - PySpark 序列化 EOFError

我正在将 CSV 作为 Spark DataFrame 读取并对其执行机器学习操作。我不断收到 Python 序列化 EOFError - 知道为什么吗?我认为这可能是一个内存问题 - 即文件超出了可用 RAM - 但大幅减少 DataFrame 的大小并不能防止 EOF 错误。

玩具代码和错误如下。

在单个节点上重复运行上述代码spark-submit会引发以下错误,即使在拟合模型之前 DataFrame 的大小已减小(例如tinydf = df.sample(False, 0.00001)

0 投票
2 回答
2203 浏览

apache-spark - 从 Cassandra 读取数据时如何控制分区数?

我用:

  1. cassandra 2.1.12 - 3 个节点
  2. 火花 1.6 - 3 个节点
  3. 火花卡桑德拉连接器 1.6

我在 Cassandra 中使用令牌(不是 vnodes)。

我正在编写一个从 Cassandra 表中读取数据并显示其计数表的简单工作,该表有大约 7000 万行,并且需要 15 分钟。

当我读取数据并检查 RDD 的分区数时,它在 21000 左右,这太大了。如何控制这个数字?

我试过了splitCountsplit.size.in.mbs但他们告诉我相同数量的分区。

有什么建议么?

}

这是我的参考代码。我现在运行 nodetool compact 我能够控制分区的数量,但整个过程仍然需要将近 6 分钟,我认为任何改进建议都太高了

0 投票
1 回答
768 浏览

apache-kafka - Spark Streaming 应用程序因 KafkaException 失败:字符串超出最大大小或 IllegalArgumentException

TL;博士:

我非常简单的 Spark Streaming 应用程序在驱动程序中失败,并显示“KafkaException:字符串超出最大大小”。我在执行程序中看到了相同的异常,但我还在执行程序日志的某处发现了一个 IllegalArgumentException,其中没有其他信息

完整问题:

我正在使用 Spark Streaming 从 Kafka 主题中读取一些消息。这就是我正在做的事情:

我对 Kafka 数据所做的只是使用以下方法打印它:

我的应用程序显然有比这更多的代码,但为了找到我的问题,我从代码中删除了所有可能的内容

我正在尝试在 YARN 上运行此代码。这是我的火花提交行:

streamconfig.properties 文件只是一个常规属性文件,可能与这里的问题无关

在尝试执行应用程序后,它很快就失败了,驱动程序出现以下异常:

我什至没有在堆栈跟踪中看到我的代码

检查执行程序,我发现了与驱动程序相同的异常,但也深埋了以下异常:

我不知道 IllegalArgument 是什么,因为没有包含任何信息

我的 YARN 使用的 Spark 版本是 1.6.0。我还验证了我的 pom 包含 Spark 1.6.0 而不是早期版本。我的范围是“提供”

我从完全相同的主题手动读取数据,那里的数据只是普通的 JSON。那里的数据一点也不庞大。绝对小于 32767。此外,我可以使用常规命令行使用者读取这些数据,这很奇怪

遗憾的是,谷歌搜索这个异常并没有提供任何有用的信息

有谁知道如何理解这里的问题到底是什么?

提前致谢

0 投票
2 回答
19163 浏览

apache-spark - 获取分组中的第一个非空值(Spark 1.6)

如何从 group by 中获取第一个非空值?我尝试使用first with coalesce F.first(F.coalesce("code"))但我没有得到想要的行为(我似乎得到了第一行)。

我试过了:

期望的输出

0 投票
0 回答
1341 浏览

apache-spark - pyspark 安装错误,“ImportError: No module named pyspark”

我正在尝试将 apache spark-1.6.1 安装为独立模式。我已关注“ https://github.com/KristianHolsheimer/pyspark-setup-guide ”链接。但是,执行后

我努力了

但是,它给出了一个错误,

完成所有步骤后,我在 ipython 中给出了以下命令

它给出了以下错误:

我不明白发生了什么。请帮我解决这个问题。

0 投票
0 回答
46 浏览

apache-spark - 当 reduce() 上有 2x4CPU 可用时,Spark 仅使用 1CPU

我有 3 台机器:1x Master,4x CPU,8G RAM;2x 执行器,4x CPU 和 16G RAM。

master 是独立模式(无 YARN),我使用的是 pyspark。

即使它不是一个巨大的基础设施,我仍然希望它有一些性能。运行reduce操作时:

where tfsenthastfsentimentwhich are SparseVector, andspvecadd是要添加的自制函数SparseVector

这样做,在 3x 4CPU 上,只有一个在 executor 上运行 100%。其他都是0%,内存在5G/16G左右。我不明白: * 为什么这么长 * 为什么只有 1x CPU 工作。

我应该自己对数据进行分区吗?(我的意思是明确地在两个执行者上分发数据?即使在我看来那是 Spark 的工作)。

感谢您提供任何帮助、想法或提示 pltrdy

附加信息

  • 两个 executor 都连接到 master 并“分配”给任务(可以使用 spark web UI 检查它)

  • 我有大约 380k 线。两个向量维度都小于 100。(不是很多)。

  • 复杂性可能更依赖于维度而不是行数。

更新

事实证明,我必须使用repartition(8)才能使 RDD 分布式。这解决了我的问题,但不完全是我的问题:为什么我必须这样做?

我想这是因为我如何获取数据。我正在从数据库中读取,即

其中,我想存储而不分发它。

0 投票
2 回答
4393 浏览

apache-spark - 如何动态选择 spark.sql.shuffle.partitions

我目前正在使用 spark 和 foreach 分区处理数据,打开与 mysql 的连接并将其以 1000 个批量插入数据库。如SparkDocumentation中所述,默认值为spark.sql.shuffle.partitions200,但我想保持动态。那么,我该如何计算呢。因此,既不选择非常高的值导致性能下降,也不选择非常小的值导致OOM.

0 投票
2 回答
5965 浏览

apache-spark - UI 中未显示正在运行的 spark 作业

我已经按照此处bin/spark-submit --class DataSet BasicSparkJob-assembly-1.0.jar提到的方式提交了我的 spark 作业,但没有提及--master参数或spark.master参数。而不是那个作业被提交到我的 3 节点火花集群。但我想知道它在哪里提交了作业,因为它没有显示任何信息Running Applications

0 投票
1 回答
2129 浏览

scala - 如何知道 Spark 使用 Scala 推断出的 RDD 类型是什么

我正在尝试以下示例

然后在shell中我得到以下

但是由于某种原因,我仍然没有弄清楚我能够执行这句话

在外壳中获取它

所以我有一些问题:

1.- 名为 rdd 的 var 的真正 RDD 类型是什么?因为在 shell 中它显示的类型是 org.apache.spark.rdd.RDD[(String, Int)] 但查看 API 时,RDD 类没有方法 aggregateByKey。顺便说一句,JavaPairRDD 类是否有 aggregateByKey 方法

2.- 我如何验证/知道 RDD 的真实类型

3.- ParallelCollectionRDD 出现了什么?我在 github 上查找它,发现它是一个私有类,所以我猜想是 scala API 上没有出现 is 的原因,但它是做什么用的?

我使用的是 Spark 1.6.2