问题标签 [pyspark-sql]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1499 浏览

python - 如何按多值列过滤 JSON 数据

在 Spark SQL 的帮助下,我试图过滤掉属于特定组类别的所有业务项目。

数据从 JSON 文件加载:

该文件的架构如下:

我正在尝试提取与餐厅业务相关的所有业务:

但它不起作用,因为据我了解,列的预期类型应该是字符串,但在我的情况下,这是数组。关于它告诉我一个例外:

你能建议任何其他方式来获得我想要的东西吗?

0 投票
1 回答
4329 浏览

python - 从 Spark DataFrame 中选择空数组值

给定具有以下行的 DataFrame:

我想为每个col2,col3col4(即第 3 行)删除带有空数组的行。

例如,我可能希望这段代码能够工作:

我有两个问题

  1. 如何将 where 子句与and但更重要的是...
  2. 如何判断数组是否为空。

那么,是否有一个内置函数来查询空数组?有没有一种优雅的方式将一个空数组强制为一个nanull值?

我试图避免使用 python 来解决它,无论是使用 UDF 还是.map().

0 投票
2 回答
83100 浏览

python - PySpark:使用过滤器功能后取一列的平均值

我正在使用以下代码来获取薪水大于某个阈值的人的平均年龄。

列年龄是数字(浮点数),但我仍然收到此错误。

groupBy您是否知道在不使用函数和 SQL 查询的情况下获取 avg 等的其他方法。

0 投票
2 回答
3667 浏览

apache-spark - 在通过 JDBC 从 pyspark 数据帧插入外部数据库表时进行重复键更新

好吧,我正在使用 PySpark,并且我有一个 Spark 数据框,我使用它将数据插入到 mysql 表中。

url = "jdbc:mysql://hostname/myDB?user=xyz&password=pwd"

df.write.jdbc(url=url, table="myTable", mode="append")

我想通过列值和特定数字的总和来更新列值(不在主键中)。

我尝试过使用不同的模式(追加、覆盖)DataFrameWriter.jdbc() 函数。

我的问题是我们如何像ON DUPLICATE KEY UPDATE在 mysql 中那样更新列值,同时将 pyspark 数据帧数据插入表中。

0 投票
1 回答
2495 浏览

apache-spark - 我可以在常规 Spark 地图操作中使用 Spark DataFrame 吗?

我尝试在 Spark DataFrame 之前从常规 Spark 映射操作中使用定义,如下所示:

我有一个巨大的错误信息:

我进行了一些调查以了解究竟是哪一行导致了这个错误,我发现得到这个错误的最少代码是:

因此,我得出结论,我使用了某种错误的 DataFrame,但从 Spark 文档中究竟是什么并不清楚。我怀疑reviewsDF 应该分布在集群中的所有机器上,但我想因为我是使用SqlContext 创建的,所以它应该已经在Spark 上下文中。

先感谢您。

0 投票
1 回答
1441 浏览

apache-spark-sql - 通过 JDBC 驱动将 Spark 连接到 HAWQ

尝试从 Spark 连接到 HAWQ,使用 greenplum 的 odbc/jdbc 驱动程序(从适当的 Pivotal 页面下载)。

使用 Spark 1.4,这是用 python 编写的示例代码:(所有大写字母都有适当的变量分配)...

...

Spark submit 命令将 odbc 驱动程序附加到类路径。我已经使用基本的 sqlContext 实例化完成了一个“hello world”,并且在集群上一切运行良好。但是当我尝试实际连接到 HAWQ postgresql db 时,它不会运行。

错误:

有什么想法或建议吗?我已经尝试了至少 20 种“df = sqlContext.read.load ...”定义的组合,但无济于事。

0 投票
1 回答
724 浏览

anaconda - pyspark - 错误仅出现在 IPython 中,但不在 vanila python 中

如果我通过在控制台中键入来启动 pyspark /usr/bin/pyspark,则以下示例代码运行时不会出现任何错误。但是,如果我将它与 IPython 一起使用,则可以通过调用

或通过

然后引发异常。

这是代码:

这是错误消息:

是什么导致了这个错误,我该如何解决?

更新

事实证明,如果我为 Linux 使用 Anaconda python 发行版,则存在问题:

但是,如果我禁用 anaconda 发行版并使用系统附带的 Python,一切正常

所以,问题出在Anaconda,但仍然不知道问题是什么

0 投票
2 回答
8113 浏览

apache-spark - Performing lookup/translation in a Spark RDD or data frame using another RDD/df

I'm having a hard time implementing something that seems like it should be very easy:

My goal is to make translations in an RDD/dataframe using a second RDD/dataframe as a lookup table or translation dictionary. I want to make these translations in multiple columns.

The easiest way to explain the problem is by example. Let's say I have as my input the following two RDDs:

and

My desired output RDD is:

How should I go about it producing it?

This is an easy problem in SQL, but I don't know of obvious solutions with RDDs in Spark. The join, cogroup, etc methods seem to not be well-suited to multi-column RDDs and don't allow specifying which column to join on.

Any ideas? Is SQLContext the answer?

0 投票
2 回答
3755 浏览

apache-spark - Spark RDD groupByKey + join vs join 性能

我在与其他用户共享的集群上使用 Spark。因此,仅根据运行时间来判断我的哪个代码运行效率更高是不可靠的。因为当我运行更高效的代码时,其他人可能会运行大量数据并使我的代码执行更长时间。

所以我可以在这里问两个问题:

  1. 我正在使用join函数加入 2RDDs并且在使用groupByKey()之前尝试使用join,如下所示:

    似乎花了更长的时间,但是我记得当我使用 Hadoop Hive 时,group by 使我的查询运行得更快。由于 Spark 使用延迟评估,我想知道groupByKeybefore是否join让事情变得更快

  2. 注意到Spark有一个SQL模块,到现在还真没时间去尝试,请问SQL模块和RDD SQL之类的函数有什么区别?

0 投票
1 回答
27540 浏览

python - 将 DataFrame 中的新派生列从布尔值转换为整数

假设我有一个x具有此架构的 DataFrame:

然后我有DataFrame:

我想要一个整数派生列。我能够创建一个布尔列:

我的新架构是:

但是,我希望列y包含 0 代表 False 和 1 代表 True。

cast函数只能对列进行操作,而不能对 a 进行操作,DataFrame并且该withColumn函数只能对 a 进行操作DataFrame。如何添加新列并同时将其转换为整数?