3

我正在学习 Spark 结构化流(Structured Streaming),但无法将输出显示到控制台。

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.ProcessingTime

/** Minimal Structured Streaming job that echoes a Kafka topic to the console.
  *
  * Subscribes to topic `test` on broker `172.21.0.187:9093` starting from the
  * earliest offsets, casts each record's `value` to a string and writes the
  * result to the console sink in append mode.
  */
object kafka_stream {

  /** Entry point: builds a local Spark session, starts the streaming query and
    * blocks until that query terminates.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("kafka-consumer")
      .master("local[*]")
      .getOrCreate()
    // Brings the String encoder required by `.as[String]` into scope.
    import spark.implicits._

    spark.sparkContext.setLogLevel("WARN")

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "172.21.0.187:9093")
      .option("subscribe", "test")
      .option("startingOffsets", "earliest")
      .load()

    // Only the message payload is wanted; the cast turns the Kafka `value`
    // column into readable text.
    val values = df.selectExpr("CAST(value AS STRING)").as[String]

    values.writeStream
      .outputMode("append")
      .format("console")
      // The console sink shortens long cells by default; disable that so the
      // complete message text is printed.
      .option("truncate", "false")
      .start()
      .awaitTermination()
  }
}

我发送到 Kafka 的输入:

my name is abc how are you ?

我只想把从 Kafka 读取到的字符串显示在 Spark 控制台上。

4

0 回答 0