问题标签 [pyspark]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
410 浏览

apache-spark - PySpark 没有可用于 Word2VecModel 的 getVectors 方法

我正在尝试访问getVectors()pyspark 1.2.0 Spark 版本的方法,但 pyspark 状态 -

所以我只有使用 Scala/Java 访问它的方法,或者我可以做些什么。

0 投票
1 回答
2673 浏览

apache-spark - 为 pyspark 启动的 jvm 指定选项

pyspark 脚本在启动它连接到指定的 jvm 时使用的 jvm 选项如何/在哪里?

我对指定 jvm 调试选项特别感兴趣,例如

谢谢。

0 投票
1 回答
9238 浏览

python - 用于 Spark 数据帧的 udf() 的 Python 包中的函数

对于通过 pyspark 的 Spark 数据帧,我们可以使用pyspark.sql.functions.udf创建一个user defined function (UDF).

我想知道我是否可以使用 Python 包中的任何函数udf(),例如np.random.normalnumpy 中的函数?

0 投票
1 回答
915 浏览

java - Java 安装问题(使使用 Spark 变得困难)

我正在尝试让 Spark 在本地运行。当我启动它时,我收到以下错误,表明我的 java 安装(或它的路径)有些奇怪。这是在 Mac OS Yosemite 上。

有什么建议可以解决这个问题吗?如果您需要更多信息,请告诉我(我不知道从哪里开始)

编辑:显然某处设置错误,我只是不确定在哪里。在我的 bash-profile 中有一行

另外,当我运行 java -version 时,我得到:

0 投票
4 回答
49235 浏览

apache-spark - 从列表的 RDD 创建 Spark DataFrame

我有一个 rdd(我们可以称之为 myrdd),其中 rdd 中的每条记录都采用以下形式:

我想将其转换为 pyspark 中的 DataFrame - 最简单的方法是什么?

0 投票
1 回答
1167 浏览

python-2.7 - Apache Spark:处理来自 S3 的大量数据时出现超时异常

当我从 S3 读取数据并在 Apache Spark 中处理它时,我遇到了超时异常。错误如下:

有人可以帮我解决这个问题吗?

0 投票
0 回答
921 浏览

python - 无法在 python shell 中导入 PySpark

我尝试在 $SPARK_HOME/bin/pyspark 文件中添加以下内容:

这是正确的方法吗?因为我无法导入它。我错过了什么吗?

0 投票
2 回答
2734 浏览

amazon-s3 - Spark : Data processing using Spark for large number of files says SocketException : Read timed out

I am running Spark in standalone mode on 2 machines which have these configs

  1. 500gb memory, 4 cores, 7.5 RAM
  2. 250gb memory, 8 cores, 15 RAM

I have created a master and a slave on 8core machine, giving 7 cores to worker. I have created another slave on 4core machine with 3 worker cores. The UI shows 13.7 and 6.5 G usable RAM for 8core and 4core respectively.

Now on this I have to process an aggregate of user ratings over a period of 15 days. I am trying to do this using Pyspark This data is stored in hourwise files in day-wise directories in an s3 bucket, every file must be around 100MB eg

s3://some_bucket/2015-04/2015-04-09/data_files_hour1

I am reading the files like this

where files is a string of this form 's3://some_bucket/2015-04/2015-04-09/*,s3://some_bucket/2015-04/2015-04-09/*'

Then I do a series of maps and filters and persist the result

Then I need to do a reduceByKey to get an aggregate score over the span of days.

Then I need to make a redis call for the actual terms for the items the user has rated, so I call mapPartitions like this

get_tags function creates a redis connection each time of invocation and calls redis and yield a (user, item, rate) tuple (The redis hash is stored in the 4core)

I have tweaked the settings for SparkConf to be at

I run the job with driver-memory of 2g in client mode, since cluster mode doesn't seem to be supported here. The above process takes a long time for 2 days' of data (around 2.5hours) and completely gives up on 14 days'.

What needs to improve here?

  1. Is this infrastructure insufficient in terms of RAM and cores (This is offline and can take hours, but it has got to finish in 5 hours or so)
  2. Should I increase/decrease the number of partitions?
  3. Redis could be slowing the system, but the number of keys is just too huge to make a one time call.
  4. I am not sure where the task is failing, in reading the files or in reducing.
  5. Should I not use Python given better Spark APIs in Scala, will that help with efficiency as well?

This is the exception trace

I could really use some help, thanks in advance

Here is what my main code looks like

def main(sc): f=get_files() a=sc.textFile(f, 15) .coalesce(7*sc.defaultParallelism) .map(lambda line: line.split(",")) .filter(len(line)>0) .map(lambda line: (line[18], line[2], line[13], line[15])).map(scoring) .map(lambda line: ((line[0], line[1]), line[2])).persist(StorageLevel.MEMORY_ONLY_SER) b=a.reduceByKey(lambda x, y: x+y).map(aggregate) b.persist(StorageLevel.MEMORY_ONLY_SER) c=taggings.mapPartitions(get_tags) c.saveAsTextFile("f") a.unpersist() b.unpersist()

The get_tags function is

The get_files function is as:

The get_path_from_dates(DAYS) is

0 投票
0 回答
808 浏览

python - Spark将功能应用于组

嘿,我有一个 ASV(chr(1) 分隔的配置单元数据文件)格式的表。我想提取某些列,按两列的组合分组,并在每个组中做一些事情。

我希望输出像

到目前为止,我在 pyspark 中所做的事情:

错误看起来像:

但它弹出如下错误消息,我不知道这是我的语法错误还是系统设置错误:

额外信息:

我们在 redhat 盒子上运行 CDH。如您所知,Redhat 使用 Python2.6 作为默认的 Python 版本。为了使用 iPythonnotebook,我在 namenode 上创建了与 Python2.6 兼容的 iPython 旧版本,并使用 virtualenv 启动了 iPythonnotebook...(有关我如何制作此香肠的更多信息,请单击此处

0 投票
5 回答
222900 浏览

apache-spark - 如何杀死正在运行的 Spark 应用程序?

我有一个正在运行的 Spark 应用程序,它占据了我的其他应用程序不会被分配任何资源的所有内核。

我做了一些快速研究,人们建议使用 YARN kill 或 /bin/spark-class 来终止命令。但是,我使用的是 CDH 版本,而 /bin/spark-class 甚至根本不存在,YARN kill 应用程序也不起作用。

在此处输入图像描述

有人可以和我一起吗?