1

似乎当我应用in并将其作为 csv 文件存储在某个CONCAT位置时,在输出文件中单独向该列添加了额外的双引号。dataframespark sqldataframeHDFSconcat

当我应用显示时不添加此双引号。仅当我将其存储dataframe为 csv 文件时才添加此双引号

似乎我需要删除在保存dataframe为 csv 文件时添加的额外双引号。

我正在使用com.databricks:spark-csv_2.10:1.1.0罐子

Spark 版本为 1.5.0-cdh5.5.1

输入 :

 campaign_file_name_1, campaign_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89,    1
 campaign_file_name_1, campaign_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk,    2

预期输出:

 campaign_file_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89,     campaign_name_1"="1,  2017-06-06 17:09:31
 campaign_file_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk,   campaign_name_1"="2,  2017-06-06 17:09:31

火花代码:

  object campaignResultsMergerETL extends BaseETL {

  val now  = ApplicationUtil.getCurrentTimeStamp()
  val conf = new Configuration()
  val fs  = FileSystem.get(conf)
  val log = LoggerFactory.getLogger(this.getClass.getName)

  def main(args: Array[String]): Unit = {
    //---------------------
    code for sqlContext Initialization 
    //---------------------
    val campaignResultsDF  = sqlContext.read.format("com.databricks.spark.avro").load(campaignResultsLoc)
    campaignResultsDF.registerTempTable("campaign_results")
    val campaignGroupedDF =  sqlContext.sql(
   """
    |SELECT campaign_file_name,
    |campaign_name,
    |tracker_id,
    |SUM(campaign_measure) AS campaign_measure
    |FROM campaign_results
    |GROUP BY campaign_file_name,campaign_name,tracker_id
  """.stripMargin)

    campaignGroupedDF.registerTempTable("campaign_results_full")

    val campaignMergedDF =  sqlContext.sql(
  s"""
    |SELECT campaign_file_name,
    |tracker_id,
    |CONCAT(campaign_name,'\"=\"' ,campaign_measure),
    |"$now" AS audit_timestamp
    |FROM campaign_results_full
  """.stripMargin)

   campaignMergedDF.show(20)
   saveAsCSVFiles(campaignMergedDF, campaignResultsExportLoc, numPartitions)

   }


    def saveAsCSVFiles(campaignMeasureDF:DataFrame,hdfs_output_loc:String,numPartitions:Int): Unit =
    {
       log.info("saveAsCSVFile method started")
       if (fs.exists(new Path(hdfs_output_loc))){
          fs.delete(new Path(hdfs_output_loc), true)
       }
     campaignMeasureDF.repartition(numPartitions).write.format("com.databricks.spark.csv").save(hdfs_output_loc)
       log.info("saveAsCSVFile method ended")
    }

 }

结果campaignMergedDF.show(20)是正确的并且工作正常。

 campaign_file_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89,   campaign_name_1"="1,  2017-06-06 17:09:31
 campaign_file_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk,   campaign_name_1"="2,  2017-06-06 17:09:31

结果saveAsCSVFiles: 这是不正确的。

 campaign_file_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89,   "campaign_name_1""=""1",  2017-06-06 17:09:31
 campaign_file_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk,   "campaign_name_1""=""2",  2017-06-06 17:09:31

有人可以在这个问题上帮助我吗?

4

1 回答 1

1

当您使用

write.format("com.databricks.spark.csv").save(hdfs_output_loc)

为了编写包含"到 csv 文件中的文本,您将面临问题,因为该"符号被spark-csv定义为默认报价

将默认引号替换为其他内容(例如NULL)应该允许您按原样"写入文件。"

write.format("com.databricks.spark.csv").option("quote", "\u0000").save(hdfs_output_loc)

解释:

您正在使用默认的 spark-csv:

  • 转义值是\
  • 报价值为"

火花 csv 文档

  • 引号:默认情况下引号字符是“,但可以设置为任何字符。引号内的分隔符被忽略
  • 转义:默认转义字符是\,但可以设置为任何字符。转义的引号字符被忽略

这个答案提出了以下建议:

使用反斜杠字符 () 关闭双引号字符 (") 的默认转义的方法 - 即为了避免完全转义所有字符,您必须在 .option() 之后添加一个带有正确参数的方法调用。 write() 方法调用。option() 方法调用的目标是更改 csv() 方法在发出内容时如何“查找”“引号”字符的实例。为此,您必须更改默认值“引号”的实际含义;即将寻找的字符从双引号字符 (") 更改为 Unicode "\u0000" 字符(本质上是提供 Unicode NUL 字符,假设它永远不会出现在文档中)。

于 2017-06-07T14:15:41.943 回答