问题标签 [pyspark]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
5054 浏览

python - 如何在spark中将rdd数据一分为二?

我在 Spark RDD 中有一个数据,我想将它分成两部分,比例为 0.7。例如,如果 RDD 看起来像这样:

我想把它分成rdd1

rdd2

比例为 0.7。并且每次都应该是随机rdd1的。rdd2我试过这样:

它有时会起作用,但是当我的数据包含时,dict我遇到了一些问题。例如数据如下:

我明白了

类型错误:不可散列的类型:'dict'

0 投票
1 回答
14745 浏览

python - PySpark 广播来自本地函数的变量

我正在尝试从 Python 方法中创建广播变量(尝试抽象我正在创建的一些依赖于分布式操作的实用程序方法)。但是,我似乎无法从 Spark 工作人员中访问广播变量。

假设我有这个设置:

但是,如果我改为消除SomeMethod()中间人,它就可以正常工作。

如果可以的话,我宁愿不必将所有 Spark 逻辑都放在 main 方法中。有没有办法从本地函数中广播变量并使它们对 Spark 工作人员全局可见?

或者,对于这种情况,什么是好的设计模式——例如,我想为 Spark 编写一个专门的方法,该方法是自包含的并执行我想重用的特定功能?

0 投票
10 回答
105155 浏览

python - 使用 Apache Spark 将键值对缩减为键列表对

我正在编写一个 Spark 应用程序,并希望将一组键值对组合(K, V1), (K, V2), ..., (K, Vn)成一个键多值对(K, [V1, V2, ..., Vn])。我觉得我应该能够使用reduceByKey具有某种风味的功能来做到这一点:

发生这种情况时我得到的错误是:

“NoneType”对象没有“附加”属性。

我的键是整数,值 V1,...,Vn 是元组。我的目标是使用键和值列表(元组)创建一对。

0 投票
1 回答
3373 浏览

amazon-ec2 - 如何充分利用集群中的所有 Spark 节点?

我已经在 Spark 的独立模式下使用 ec2-script 启动了一个 10 节点集群。我正在从 PySpark shell 中访问 s3 存储桶中的数据,但是当我在 RDD 上执行转换时,只使用了一个节点。例如,下面将从 CommonCorpus 中读取数据:

当我运行它时,我的 10 个从站中只有一个处理数据。我知道这一点,因为从 Spark Web 控制台查看时,只有一个从站 (213) 有任何活动日志。当我查看 Ganglia 中的活动时,这个相同的节点 (213) 是唯一一个在活动运行时内存使用量激增的从站。在此处输入图像描述

此外,当我使用只有一个从属设备的 ec2 集群运行相同的脚本时,我具有完全相同的性能。我正在使用 Spark 1.1.0,非常感谢任何帮助或建议。

0 投票
9 回答
76196 浏览

python - 如何添加第三方 Java JAR 文件以在 PySpark 中使用

我有一些 Java 中的第三方数据库客户端库。我想通过

例如:使客户端类(不是 JDBC 驱动程序!)通过 Java 网关对 Python 客户端可用:

不清楚将第三方库添加到 JVM 类路径的位置。我试图添加到文件compute-classpath.sh,但这似乎不起作用。我得到:

Py4jError:试图调用一个包

此外,与 Hive 相比:hive JAR 文件不是通过文件compute-classpath.sh加载的,所以这让我很怀疑。似乎还有一些其他机制正在设置 JVM 端类路径。

0 投票
2 回答
6511 浏览

apache-spark - Spark Python 性能调优

我使用以下命令启动了用于 Spark 开发的 iPython 笔记本:

sc使用 Python 代码创建了一个 SparkContext,如下所示:

我想更好地理解spark.executor.memory文档中的 ,

每个执行程序进程使用的内存量,格式与 JVM 内存字符串相同

这是否意味着在一个节点上运行的所有进程的累积内存不会超过该上限?如果是这种情况,我应该将该数字设置为尽可能高的数字吗?

这也是一些属性的列表,是否有一些其他参数可以从默认值进行调整以提高性能。

谢谢!

0 投票
1 回答
1616 浏览

python - 使用 Spark 的 Gzip 文件

我有一个 Spark 作业,它需要数千个文件作为输入并从 Amazon S3 下载它们并在映射阶段处理它们,其中每个映射步骤都返回一个字符串。我想将输出压缩到.tar.gz文件并随后将其上传到 S3。一种方法是

问题是outputs不适合内存(但它们适合磁盘)。有没有办法在映射阶段将输出保存到主文件系统?或者也许使用循环for output in outputs作为生成器,这样我就不必将所有内容加载到内存中?

0 投票
1 回答
498 浏览

python - 从与 Pyspark 中的给定查询相同的行返回数据

在 Pyspark(一种 Spark/Hadoop 输入语言)中:我想在数据集中查找关键字,例如“SJC”,并从与找到关键字“SJC”的行对应的第二列返回文本。

例如,以下数据集读取:

[年份] [延误] [目的地] [航班#]

|1987| |-5| |SJC| |500|

|1987| |-5| |SJC| |250|

|1987| |07| |旧金山| |700|

|1987| |09| |SJC| |350|

|1987| |-5| |SJC| |650|

我希望能够查询“SJC”并将 [Delay] 值作为列表或字符串返回。

我已经走了这么远,但没有运气:

谢谢您的帮助!

0 投票
3 回答
18814 浏览

python - Spark - 字数测试

我只想计算火花(pyspark)中的单词,但我可以映射字母或整个字符串。

我试过了:(整个字符串)

或(只是字母)

0 投票
2 回答
26204 浏览

python - csv 文件上的 PySpark distinct().count()

我是 spark 新手,我正在尝试根据 csv 文件的某些字段创建 distinct().count() 。

CSV结构(无标题):

加载 .csv 我输入:

lines然后按预期返回 3的不同计数:

但我不知道如何根据 let say idand进行不同的计数country