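Here I am sending JSON data to Kafka on the "test" topic, applying a schema to the JSON, doing some transformations, and printing the result to the console. Here is the code: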

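// Assumes `spark` is an existing SparkSession (as in spark-shell).
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

val kafkadata = spark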
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("zookeeper.connect", "localhost:2181")
    .option("subscribe", "test")
    .option("startingOffsets", "earliest")
    .option("max.poll.records", 10)
    .option("failOnDataLoss", false)
    .load()



val schema1 = new StructType()
    .add("id_sales_order", StringType)
    .add("item_collection",
      MapType(
        StringType,
        new StructType()
          .add("id", LongType)
          .add("ip", StringType)
          .add("description", StringType)
          .add("temp", LongType)
          .add("c02_level", LongType)
          .add("geo",
            new StructType()
              .add("lat", DoubleType)
              .add("long", DoubleType)
          )
      )
    )



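// Cast the Kafka value bytes to a string, parse it with the schema,
// then explode the item_collection map into key/value rows.
val df = kafkadata
    .selectExpr("CAST(value AS STRING) AS json")
    .select(from_json($"json", schema1).as("data"))
    .select($"data.id_sales_order", explode($"data.item_collection"))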




val query = df.writeStream
    .outputMode("append")
    .queryName("table")
    .format("console")
    .start()

query.awaitTermination()
spark.stop()

I am sending data to Kafka in two ways:

1) Single-line JSON:

 {"id_sales_order": "2", "item_collection": {"2": {"id": 10,"ip": "68.28.91.22","description": "Sensor attached to the container ceilings","temp":35,"c02_level": 1475,"geo": { "lat":38.00, "long":97.00}}}}

This gives me the expected output:
+--------------+---+--------------------+
|id_sales_order|key|               value|
+--------------+---+--------------------+
|             2|  2|[10,68.28.91.22,S...|
+--------------+---+--------------------+

2) Multi-line JSON:

{
  "id_sales_order": "2",
  "item_collection": {
    "2": {
      "id": 10,
      "ip": "68.28.91.22",
      "description": "Sensor attached to the container ceilings",
      "temp":35,
      "c02_level": 1475,
      "geo":
        { "lat":38.00, "long":97.00}
    }
  }
}

This does not give me any output:
+--------------+---+-----+
|id_sales_order|key|value|
+--------------+---+-----+
+--------------+---+-----+

The JSON coming from the source looks like the second form.

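How should I handle JSON when reading streaming data from Kafka? I think the problem may be that the from_json function does not understand multi-line JSON.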
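
One thing that may be worth checking, offered as an assumption rather than a confirmed diagnosis: if the multi-line document is typed or piped into a line-oriented producer (for example kafka-console-producer), each line is sent as its own Kafka record, so from_json would see fragments such as `{` instead of one complete document and return null for each of them. A minimal sketch of collapsing the document onto one line before sending it follows. `CompactJson` and `toSingleLine` are hypothetical names used for illustration, not part of Spark or Kafka.

```scala
object CompactJson {
  // Collapse a pretty-printed JSON document onto a single line by
  // trimming each line and joining the pieces with no separator.
  // Assumption: the input is valid JSON, so string values contain no
  // raw line breaks and joining lines cannot alter any value.
  def toSingleLine(json: String): String =
    json.split("\\r?\\n").map(_.trim).mkString
}

// Usage: compact a small multi-line document before producing it.
val pretty =
  """{
    |  "id_sales_order": "2",
    |  "item_collection": {}
    |}""".stripMargin

val oneLine = CompactJson.toSingleLine(pretty)
println(oneLine)
```

If the producer is a custom application that already sends the whole document as one record value, this step should not be needed and the cause is probably elsewhere.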
