1

I'm trying to parse a very simple XML string column using spark-xml, but I only manage to receive null values, even when the XML is correctly populated.

The XSD that I'm using to parse the xml is:

<xs:schema attributeFormDefault="unqualified" elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="note">
        <xs:complexType>
            <xs:sequence>
                <xs:element type="xs:string" name="from"/>
                <xs:element type="xs:string" name="to"/>
                <xs:element type="xs:string" name="message"/>
            </xs:sequence>
        </xs:complexType>
    </xs:element>
</xs:schema>

While the XML in the column present as string is as below, with every tag correctly populated:

<?xml version="1.0" encoding="UTF-8"?>
<note>
    <from>Jani</from>
    <to>Tove</to>
    <message>Remember me this weekend</message>
</note>

My spark code written in scala is something like this:

    // XML Schema
    val schema = XSDToSchema.read("<the XSD as string>")
    
    // Spark Structured Streaming (N.b. the column value contains the xml as string)
    import spark.implicits._
    var df = initSource(spark)
      .withColumn("parsed", from_xml($"value", schema, 
          Map(
              "mode" -> "FAILFAST",
              "nullValue"-> "",
              "rowTag" -> "note",
              "ignoreSurroundingSpaces" -> "true"
          )
      ))
      .select($"value",$"parsed.note.from", $"parsed.note.to", $"parsed.note.message")
      .writeStream
      .format("console")
      // .option("mode", "FAILFAST")
      // .option("nullValue", "")
      // .option("rowTag", "note")
      // .option("ignoreSurroundingSpaces","true")
      .outputMode("append")
      .start()
      .awaitTermination(30*1000)

Printing the schema of this dataFrame (before the select statement) would give the expected schema

root
 |-- value: string (nullable = true)
 |-- parsed: struct (nullable = true)
 |    |-- note: struct (nullable = false)
 |    |    |-- from: string (nullable = false)
 |    |    |-- to: string (nullable = false)
 |    |    |-- message: string (nullable = false)

But when printing in console the result, all I get are null values like below:

....
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+----+----+-------+
|               value|from|  to|message|
+--------------------+----+----+-------+
|<?xml version="1....|null|null|   null|
|<?xml version="1....|null|null|   null|
|<?xml version="1....|null|null|   null|
....

I don't think it is relevant but the source of this xml column is from reading a Kafka topic defined as below:

    def initSource(spark: SparkSession) : DataFrame = {
        spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("startingoffsets", "earliest")
          .option("subscribe", "my-test-topic")
          .load()
          .selectExpr("CAST(value AS STRING)")
    }

Has anyone else experienced this problem and solved it? I'm running out of options, I would really appreciate a hint on this :)

The version of spark-xml I'm using is the latest one atm, 0.12.0 with spark 3.1.1.

Update

I was passing the spark-xml options wrongly after calling writeStream, instead they need to be passed as a 3rd parameter of the from_xml function. I still get only null values tho...

4

1 回答 1

1

At the end what opened my eyes was reading the part of the spark-xml documentation that mentions:

Path to an XSD file that is used to validate the XML for each row individually

This mean that the schema matching is done through each row and not through the entire XML, in that case the schema for my example needs to be something like the following:

val schema = StructType(Array(
        StructField("from", StringType, nullable = true),
        StructField("to", StringType, nullable = true),
        StructField("message", StringType, nullable = true)))

and it can also be done using the XSD:

<xs:schema attributeFormDefault="unqualified" elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element type="xs:string" name="from"/>
    <xs:element type="xs:string" name="to"/>
    <xs:element type="xs:string" name="message"/>
</xs:schema>

These two approaches for declaring the schema did the trick for me. Hope it helps to someone in the future.

于 2021-05-18T13:34:09.697 回答