我是火花结构流和天蓝色事件中心的新手,并编写了如下代码:
val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()
import spark.implicits._
val body=incomingStream.select(incomingStream.col("body").cast("String"))
val query=body.writeStream.format("memory").outputMode("append").option("checkpointLocation",fileLocation+checkpointFolder).queryName("Body").start()
val data=spark.sqlContext.sql("select * from Body").map(x=>x(0).toString())
val result=spark.sqlContext.read.json(data)
result.write
.partitionBy(partitionColumn)
.option("path",fileLocation+eventHubName)
query.awaitTermination()
我无法获取 val 数据变量中的数据,也无法将其写入 azure 数据湖。我可以在日志中看到 writestream 正在内存中创建表,Body
但在输出中没有任何内容。有人能告诉我我做错了什么吗?