0

我正在尝试将数据帧中的数据插入到增量表中。最初,我基于目标模式解析 xml 文件并将结果保存到数据框中。下面是用于解析的代码。

def parseAsset (nodeSeqXml: scala.xml.NodeSeq) : Seq[String] = {
  //convert nodeseq to xml
  
 
  
  Seq(  (nodeSeqXml \ "AMS").\@("Pro"),
        (nodeSeqXml \ "AMS").\@("Prod"),
        (nodeSeqXml \ "AMS").\@("Asset"),
        (nodeSeqXml \ "AMS").\@("Descrn"),
        (nodeSeqXml \ "AMS").\@("Creation_Dt"),
        (nodeSeqXml \ "AMS").\@("Provider"),
        (nodeSeqXml \ "AMS").\@("AssetD"),
        (nodeSeqXml \ "AMS").\@("lass"),
        (nodeSeqXml \ "AMS").\@("hyu"),
  ((nodeSeqXml \ "App_Data" ).map(d => ((d \\ "@Name").text + "@-" + (d \\ "@Value").text))).mkString("!-"))
}


val AssetXml = XML.loadFile("filepath/filename")
 
val metadataNodeSeqLst = (AssetXml \\ "Metadata")
var records: Seq[String] = Seq()
 //for each of Metadata tag
metadataNodeSeqLst.foreach(nodeSeqXml => {
  records = records :+ parseAsset(nodeSeqXml).mkString("%-")
})


val AssetDF = records.toDF("ETY_Asset")

在这一步之后,我将列拆分并分解数组列,最后将数据保存到数据框中,之后我将使用下面的方法将此数据插入到增量表中。

outputparse.write.format("delta").mode("append").option("mergeSchema", "true").insertInto("targettable")

如果源文件与目标文件的列数相同,这可以正常工作。但是在这种情况下,会有不同的文件具有不同的模式,它们将作为输入传递给解析代码。例如,目标模式有 77 列,如果传入文件有 65 列,并且在将数据插入增量表时,我会收到以下错误。

org.apache.spark.sql.AnalysisException: Cannot write to 'target', not enough data columns; target table has 74 column(s) but the inserted data has 65 column(s);

像这样我得到具有不同输入模式的文件,但我的目标模式是不变的。所以,基本上我需要将 Null 传递给缺失的字段。我知道在将数据写入数据帧之前,我需要在解析代码中进行模式比较。您能否告诉我如何实现这一点以及在我的解析代码中在何处合并此逻辑。

4

0 回答 0