I'm trying to parse a very simple XML string column using spark-xml, but I only manage to receive null
values, even when the XML is correctly populated.
The XSD that I'm using to parse the xml is:
<xs:schema attributeFormDefault="unqualified" elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="note">
<xs:complexType>
<xs:sequence>
<xs:element type="xs:string" name="from"/>
<xs:element type="xs:string" name="to"/>
<xs:element type="xs:string" name="message"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>
While the XML in the column present as string is as below, with every tag correctly populated:
<?xml version="1.0" encoding="UTF-8"?>
<note>
<from>Jani</from>
<to>Tove</to>
<message>Remember me this weekend</message>
</note>
My spark code written in scala is something like this:
// XML Schema
val schema = XSDToSchema.read("<the XSD as string>")
// Spark Structured Streaming (N.b. the column value contains the xml as string)
import spark.implicits._
var df = initSource(spark)
.withColumn("parsed", from_xml($"value", schema,
Map(
"mode" -> "FAILFAST",
"nullValue"-> "",
"rowTag" -> "note",
"ignoreSurroundingSpaces" -> "true"
)
))
.select($"value",$"parsed.note.from", $"parsed.note.to", $"parsed.note.message")
.writeStream
.format("console")
// .option("mode", "FAILFAST")
// .option("nullValue", "")
// .option("rowTag", "note")
// .option("ignoreSurroundingSpaces","true")
.outputMode("append")
.start()
.awaitTermination(30*1000)
Printing the schema of this dataFrame (before the select statement) would give the expected schema
root
|-- value: string (nullable = true)
|-- parsed: struct (nullable = true)
| |-- note: struct (nullable = false)
| | |-- from: string (nullable = false)
| | |-- to: string (nullable = false)
| | |-- message: string (nullable = false)
But when printing in console the result, all I get are null
values like below:
....
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+----+----+-------+
| value|from| to|message|
+--------------------+----+----+-------+
|<?xml version="1....|null|null| null|
|<?xml version="1....|null|null| null|
|<?xml version="1....|null|null| null|
....
I don't think it is relevant but the source of this xml column is from reading a Kafka topic defined as below:
def initSource(spark: SparkSession) : DataFrame = {
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("startingoffsets", "earliest")
.option("subscribe", "my-test-topic")
.load()
.selectExpr("CAST(value AS STRING)")
}
Has anyone else experienced this problem and solved it? I'm running out of options, I would really appreciate a hint on this :)
The version of spark-xml I'm using is the latest one atm, 0.12.0
with spark 3.1.1
.
Update
I was passing the spark-xml options wrongly after calling writeStream
, instead they need to be passed as a 3rd parameter of the from_xml
function. I still get only null values tho...