这个问题是 [this one] 的衍生问题(将行列表保存到 pyspark 中的 Hive 表)。
编辑请在这篇文章的底部查看我的更新编辑
我已经使用 Scala 和现在的 Pyspark 来完成相同的任务,但是我遇到了将数据帧保存到镶木地板或 csv 或将数据帧转换为列表或数组类型数据结构的速度非常慢的问题。以下是相关的 python/pyspark 代码和信息:
#Table is a List of Rows from small Hive table I loaded using
#query = "SELECT * FROM Table"
#Table = sqlContext.sql(query).collect()
for i in range(len(Table)):
rows = sqlContext.sql(qry)
val1 = Table[i][0]
val2 = Table[i][1]
count = Table[i][2]
x = 100 - count
#hivetemp is a table that I copied from Hive to my hfs using:
#create external table IF NOT EXISTS hive temp LIKE hivetableIwant2copy LOCATION "/user/name/hiveBackup";
#INSERT OVERWRITE TABLE hivetemp SELECT * FROM hivetableIwant2copy;
query = "SELECT * FROM hivetemp WHERE col1<>\""+val1+"\" AND col2 ==\""+val2+"\" ORDER BY RAND() LIMIT "+str(x)
rows = sqlContext.sql(query)
rows = rows.withColumn("col4", lit(10))
rows = rows.withColumn("col5", lit(some_string))
#writing to parquet is heck slow AND I can't work with pandas due to the library not installed on the server
rows.saveAsParquetFile("rows"+str(i)+".parquet")
#tried this before and heck slow also
#rows_list = rows.collect()
#shuffle(rows_list)
我曾尝试在 Scala 中执行上述操作,并且遇到了类似的问题。我可以轻松加载配置单元表或查询配置单元表,但需要进行随机洗牌或存储大型数据帧会遇到内存问题。能够添加 2 个额外的列也存在一些挑战。
我要添加行的 Hive 表 (hiveTemp) 有 5,570,000 ~550 万行和 120 列。
我在 for 循环中迭代的 Hive 表有 5000 行和 3 列。有 25 个唯一的val1
(hiveTemp 中的一列),val1
以及val2
3000 的组合。Val2 可以是 5 列之一及其特定的单元格值。这意味着如果我调整了代码,那么我可以将行的查找次数从 5000 减少到 26,但是我必须检索、存储和随机洗牌的行数会非常大,因此会出现内存问题(除非有人对此有建议)
至于我需要添加到表中的总行数可能约为 100,000。
最终目标是让 5.5mill 行的原始表附加 100k+ 行写成 hive 或 parquet 表。如果它更容易,我可以在它自己的表中编写 100k 行,以后可以合并到 5.5 mill 表中
Scala 或 Python 很好,尽管 Scala 更受欢迎。
对此的任何建议以及最好的选择都会很棒。
非常感谢!
编辑我对这个问题的一些额外想法:我使用哈希分区器将配置单元表分区为 26 个分区。这是基于有 26 个不同值的列值。我想在 for 循环中执行的操作可以泛化,以便它只需要在这些分区中的每一个上发生。话虽如此,我怎么能,或者我可以在网上查看什么指南,以便能够编写 scala 代码来执行此操作,并让一个单独的执行器在每个分区上执行这些循环中的每一个?我认为这会使事情变得更快。
我知道如何使用多线程来做这样的事情,但不确定如何在 scala/spark 范式中使用。