0

我是火花结构流和天蓝色事件中心的新手,并编写了如下代码:

val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()
import spark.implicits._

val body=incomingStream.select(incomingStream.col("body").cast("String"))
val query=body.writeStream.format("memory").outputMode("append").option("checkpointLocation",fileLocation+checkpointFolder).queryName("Body").start()
val data=spark.sqlContext.sql("select * from Body").map(x=>x(0).toString())

val result=spark.sqlContext.read.json(data)
result.write
  .partitionBy(partitionColumn)
  .option("path",fileLocation+eventHubName)

query.awaitTermination()

我无法获取 val 数据变量中的数据,也无法将其写入 azure 数据湖。我可以在日志中看到 writestream 正在内存中创建表,Body但在输出中没有任何内容。有人能告诉我我做错了什么吗?

4

0 回答 0