问题标签 [pyspark-sql]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
955 浏览

apache-spark-sql - 如何从 mysql 和 Oracle 中加入 SparkSQL 数据?

在 SparkSQL 中是否可以连接来自 mysql 和 Oracle 数据库的数据?我尝试加入他们,但在 SPARK_CLASSPATH 中设置多个 jar(用于 mysql 和 Oracle 的 jdbc 驱动程序)时遇到了一些麻烦。这是我的代码:

有人可以帮我解决这个问题吗?提前致谢 :)

0 投票
3 回答
34419 浏览

sql - 将数据导入Spark时如何设置分区/节点数

问题:我想使用以下命令将数据从 S3 导入 Spark EMR:

有没有办法可以设置 Spark 用来加载处理数据的节点数?这是我如何处理数据的示例:

上下文:数据不是太大,需要很长时间才能加载到 Spark 中并从中查询。我认为 Spark 将数据划分为太多节点。我希望能够手动设置。我知道在处理 RDD 时,sc.parallelize我可以将分区数作为输入传递。另外,我见过repartition(),但我不确定它是否能解决我的问题。在我的示例中,变量data是 a 。DataFrame

让我更准确地定义分区。定义一:通常称为“分区键”,其中选择并索引列以加快查询速度(这不是我想要的)。定义二:(这是我关心的地方)假设你有一个数据集,Spark 决定将它分布在许多节点上,以便它可以并行地对数据运行操作。如果数据大小太小,这可能会进一步减慢该过程。我如何设置该值

0 投票
1 回答
584 浏览

apache-spark - 对窗口函数求平均会导致 StackOverflowError

我试图通过使用窗口函数来确定 Dataframe 列中日期之间的平均时间跨度。然而,物化 Dataframe 会引发 Java 异常。

考虑以下示例:

虽然df.select(diff.alias('diff')).show()正确呈现为

df.select(avg_diff).show()给出了一个java.lang.StackOverflowError

我认为这应该有效吗?如果是这样,我做错了什么,我能做些什么呢?

我在 Spark 1.6 上使用 Python API

当我做df2 = df.select(diff.alias('diff'))然后做

没有错误。不幸的是,这不是我当前设置的选项。

0 投票
3 回答
56765 浏览

python - 文本列上的 Pyspark DataFrame UDF

我正在尝试对 PySpark DataFrame 中的一些 Unicode 列进行一些 NLP 文本清理。我在 Spark 1.3、1.5 和 1.6 中进行了尝试,但似乎无法让事情为我的生活工作。我也尝试过使用 Python 2.7 和 Python 3.4。

我创建了一个非常简单的 udf,如下所示,它应该只为新列中的每条记录返回一个字符串。其他函数将处理文本,然后将更改的文本返回到新列中。

一些示例数据可以从这里解压缩。

这是我用来导入数据然后应用 udf 的代码。

当我运行 df.show(5) 时,出现以下错误。我知道问题很可能不是源于 show() 但跟踪并没有给我太多帮助。

我正在尝试的实际功能:

0 投票
3 回答
2978 浏览

pyspark - PySpark - 读取递归 Hive 表

我有一个 Hive 表,它在 HDFS 中有多个子目录,例如:

通常我在运行 Hive 脚本之前设置以下参数:

我正在尝试使用 PySpark 做同样的事情,

并最终出现如下错误:

在 Spark 中读取带有子目录的 Hive 表的正确方法是什么?

0 投票
0 回答
3252 浏览

apache-spark - 带有 Apache Spark 的 Django

我有一个针对 MySQL 数据库运行的 Django 应用程序。我想通过将同一个应用程序连接到将数据存储为配置单元表的 Spark 来在应用程序中引入大数据分析。我可以通过直线运行查询并访问数据。我想从 Django 应用程序运行类似的查询并在应用程序中显示结果集。

我的环境是:

MySQL 上的Spark 1.5.2
Thriftserver 2
Hive 元存储

任何帮助将不胜感激。

谢谢

0 投票
2 回答
4049 浏览

pandas - Apache Spark - sqlContext.sql 到 pandas

嗨,我有一个 Spark DataFrame,我使用 SQL 上下文进行了一些转换,例如,在所有数据中只选择两列。

但现在我想将这个 sqlcontext 转换为 pandas 数据框,我正在使用

但是输出在这里停止,我需要重新启动 IDE(spyder)

我做错了什么?谢谢

编辑:更完整:我从 Oracle 数据库 (cx_Oracle) 加载日期并将数据放入 pandas 数据框中

接下来我创建了一个 sparkContext 来操作数据框

我想再次从 sqlcontext 转换为 pandas 数据框

0 投票
1 回答
2018 浏览

apache-spark - Spark 1.5.0 (PySpark) 案例当逻辑和滞后窗口函数

我正在尝试在 HiveContext 中使用 Spark SQL 函数“WHEN / OTHERWISE”以及窗口中的 LAG,为一些连续分钟数据中的升序数字计数字段创建 DIFF 字段,该字段经常重置为零。所以我需要更正“计数”重置为零。

所以我的代码如下:

我在 Pyspark 中的错误是:

尝试使用“sqlContext.sql("Select CASE WHEN...lag(num_count) OVER...") 会更好吗?

0 投票
1 回答
658 浏览

xml - 带有databricks xml lib的SparkSQL:有效xml上的“格式错误的行”/UnboundPrefix

假设我在 ipython notebook 会话中在 Oracle JDK 1.8(内部版本 1.8.0_65-b17)上运行 Spark 1.6.0,该会话从以下行开始:

所以我已经包含了 databricks spark-xml 包(https://github.com/databricks/spark-xml)。接下来我将对 pyspark 运行以下代码:

其中 dummy.xml 包含 DMOZ 转储的这个小片段(http://rdf.dmoz.org/):

这可以针对我能够找到的任何验证器进行验证。结果是:

它指的是这行代码:https ://github.com/databricks/spark-xml/blob/master/src/main/scala/com/databricks/spark/xml/util/InferSchema.scala#L101 。这显然是上面一些 javax.xml.stream 类引发的 XMLStreamException 的情况。

不幸的是,异常的细节被处理程序忽略了,所以我不知道该行到底出了什么问题。但是,从属性中删除名称空间(即r:id变为 just id)会使它消失。我觉得我遇到了一些常见的陷阱,只需要知道是哪一个。

UPD:我用调试语句编译了我自己的databricks lib jar,结果是,它是关于未绑定前缀的:

是什么原因,我该如何解决?

0 投票
1 回答
1950 浏览

apache-spark - py4j.protocol.Py4JJavaError when selecting nested column in dataframe using select statetment

I'm trying to perform a simple task in spark dataframe (python) which is create new dataframe by selecting specific column and nested columns from another dataframe for example :

and I get the following error:

I'm using spark-submit with master in local mode to run the this code. it important to mention the when I'm connecting to pyspark shell and run the code (line by line) it works , but when submitting it (even in local mode) it fails. another thing is important to mention is that when selecting a non nested field it works as well. I'm using spark 1.5.2 on EMR (version 4.2.0)