问题标签 [pyspark-sql]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - 如何按多值列过滤 JSON 数据
在 Spark SQL 的帮助下,我试图过滤掉属于特定组类别的所有业务项目。
数据从 JSON 文件加载:
该文件的架构如下:
我正在尝试提取与餐厅业务相关的所有业务:
但它不起作用,因为据我了解,列的预期类型应该是字符串,但在我的情况下,这是数组。关于它告诉我一个例外:
你能建议任何其他方式来获得我想要的东西吗?
python - 从 Spark DataFrame 中选择空数组值
给定具有以下行的 DataFrame:
我想为每个col2
,col3
和col4
(即第 3 行)删除带有空数组的行。
例如,我可能希望这段代码能够工作:
我有两个问题
- 如何将 where 子句与
and
但更重要的是... - 如何判断数组是否为空。
那么,是否有一个内置函数来查询空数组?有没有一种优雅的方式将一个空数组强制为一个na
或null
值?
我试图避免使用 python 来解决它,无论是使用 UDF 还是.map()
.
python - PySpark:使用过滤器功能后取一列的平均值
我正在使用以下代码来获取薪水大于某个阈值的人的平均年龄。
列年龄是数字(浮点数),但我仍然收到此错误。
groupBy
您是否知道在不使用函数和 SQL 查询的情况下获取 avg 等的其他方法。
apache-spark - 在通过 JDBC 从 pyspark 数据帧插入外部数据库表时进行重复键更新
好吧,我正在使用 PySpark,并且我有一个 Spark 数据框,我使用它将数据插入到 mysql 表中。
url = "jdbc:mysql://hostname/myDB?user=xyz&password=pwd"
df.write.jdbc(url=url, table="myTable", mode="append")
我想通过列值和特定数字的总和来更新列值(不在主键中)。
我尝试过使用不同的模式(追加、覆盖)DataFrameWriter.jdbc() 函数。
我的问题是我们如何像ON DUPLICATE KEY UPDATE
在 mysql 中那样更新列值,同时将 pyspark 数据帧数据插入表中。
apache-spark - 我可以在常规 Spark 地图操作中使用 Spark DataFrame 吗?
我尝试在 Spark DataFrame 之前从常规 Spark 映射操作中使用定义,如下所示:
我有一个巨大的错误信息:
我进行了一些调查以了解究竟是哪一行导致了这个错误,我发现得到这个错误的最少代码是:
因此,我得出结论,我使用了某种错误的 DataFrame,但从 Spark 文档中究竟是什么并不清楚。我怀疑reviewsDF 应该分布在集群中的所有机器上,但我想因为我是使用SqlContext 创建的,所以它应该已经在Spark 上下文中。
先感谢您。
apache-spark-sql - 通过 JDBC 驱动将 Spark 连接到 HAWQ
尝试从 Spark 连接到 HAWQ,使用 greenplum 的 odbc/jdbc 驱动程序(从适当的 Pivotal 页面下载)。
使用 Spark 1.4,这是用 python 编写的示例代码:(所有大写字母都有适当的变量分配)...
...
Spark submit 命令将 odbc 驱动程序附加到类路径。我已经使用基本的 sqlContext 实例化完成了一个“hello world”,并且在集群上一切运行良好。但是当我尝试实际连接到 HAWQ postgresql db 时,它不会运行。
错误:
有什么想法或建议吗?我已经尝试了至少 20 种“df = sqlContext.read.load ...”定义的组合,但无济于事。
anaconda - pyspark - 错误仅出现在 IPython 中,但不在 vanila python 中
如果我通过在控制台中键入来启动 pyspark /usr/bin/pyspark
,则以下示例代码运行时不会出现任何错误。但是,如果我将它与 IPython 一起使用,则可以通过调用
或通过
然后引发异常。
这是代码:
这是错误消息:
是什么导致了这个错误,我该如何解决?
更新
事实证明,如果我为 Linux 使用 Anaconda python 发行版,则存在问题:
但是,如果我禁用 anaconda 发行版并使用系统附带的 Python,一切正常
所以,问题出在Anaconda,但仍然不知道问题是什么
apache-spark - Performing lookup/translation in a Spark RDD or data frame using another RDD/df
I'm having a hard time implementing something that seems like it should be very easy:
My goal is to make translations in an RDD/dataframe using a second RDD/dataframe as a lookup table or translation dictionary. I want to make these translations in multiple columns.
The easiest way to explain the problem is by example. Let's say I have as my input the following two RDDs:
and
My desired output RDD is:
How should I go about it producing it?
This is an easy problem in SQL, but I don't know of obvious solutions with RDDs in Spark. The join, cogroup, etc methods seem to not be well-suited to multi-column RDDs and don't allow specifying which column to join on.
Any ideas? Is SQLContext the answer?
apache-spark - Spark RDD groupByKey + join vs join 性能
我在与其他用户共享的集群上使用 Spark。因此,仅根据运行时间来判断我的哪个代码运行效率更高是不可靠的。因为当我运行更高效的代码时,其他人可能会运行大量数据并使我的代码执行更长时间。
所以我可以在这里问两个问题:
我正在使用
join
函数加入 2RDDs
并且在使用groupByKey()
之前尝试使用join
,如下所示:似乎花了更长的时间,但是我记得当我使用 Hadoop Hive 时,group by 使我的查询运行得更快。由于 Spark 使用延迟评估,我想知道
groupByKey
before是否join
让事情变得更快注意到Spark有一个SQL模块,到现在还真没时间去尝试,请问SQL模块和RDD SQL之类的函数有什么区别?
python - 将 DataFrame 中的新派生列从布尔值转换为整数
假设我有一个x
具有此架构的 DataFrame:
然后我有DataFrame:
我想要一个整数派生列。我能够创建一个布尔列:
我的新架构是:
但是,我希望列y
包含 0 代表 False 和 1 代表 True。
该cast
函数只能对列进行操作,而不能对 a 进行操作,DataFrame
并且该withColumn
函数只能对 a 进行操作DataFrame
。如何添加新列并同时将其转换为整数?