
I would like to know how to send a JSON string as a message to a Kafka topic using a Scala function, consume it with readStream() in Spark Structured Streaming, and save it in Parquet format. I am currently using the code below, but no Parquet files are created. Please help me get Parquet files containing the data. Both steps are implemented as functions, and both functions need to be called from an integration test.
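
For context, a rough sketch of the build dependencies I am assuming for the code below; the artifact names are where I believe these classes come from, and the versions are only placeholders:

//build.sbt (sketch -- versions are placeholders, not necessarily the ones I use)
libraryDependencies ++= Seq(
  "org.apache.spark"             %% "spark-sql"                % "2.4.0",
  "org.apache.spark"             %% "spark-sql-kafka-0-10"     % "2.4.0",
  "org.apache.kafka"              % "kafka-clients"            % "2.0.0",
  "org.apache.kafka"              % "connect-json"             % "2.0.0",          //JsonSerializer
  "com.fasterxml.jackson.module" %% "jackson-module-scala"     % "2.9.6",          //ObjectMapper + Scala module
  "net.manub"                    %% "scalatest-embedded-kafka" % "2.0.0" % Test,   //EmbeddedKafka
  "org.scalatest"                %% "scalatest"                % "3.0.5" % Test
)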

JSON message sent to the Kafka topic -

import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object kafkaProducer extends App {

    def sendMessages(): Unit = {

        //define topic
        val topic = "spark-topic"

        //define producer properties
        val props = new java.util.Properties()
        props.put("bootstrap.servers", "localhost:9092")
        props.put("client.id", "KafkaProducer")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.connect.json.JsonSerializer")

        //create producer instance
        val kafkaProducer = new KafkaProducer[String, JsonNode](props)

        //create an object mapper with the Scala module registered
        val mapper = new ObjectMapper with ScalaObjectMapper
        mapper.registerModule(DefaultScalaModule)
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

        //map a JSON object to a string (helper, currently unused)
        def toJson(value: Any): String = {
            mapper.writeValueAsString(value)
        }

        //build and send the producer message
        val jsonstring =
            """{
              | "id": "0001",
              | "name": "Peter"
              |}
            """.stripMargin

        val jsonNode: JsonNode = mapper.readTree(jsonstring)
        val rec = new ProducerRecord[String, JsonNode](topic, jsonNode)
        kafkaProducer.send(rec)
        //println(rec)
    }
}
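
While debugging, I believe the producer API also lets me block on the Future returned by send() so that any delivery failure surfaces immediately, and then flush and close the producer; a sketch of that (not part of my current code):

//hypothetical debugging variant of the send call above: wait for the broker's ack
val metadata = kafkaProducer.send(rec).get()
println(s"delivered to ${metadata.topic()}-${metadata.partition()} at offset ${metadata.offset()}")
kafkaProducer.flush()
kafkaProducer.close()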

Spark Structured Streaming - Kafka consumer

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types.{StringType, StructType}

object sparkConsumer extends App {

    def receiveMessages(): Unit = {

        //create a SparkSession for the consumer
        val spark = SparkSession
          .builder()
          .appName("SparkStructConsumer")
          .master("local[*]")
          .getOrCreate()

        import spark.implicits._
        spark.sparkContext.setLogLevel("WARN")

        //subscribe to the stream from Kafka
        val df = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("subscribe", "spark-topic")
          .option("startingOffsets", "latest")
          .option("groupid", "kafka-spark-group")
          .option("failOnDataLoss", false)
          .load

        //schema for the JSON string
        val data_Schema: StructType = new StructType()
          .add("id", StringType)
          .add("name", StringType)

        //parse the value column according to the schema, along with the Kafka timestamp
        val df1 = df.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
          .select(from_json($"value", data_Schema).as("data"), $"timestamp")
          .select("data.id", "data.name", "timestamp")
          .toDF("Id", "Name", "Timestamp")

        //write the stream as Parquet
        val query = df1
          .writeStream
          .outputMode(OutputMode.Append)
          .queryName("table")
          .format("parquet")
          .trigger(Trigger.ProcessingTime("5 seconds"))
          .option("header", "true")
          .option("checkpointLocation", "/Users/s/desktop/SparkOutput")
          .option("path", "/Users/s/desktop/SparkOutput")
          .start()

        query.awaitTermination()
    }
}
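
Once files do appear in the sink directory, this is roughly how I would verify their contents with a plain batch read (a sketch, using the same output path as above):

//hypothetical verification step: read back whatever the streaming query has written so far
val written = spark.read.parquet("/Users/s/desktop/SparkOutput")
written.printSchema()
written.show(truncate = false)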

Integration test

import java.util.Properties

import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}  //package is io.github.embeddedkafka in newer releases
import org.scalatest.{FlatSpecLike, Matchers}

class SparkKafkaIntegrationTests extends FlatSpecLike with Matchers with EmbeddedKafka {

    import org.apache.log4j.{Level, Logger}

    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    "Kafka producer sent JSON message" should "be consumed by the Spark Structured Streaming consumer" in {

        //define zookeeper and kafka server ports
        val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 9092, zooKeeperPort = 2181)

        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "localhost:9092")
        properties.setProperty("zookeeper.connect", "localhost:2181")

        kafkaProducer.sendMessages()

        sparkConsumer.receiveMessages()

    }
}
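
In case it is relevant: my understanding of the embedded-kafka API is that the in-memory broker only runs inside a withRunningKafka block (or after an explicit EmbeddedKafka.start()). A sketch of that wiring, which my test above does not do yet:

//sketch based on the embedded-kafka README, not my current test code
implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort = 9092, zooKeeperPort = 2181)

withRunningKafka {
    kafkaProducer.sendMessages()
    sparkConsumer.receiveMessages()
}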

Thank you. Shamir.

