27

如何从 PySpark 中的 RDD 中删除行?特别是第一行,因为它往往包含我的数据集中的列名。通过仔细阅读 API,我似乎找不到一种简单的方法来做到这一点。当然,我可以通过 Bash / HDFS 做到这一点,但我只想知道这是否可以在 PySpark 中完成。

4

6 回答 6

23

特定于 PySpark:

根据@maasg,您可以这样做:

header = rdd.first()
rdd.filter(lambda line: line != header)

但这在技术上并不正确,因为您可能会排除包含数据的行以及标题。但是,这似乎对我有用:

def remove_header(itr_index, itr):
    return iter(list(itr)[1:]) if itr_index == 0 else itr
rdd.mapPartitionsWithIndex(remove_header)

相似地:

rdd.zipWithIndex().filter(lambda tup: tup[1] > 0).map(lambda tup: tup[0])

我是 Spark 的新手,因此无法明智地评论哪个最快。

于 2015-08-03T23:17:58.790 回答
20

AFAIK 没有“简单”的方法可以做到这一点。

不过,这应该可以解决问题:

val header = data.first
val rows = data.filter(line => line != header)
于 2014-07-14T10:35:43.920 回答
5

假设您使用的是 Python 3,在 PySpark (Python API) 中实现此目的的一种简单方法:

noHeaderRDD = rawRDD.zipWithIndex().filter(lambda row_index: row_index[1] > 0).keys()
于 2015-08-12T08:35:10.830 回答
3

我对各种解决方案进行了一些分析,并具有以下内容

集群配置

集群

  • 集群 1:4 核 16 GB
  • 集群 2:4 核 16 GB
  • 集群 3:4 核 16 GB
  • 集群 4:2 核 8 GB

数据

700 万行,4 列

#Solution 1
# Time Taken : 40 ms
data=sc.TextFile('file1.txt')
firstRow=data.first()
data=data.filter(lambda row:row != firstRow)

#Solution 2
# Time Taken : 3 seconds
data=sc.TextFile('file1.txt')
def dropFirstRow(index,iterator):
     return iter(list(iterator)[1:]) if index==0 else iterator
data=data.mapPartitionsWithIndex(dropFirstRow)

#Solution 3
# Time Taken : 0.3 seconds
data=sc.TextFile('file1.txt')
def dropFirstRow(index,iterator):
     if(index==0):
          for subIndex,item in enumerate(iterator):
               if subIndex > 0:
                    yield item
     else:
          yield iterator

data=data.mapPartitionsWithIndex(dropFirstRow)

我认为解决方案 3 是最具可扩展性的

于 2018-04-27T17:17:59.720 回答
1

就我个人而言,我认为仅使用过滤器来摆脱这些东西是最简单的方法。但是根据您的评论,我有另一种方法。Glom RDD,所以每个分区都是一个数组(我假设每个分区有一个文件,每个文件的顶部都有违规行),然后跳过第一个元素(这是使用 scala api)。

data.glom().map(x => for (elem <- x.drop(1){/*do stuff*/}) //x is an array so just skip the 0th index

请记住,RDD 的一大特点是它们是不可变的,因此自然地删除一行是一件棘手的事情

更新: 更好的解决方案。
rdd.mapPartions(x => for (elem <- x.drop(1){/*do stuff*/} )
与 glom 相同,但没有将所有内容放入数组的开销,因为在这种情况下 x 是一个迭代器

于 2014-07-14T12:53:59.730 回答
1

我用spark2.1测试过。假设您想删除前 14 行而不知道文件的列数。

sc = spark.sparkContext
lines = sc.textFile("s3://location_of_csv")
parts = lines.map(lambda l: l.split(","))
parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0])

withColumn 是一个 df 函数。因此,在上述情况下,以下将不适用于 RDD 样式。

parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)
于 2017-10-14T22:14:09.210 回答