-1
val rdd = df.rdd.map(
      line => Row(
        "BNK",
        format.format(Calendar.getInstance().getTime()),
        line(0),
        scala.xml.XML.loadString("<?xml version='1.0' encoding='utf-8'?>" + line(1)).child.map(_.text).filter(_.nonEmpty)


      )
    )

产生输出

 values = {Object[4]@9906} 
 0 = "BNK"
 1 = "18-3-2019"
 2 = "185687194277431.060001"
 3 = {$colon$colon@9910} "::" size = 20
  0 = "KH0010001"
  1 = "-1171035537.00"
  2 = "9"
  3 = "65232"
  4 = "1"
  5 = "KHR"
  6 = "TR"
  7 = "6-54-10-1-005-004"
  8 = "1"
  9 = "1"
  10 = "DC183050001002108"
  11 = "DC"
  12 = "20181101"
  13 = "185687194277431.06"
  14 = "1"
  15 = "1"
  16 = "5022_DMUSER__OFS_DM.OFS.SRC.VAL"
  17 = "1811012130"
  18 = "6012_DMUSER"
  19 = "PL.65232.......1.....KH0010001"

如何将values[3]子列表展平20 items到主列表中。

所以预期的输出:

 values = 
 0 = "BNK"
 1 = "18-3-2019"
 2 = "185687194277431.060001"
 3 = "KH0010001"
 4 = "-1171035537.00"
 5 = "9"
 6 = "65232"
 7 = "1"
 ..
4

1 回答 1

2

更新问题后再次尝试。我认为模式需要手动生成,因为值是基于列表的。假设列表的大小始终为 20:

val schema = StructType((0 to 22)
  .map(x => StructField(x.toString, IntegerType))
  .toList)
spark.createDataFrame(df.rdd.map(line => Row.fromSeq("BNK" :: format.format(Calendar.getInstance().getTime()) :: line(0) :: scala.xml.XML.loadString("<?xml version='1.0' encoding='utf-8'?>" + line(1)).child.map(_.text).filter(_.nonEmpty).toList)), schema)

如果列表的大小不总是 20,则需要对其进行封顶/填充。希望它有所帮助。

于 2019-03-18T10:30:04.757 回答