1

我有一个数据框,我将其连接到它的所有字段。

连接后它成为另一个数据帧,最后我将其输出写入 csv 文件,并在其两列上进行了分区。它的一列存在于第一个数据框中,我不想将其包含在最终输出中。

这是我的代码:

val dfMainOutput = df1resultFinal.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer")
      .select($"LineItem_organizationId", $"LineItem_lineItemId",
       when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition".cast(DataTypes.StringType)).as("DataPartition"),
       when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"),
       when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|"))).otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction"))
       .filter(!$"FFAction".contains("D"))

在这里,我正在连接并创建另一个数据框:

val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.fieldNames.map(c => col(c)): _*).as("concatenated"))     

这是我尝试过的

dfMainOutputFinal
  .drop("DataPartition")
  .write
  .partitionBy("DataPartition","StatementTypeCode")
  .format("csv")
  .option("header","true")
  .option("encoding", "\ufeff")
  .option("codec", "gzip")
  .save("path to csv")

现在我不想在我的输出中出现 DataPartition 列。

我正在基于 DataPartition 进行分区,所以我没有得到,但因为 DataPartition 存在于主数据框中,所以我在输出中得到它。

问题 1: 如何忽略 Dataframe 中的列

问题2:在写入我的实际数据之前,有什么方法可以"\ufeff"在csv输出文件中添加,这样我的编码格式就会变成UTF-8-BOM。

根据建议的答案

这是我尝试过的

 val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.filter(_ != "DataPartition").fieldNames.map(c => col(c)): _*).as("concatenated"))

但低于错误

<console>:238: error: value fieldNames is not a member of Seq[org.apache.spark.sql.types.StructField]
               val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.filter(_ != "DataPartition").fieldNames.map(c => col(c)): _*).as("concatenated"))

下面是我是否必须在最终输出中删除两列的问题

  val dfMainOutputFinal = dfMainOutput.select($"DataPartition","PartitionYear",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition","PartitionYear").map(c => col(c)): _*).as("concatenated"))
4

2 回答 2

0

问题 1:如何忽略 Dataframe 中的列

答:

val df = sc.parallelize(List(Person(1,2,3), Person(4,5,6))).toDF("age", "height", "weight")

df.columns
df.show()



+---+------+------+
|age|height|weight|
+---+------+------+
|  1|     2|     3|
|  4|     5|     6|
+---+------+------+


val df_new=df.select("age", "height")
    df_new.columns
    df_new.show()

+---+------+
|age|height|
+---+------+
|  1|     2|
|  4|     5|
+---+------+

df: org.apache.spark.sql.DataFrame = [age: int, height: int ... 1 more field]
df_new: org.apache.spark.sql.DataFrame = [age: int, height: int]

QUESTION 2:在写入我的实际数据之前,有什么方法可以在 csv 输出文件中添加“\ufeff”,这样我的编码格式就会变成 UTF-8-BOM。

答:

 String path= "/data/vaquarkhan/input/unicode.csv";

 String outputPath = "file:/data/vaquarkhan/output/output.csv";
    getSparkSession()
      .read()
      .option("inferSchema", "true")
      .option("header", "true")
      .option("encoding", "UTF-8")
      .csv(path)
      .write()
      .mode(SaveMode.Overwrite)
      .csv(outputPath);
}
于 2017-10-07T13:54:48.673 回答
0

问题一:

您使用的列df.write.partitionBy()不会添加到最终的 csv 文件中。它们会被自动忽略,因为数据是在文件结构中编码的。但是,如果您的意思是将其从concat_ws(并因此从文件中)删除,则可以进行一些小的更改:

concat_ws("|^|", 
  dfMainOutput.schema.fieldNames
    .filter(_ != "DataPartition")
    .map(c => col(c)): _*).as("concatenated"))

在这里,DataPartition 列在连接之前被过滤掉。

问题2:

Spark 似乎不支持,并且在读取具有该格式的文件时UTF-8 BOM似乎会导致问题。除了编写脚本在 Spark 完成后添加它们之外,我想不出任何简单的方法将 BOM 字节添加到每个 csv 文件。我的建议是简单地使用普通UTF-8格式。

dfMainOutputFinal.write.partitionBy("DataPartition","StatementTypeCode")
  .format("csv")
  .option("header", "true")
  .option("encoding", "UTF-8")
  .option("codec", "gzip")
  .save("path to csv")

此外,根据Unicode 标准,不推荐使用 BOM。

... UTF-8 既不需要也不建议使用 BOM,但在 UTF-8 数据从使用 BOM 的其他编码形式转换或 BOM 用作 UTF-8 签名的情况下可能会遇到.

于 2017-10-09T02:15:10.287 回答