问题标签 [pyspark-sql]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
7762 浏览

pyspark - 从 spark api 中的字符串列(日期时间戳)中提取星期几

我是 Spark API 的新手。我正在尝试从 col_date 列中提取工作日编号(具有日期时间戳,例如'13AUG15:09:40:15'),该列是字符串,并添加另一列作为工作日(整数)。我无法成功。

0 投票
1 回答
15566 浏览

python - 将行列表保存到 pyspark 中的 Hive 表

我有一个 pyspark 应用程序。我将一个 hive 表复制到我的 hdfs 目录中,并在 pythonsqlContext.sql中对该表进行了查询。现在这个变量是我调用的数据框rows。我需要随机洗牌rows,所以我不得不将它们转换为行列表rows_list = rows.collect()。然后我shuffle(rows_list)将列表重新排列到位。我取所需的随机行数x

for r in range(x): allrows2add.append(rows_list[r]) 现在我想将 allrows2add 保存为 hive 表或附加现有的 hive 表(以更容易做的为准)。问题是我不能这样做:

all_df = sc.parallelize(allrows2add).toDF()不能这样做,无法推断架构 ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling

没有放入整个架构。的架构rows有 117 列,所以我不想输入它们。有没有办法提取架构rows来帮助我制作 allrows2add 数据框或以某种方式保存为配置单元表?我可以做 rows.printSchema(),但不确定如何将其转换为模式格式作为变量传递toDF(),而无需解析所有文本

谢谢

添加for循环信息

0 投票
2 回答
73 浏览

sql - Spark SQL:列值只能是 A、T、G、C 或 N 的组合

我正在尝试查询 spark 表以查找“ref”列中包含不是 A、T、G、C 或 N 的字母的所有行。

一个有效的结果应该只包含那些字母,并且可以包含这些字母的任意长度或组合。

例如:

有效 = AA、ATTTGGGGCCCC、C、G、TTG、N 等。

无效 = P, ., NULL

以下查询仅返回包含单核苷酸的列:

以下查询在 impala sql 中有效,但在 spark 中无效,而且非常难看:

0 投票
1 回答
11842 浏览

python - PySpark——将行列表转换为数据框

实际上要解决的问题是获取 PySpark 数据帧的第一行/最后 N 行,并将结果作为数据帧。具体来说,我希望能够做这样的事情:

但是,因为head()返回行列表,我收到此错误:

因此,我正在寻找将 PySpark 数据帧的前 N ​​行作为数据帧返回的方法,或者将这些行列表转换为数据帧的方法。有任何想法吗?

0 投票
3 回答
21071 浏览

python-2.7 - 如何在 python 中使用 Spark Data frame 和 GroupBy 派生 Percentile

我有一个 Spark 数据框,其中包含Date,GroupPrice列。

我正在尝试在 Python 中导出该数据框percentile(0.6)的列。Price此外,我需要将输出添加为新列。

我尝试了下面的代码:

但它抛出以下错误:

0 投票
1 回答
877 浏览

apache-spark - 分组求和后的RDD排序

我正在尝试对一些 yelp 数据进行一些分析。数据结构如下:

我想计算每个州的记录,包括 10 条或更多评论的整体,这些评论目前处于打开状态,并找到计数第三高的州。首先我做了

这给出了这个

现在将其存储到 summedDF 之后,

summedDF.sort(summedDF.state.desc()).collect()

按状态排序就好了,但是(不出所料)

summedDF.sort(summedDF.SUM(review_count#16).desc()).collect()

不起作用。实际上,它甚至没有运行。我有正确数量的括号,但不是执行,而是与之前一起进入下一行...,等待新的输入。

我该如何进行这种排序,以及不执行的情况如何?#16 怎么了?

0 投票
2 回答
3743 浏览

apache-spark - pyspark:将 DataFrame 的行组合成 DenseVector

我有DataFrame两列:

编辑 2017/01/13:我从基于实体-属性-值模型的 SQL 表中派生出这个数据框。因此,每一行都可以使用额外的第三个实体列“id”。

我想将其转换为包DataFrame分类器所需的“功能” ml。对于单个列,这可以使用以下方法实现VectorAssembler

我想要的是这样的:

根据column 的值将 column 的值组合value成 a的最有效方法是什么?DenseVectorname

例如,我正在考虑适用于GroupedData的自定义聚合函数groupby

类似于 PostgreSQL array_agg函数:

0 投票
1 回答
5853 浏览

python - PySpark DataFrame 无法删除重复项

您好,我创建了一个 spark 数据框,我正在尝试删除重复项:

我收到以下错误:

我正在使用 osx 10.11.4,火花 1.6.1

我像这样运行了一个jupyter笔记本

还有其他一些我可能错过或出错的配置吗?

0 投票
0 回答
218 浏览

apache-spark - Pyspark (1.6.1) SQL.dataframe 列到没有 Hive 的向量聚合

假设我的 SQL 数据框df是这样的:

我希望输出为:

在没有 Hive 的情况下使用 SQL 数据框执行此操作的最简单方法是什么?

1) 显然,有了 Hive 支持,人们可以简单地使用collect_set()collect_list()聚合函数。但是这些函数在普通的 Spark SqlContext 中不起作用。

2) 另一种方法是制作 UDAF,但考虑到所需的代码量,对于如此简单的聚合来说,这似乎有点过头了。

3)我可以使用 df.rdd 然后使用该groupBy()功能。这是我最后的手段。我实际上将 RDD 转换为 DF 以使数据操作更容易,但显然不是......

还有其他我错过的简单方法吗?

0 投票
3 回答
622 浏览

apache-spark - APACHE SPARK:是否可以生成具有相似大小的零件文件?

我是 SPARK 的新手。我正在运行在 hdfs 位置生成 6000 个零件文件的 SPARK-SQL 代码。在这 6000 个部分文件中,大约 1500 个文件的大小约为 100 MB,而其他文件的大小要小得多,小于 100 MB(有些约为 30MB,大多数以 kb 为单位)。我觉得这是不平衡的大小分布使我的代码变慢,并且由于这些执行程序的高负载,一些执行程序正在丢失。有什么办法可以平衡 executor 的负载,避免 executor 丢失?