0

以下是我的 csv 文件中的内容:

A1,B1,C1
A2,B2,C2,D1
A3,B3,C3,D2,E1
A4,B4,C4,D3
A5,B5,C5,,E2

因此,第一行有 5 列,但只有 3 个值。

我使用以下命令阅读它:

val csvDF : DataFrame = spark.read
.option("header", "false")
.option("delimiter", ",")
.option("inferSchema", "false")
.csv("file.csv") 

以下是我使用 csvDF.show() 得到的结果

+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
| A1| B1| C1|
| A2| B2| C2|
| A3| B3| C3|
| A4| B4| C4|
| A5| B5| C5|
+---+---+---+

如何读取所有列中的所有数据?

4

3 回答 3

1

基本上,您的 csv 文件格式不正确,因为它在每行中没有相同数量的列,如果您想使用spark.read.csv. 但是,您可以改为阅读它,spark.read.textFile然后解析每一行。

据我了解,您事先不知道列数,因此您希望您的代码处理任意数量的列。为此,您需要确定数据集中的最大列数,因此您需要对数据集进行两次遍历。

对于这个特定的问题,我实际上会使用 RDD 而不是 DataFrames 或 Datasets,如下所示:

val data  = spark.read.textFile("file.csv").rdd

val rdd = data.map(s => (s, s.split(",").length)).cache
val maxColumns = rdd.map(_._2).max()

val x = rdd
  .map(row => {
    val rowData = row._1.split(",")
    val extraColumns = Array.ofDim[String](maxColumns - rowData.length)
    Row((rowData ++ extraColumns).toList:_*)
  })

希望有帮助:)

于 2017-08-10T10:04:35.790 回答
0

您可以将其读取为只有一列的数据集(例如,使用另一个分隔符):

var df = spark.read.format("csv").option("delimiter",";").load("test.csv")
df.show()

+--------------+
|           _c0|
+--------------+
|      A1,B1,C1|
|   A2,B2,C2,D1|
|A3,B3,C3,D2,E1|
|   A4,B4,C4,D3|
|  A5,B5,C5,,E2|
+--------------+

然后您可以使用此答案将您的列手动拆分为五列,这将在元素不存在时添加空值:

var csvDF = df.withColumn("_tmp",split($"_c0",",")).select(
    $"_tmp".getItem(0).as("col1"),
    $"_tmp".getItem(1).as("col2"),
    $"_tmp".getItem(2).as("col3"),
    $"_tmp".getItem(3).as("col4"),
    $"_tmp".getItem(4).as("col5")
)
csvDF.show()

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|  A1|  B1|  C1|null|null|
|  A2|  B2|  C2|  D1|null|
|  A3|  B3|  C3|  D2|  E1|
|  A4|  B4|  C4|  D3|null|
|  A5|  B5|  C5|    |  E2|
+----+----+----+----+----+
于 2017-08-10T08:32:53.980 回答
0

如果列dataTypes和列数已知,那么您可以在读取文件时定义schema并应用. 下面我将所有五列定义为schemacsvdataframestringType

val schema = StructType(Seq(
  StructField("col1", StringType, true),
  StructField("col2", StringType, true),
  StructField("col3", StringType, true),
  StructField("col4", StringType, true),
  StructField("col5", StringType, true)))

val csvDF : DataFrame = sqlContext.read
  .option("header", "false")
  .option("delimiter", ",")
  .option("inferSchema", "false")
  .schema(schema)
  .csv("file.csv")

你应该dataframe得到

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|A1  |B1  |C1  |null|null|
|A2  |B2  |C2  |D1  |null|
|A3  |B3  |C3  |D2  |E1  |
|A4  |B4  |C4  |D3  |null|
|A5  |B5  |C5  |null|E2  |
+----+----+----+----+----+
于 2017-08-10T15:30:59.757 回答