问题标签 [pyspark-sql]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
444 浏览

pyspark - Python Spark 将事务分组到嵌套模式中

我想将存储在pyspark.sql.dataframe.DataFrameddf”中的交易按列“ key”分组,该列表示交易的来源(在本例中为客户 ID)。

分组是一个相当昂贵的过程,所以我想以嵌套模式将组写入磁盘:

这将允许我快速加载密钥上的所有事务,并开发复杂的自定义聚合器,而无需重新运行分组。

如何创建嵌套模式并将其写入磁盘?

0 投票
2 回答
2356 浏览

group-by - 具有用户定义函数的 Pyspark 数据帧聚合

如何将'groupby(key).agg('与用户定义的函数一起使用?具体来说,我需要每个键的所有唯一值列表[不计数]。

0 投票
0 回答
492 浏览

apache-spark - 将csv文件导入火花数据框

我正在尝试使用 pyspark 导入 csv 文件。我试过这个这个

使用第一种方法,我可以读取 csv 文件。但是变量的数量非常大。所以手动提及变量名是很困难的。

使用第二种方法(spark-csv),我可以使用命令提示符读取 csv 文件。但是当我尝试在 Jupyter 笔记本中使用相同的方法时,我收到了错误:

我也厌倦了这个选项。我修复了“conf”文件。但不知道如何在 windows 环境下设置“PACKAGES”和“PYSPARK_SUBMIT_ARGS”。

任何人都可以帮助我如何在 spark 数据框中读取 csv 文件吗?

谢谢!

0 投票
0 回答
285 浏览

apache-spark-sql - 如何配置 ParquetFileReader 并行度?

我正在尝试将分区的 spark sql 数据帧(大约 300 个分区)保存到 hdfs,它真的很慢:

join_table.write.mode('overwrite').partitionBy(target).save(path_out)

我认为这可能是由于:

INFO ParquetFileReader:以并行方式启动操作:5

关于如何配置这种并行性的任何想法?此链接可能会有所帮助:https : //forums.databricks.com/questions/1097/stall-on-loading-many-parquet-files-on-s3.html 但我不知道如何在 Pyspark 中使用 newAPIHadoopFile。

0 投票
1 回答
13186 浏览

datetime - 从 Pyspark 中包含时间戳的字符串列中提取日期

我有一个数据框,其日期格式如下:

我打算以格式从中提取日期YYYY-MM-DD;所以结果应该是上述日期 - 2016-05-06。

但是当我提取时使用以下内容:

我得到以下日期

有人可以就此提出建议吗?我不打算将我的 df 转换为 rdd 以使用 python 中的 datetime 函数,并希望在它自己的数据框中使用它。

0 投票
2 回答
3207 浏览

loops - 在脚本中迭代/循环 Spark parquet 文件会导致内存错误/堆积(使用 Spark SQL 查询)

我一直在试图弄清楚当我遍历镶木地板文件和几个后处理功能时,如何防止 Spark 由于内存问题而崩溃。对大量文本感到抱歉,但这并不完全是一个特定的错误(我正在使用 PySpark。)如果这破坏了正确的 Stack Overflow 形式,我们深表歉意!

基本伪代码为:

此代码使用 spark SQL 查询,因此我未能成功创建包含所有 SQL 查询/函数的包装函数并将其传递给 foreach(它不能将 sparkContext 或 sqlQuery 作为输入)而不是标准环形。

从技术上讲,这是一个有分区的大型 parquet 文件,但一次读取并查询它的规模太大了;我需要在每个分区上运行这些功能。所以我只是在 PySpark 中运行一个常规的 python 循环,在每个循环中,我处理一个 parquet 分区(子目录)并编写相关的输出报告。

由于整个镶木地板文件的大小,不确定是否将所有代码包装在一个大 mapPartition() 周围是否可行?

但是在几个循环之后,脚本由于内存错误而崩溃 - 特别是 Java 堆错误。(我已经确认循环崩溃的文件没有什么特别之处;它发生在第二个或第三个循环中读取的任何随机文件中。)

我意识到 Spark 并不意味着要在循环中运行,但是这些 SQL 查询对于标准的 Spark SQL 打包函数来说有点太复杂了,我们为每个文件编写了多个关于不同聚合统计信息的汇总报告。

有没有办法在每个循环索引结束时基本上清除内存?使用 sqlContext.dropTempTable() 删除任何已注册的临时表并使用 sqlContext.clearCache() 清除缓存都没有帮助。如果我尝试停止 sparkContext 并在每个循环中重新启动它,我也会收到错误,因为某些进程尚未“结束”(看起来您曾经能够“优雅地”停止上下文,但我在当前的 PySpark 文档中找不到这个。)

我还应该注意,在完成处理后,我没有在循环中的数据帧上调用 unpersist(),但我也没有在它们上调用 persist();我只是在每个循环中重写数据帧(这可能是问题的一部分)。

我正在与我们的工程团队一起调整内存设置,但我们知道我们已经分配了足够多的内存来完成这个脚本的一个循环(并且一个循环确实运行没有任何错误)。

任何建议都会有所帮助 - 包括可能比 Spark 更适合此用例的工具。我使用的是 Spark 1.6.1 版。

0 投票
1 回答
806 浏览

apache-spark - 通过pyspark读取csv文件,列空白

我有一个 csv 文件,看起来像这样

当我将这些数据读入 Spark 时,它会将 C 列视为“字符串”,因为前几行中有“空白”。

谁能告诉我如何在 SQL 数据框中加载这个文件,使 c 列保持整数(或浮点数)?

我正在使用“ sc.textFile”将数据读入 spark,然后将其转换为 SQL 数据帧。

我读了这个这个链接。但他们对我帮助不大。

我的代码部分。在代码的最后一行我得到了错误。

谢谢!

0 投票
1 回答
9237 浏览

pyspark-sql - 火花多条件加入

我正在使用 spark sql 加入三个表,但是我得到了多个列条件的错误。

错误:

我没有指定“and”,而是尝试放置“&”和“&&”,但这些都不起作用。任何帮助,将不胜感激。

0 投票
1 回答
965 浏览

python - 使用 toPandas() 方法将 spark 数据帧转换为 Pandas 数据帧时会发生什么

我有一个 spark 数据框,我可以使用

pyspark 中可用的方法。

我对此有以下疑问?

  1. 这种转换是否破坏了使用 spark 本身(分布式计算)的目的?
  2. 数据集将是巨大的,那么速度和内存问题呢?
  3. 如果有人也可以解释,这行代码到底发生了什么,那真的很有帮助。

谢谢

0 投票
1 回答
1285 浏览

python - 将 RDD 转换为列联表:Pyspark

目前,我正在尝试将 RDD 转换为列联表以使用该pyspark.ml.clustering.KMeans模块,该模块将数据帧作为输入。

当我这样做myrdd.take(K)时(其中 K 是某个数字),结构如下所示:

[[u'user1',('itm1',3),...,('itm2',1)], [u'user2',('itm1',7),..., ('itm2' ,4)],...,[u'usern',('itm2',2),...,('itm3',10)]]

其中每个列表包含一个实体作为第一个元素,以及该实体以元组形式喜欢的所有项目及其计数的集合。

现在,我的目标是将上述内容转换为DataFrame类似于以下列联表的火花。

我使用df.stat.crosstab了以下链接中引用的方法:

Apache Spark 中使用 DataFrames 的统计和数学函数 - 4. 交叉制表(列联表)

它几乎接近我想要的。

但是,如果在上面的元组中还有一个计数字段,即('itm1',3)如何将这个值3合并(或添加)到列联表(或实体项矩阵)的最终结果中。

当然,我通过将上述列表RDD转换为矩阵并将它们写入 csv 文件,然后作为DataFrame.

有没有更简单的方法来使用 DataFrame ?