26

我有一个 csv 文件;我在pyspark中转换为DataFrame(df);经过一些改造;我想在df中添加一列;这应该是简单的行 id(从 0 或 1 到 N)。

我在rdd中转换了df并使用“zipwithindex”。我将生成的 rdd 转换回 df。这种方法有效,但它生成了 250k 个任务并且需要大量执行时间。我想知道是否有其他方法可以减少运行时间。

以下是我的代码片段;我正在处理的 csv 文件很大;包含数十亿行。

debug_csv_rdd = (sc.textFile("debug.csv")
  .filter(lambda x: x.find('header') == -1)
  .map(lambda x : x.replace("NULL","0")).map(lambda p: p.split(','))
  .map(lambda x:Row(c1=int(x[0]),c2=int(x[1]),c3=int(x[2]),c4=int(x[3]))))

debug_csv_df = sqlContext.createDataFrame(debug_csv_rdd)
debug_csv_df.registerTempTable("debug_csv_table")
sqlContext.cacheTable("debug_csv_table")

r0 = sqlContext.sql("SELECT c2 FROM debug_csv_table WHERE c1 = 'str'")
r0.registerTempTable("r0_table")

r0_1 = (r0.flatMap(lambda x:x)
    .zipWithIndex()
    .map(lambda x: Row(c1=x[0],id=int(x[1]))))

r0_df=sqlContext.createDataFrame(r0_2)
r0_df.show(10) 
4

1 回答 1

79

您还可以使用 sql 包中的函数。它将生成一个唯一的 id,但它不会是连续的,因为它取决于分区的数量。我相信它在 Spark 1.5 + 中可用

from pyspark.sql.functions import monotonicallyIncreasingId

# This will return a new DF with all the columns + id
res = df.withColumn("id", monotonicallyIncreasingId())

编辑:2017 年 19 月 1 日

正如@Sean评论的那样

monotonically_increasing_id()从 Spark 1.6 及更高版本开始使用

于 2016-03-11T19:31:17.930 回答