1

我是使用 Spark 和 Kafka 集成在 Scala 中工作的新手。但是,我遇到了记录问题。我尝试了许多不同的日志库,但它们都从 Spark 返回相同的错误。

错误如下:Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

我的代码如下:

val df = spark.read.format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092, host2:9092")
  .option("subscribe", "test")
  .load()

// Dataframe of the 'value' column in the original dataframe from above
val msg = df.select("value").as[String]

// modify_msg is a string produced by Extract_info
val modify_msg = Extract_Info(msg.first.getString(0)).toString()

//Error occurs here. I also tried different logger libraries like SLF4J
println(modify_msg)


val query = df.writeStream
  .outputMode("append")
  .format("console")
  .start()
query.awaitTermination()

我想知道是否有办法打印或记录结果。问题是该writeStream.start()函数仅适用于数据帧,我无法让它打印字符串。任何帮助将非常感激。

4

0 回答 0