18

我在 HDFS 上有一个大的分布式文件,每次我使用带有 spark-csv 包的 sqlContext 时,它首先加载整个文件,这需要相当长的时间。

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path")

现在我有时只想快速检查一下,我只需要整个文件中的几行/任意 n 行。

df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").take(n)
df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").head(n)

但所有这些都在文件加载完成后运行。我不能在读取文件本身时限制行数吗?我指的是 spark-csv 中与 pandas 等效的 n_rows,例如:

pd_df = pandas.read_csv("file_path", nrows=20)

或者可能是spark实际上并没有加载文件,第一步,但在这种情况下,为什么我的文件加载步骤花费了太多时间呢?

我想

df.count()

只给我n而不是所有行,有可能吗?

4

7 回答 7

19

您可以使用limit(n).

sqlContext.format('com.databricks.spark.csv') \
          .options(header='true', inferschema='true').load("file_path").limit(20)

这将只加载 20 行。

于 2017-05-31T06:26:45.193 回答
12

我的理解是 spark-csv 模块不直接支持仅读取几行,作为一种解决方法,您可以将文件作为文本文件读取,根据需要获取尽可能多的行并将其保存到某个临时位置。保存行后,您可以使用 spark-csv 读取行,包括inferSchema选项(如果您处于探索模式,您可能希望使用该选项)。

val numberOfLines = ...
spark.
  read.
  text("myfile.csv").
  limit(numberOfLines).
  write.
  text(s"myfile-$numberOfLines.csv")
val justFewLines = spark.
  read.
  option("inferSchema", true). // <-- you are in exploration mode, aren't you?
  csv(s"myfile-$numberOfLines.csv")
于 2017-05-31T06:26:59.287 回答
4

limit(n)在所有方面都没有推断模式和使用对我有用。

f_schema = StructType([
StructField("col1",LongType(),True),
StructField("col2",IntegerType(),True),
StructField("col3",DoubleType(),True)
...
])

df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true').schema(f_schema).load(data_path).limit(10)

注意:如果我们使用inferschema='true',它又是同一时间,因此可能是同样的旧东西。

但是,如果我们不了解模式,Jacek Laskowski 的解决方案也很有效。:)

于 2017-05-31T07:23:51.893 回答
3

从 PySpark 2.3 开始,您可以简单地将数据加载为文本、限制并在结果上应用 csv 阅读器:

(spark
  .read
  .options(inferSchema="true", header="true")
  .csv(
      spark.read.text("/path/to/file")
          .limit(20)                   # Apply limit
          .rdd.flatMap(lambda x: x)))  # Convert to RDD[str]

从 Spark 2.2 开始提供 Scala 对应版本:

spark
  .read
  .options(Map("inferSchema" -> "true", "header" -> "true"))
  .csv(spark.read.text("/path/to/file").limit(20).as[String])

在 Spark 3.0.0 或更高版本中,也可以应用限制和使用from_csv功能,但它需要一个模式,因此它可能不符合您的要求。

于 2019-06-22T21:06:30.357 回答
1

Jacek Laskowski 给出的解决方案效果很好。下面展示一个内存中的变体。

我最近遇到了这个问题。我正在使用 databricks 并且有一个巨大的 csv 目录(200 个文件,每个文件 200MB)

我原本有

val df = spark.read.format("csv")
.option("header", true)
.option("sep", ",")
.option("inferSchema", true)
.load("dbfs:/huge/csv/files/in/this/directory/")

display(df)

这花了很多时间(10 多分钟),但后来我把它改成下面,它立即运行(2 秒)

val lines = spark.read.text("dbfs:/huge/csv/files/in/this/directory/").as[String].take(1000)

val df = spark.read
.option("header", true)
.option("sep", ",")
.option("inferSchema", true)
.csv(spark.createDataset(lines))

display(df)

推断文本格式的模式很困难,对于 csv 和 json(但如果它是多行 json)格式,可以通过这种方式完成。

于 2021-02-17T11:53:17.290 回答
0

由于我没有在答案中看到该解决方案,因此纯 SQL 方法对我有用:

df = spark.sql("SELECT * FROM csv.`/path/to/file` LIMIT 10000")

如果没有标题,列将命名为 _c0、_c1 等。不需要架构。

于 2021-07-13T06:30:31.217 回答
0

可能这会对在 java 中工作的人有所帮助。应用限制无助于减少时间。您必须从文件中收集 n 行。

        DataFrameReader frameReader = spark
          .read()
          .format("csv")
          .option("inferSchema", "true");
    //set framereader options, delimiters etc

    List<String> dataset = spark.read().textFile(filePath).limit(MAX_FILE_READ_SIZE).collectAsList();
    return frameReader.csv(spark.createDataset(dataset, Encoders.STRING()));
于 2021-11-09T12:26:14.327 回答