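I'm using scalapb in Spark (Scala) to parse Bahir MQTT payloads that are serialized with protobuf, but in the parsed result only the first field of each metric object (`name`) is populated; the remaining fields are null.
- Spark version: 2.3.0
- Scala version: 2.11.8
- Protobuf version: 2 (proto2)
- sparksql-scalapb version: 0.8.0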
import spark.implicits._
val parsedData = lines.select("payload").as[Array[Byte]].map(ParseData.parseFrom(_))
Proto file:
syntax = "proto2";
option java_package = "protobuf";
message ParseData {
required int64 timestamp = 1;
message METRICS {
required string name = 1;
optional int64 timestamp = 2;
optional string dataType = 3;
optional double value = 4;
}
repeated METRICS metrics = 2;
required int32 seq = 3;
}
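Every protobuf field on the wire starts with a tag, `(field_number << 3) | wire_type`, so a mismatch between this .proto and what the producer actually wrote can be checked byte by byte. A minimal sketch (the object `ProtoTags` is an illustrative name, not part of scalapb) that computes the tags the METRICS message above expects, and splits a few tag bytes taken from the sample payload further down back into field number and wire type:

```scala
// Sketch: protobuf tag arithmetic, tag = (fieldNumber << 3) | wireType,
// following the protobuf encoding specification.
object ProtoTags {
  // Wire types from the protobuf encoding specification.
  val Varint = 0          // int32, int64, uint32, uint64, bool, enum
  val Fixed64 = 1         // double, fixed64
  val LengthDelimited = 2 // string, bytes, embedded messages
  val Fixed32 = 5         // float, fixed32

  def tag(fieldNumber: Int, wireType: Int): Int = (fieldNumber << 3) | wireType

  // Split a decoded tag back into (fieldNumber, wireType).
  def split(tag: Int): (Int, Int) = (tag >>> 3, tag & 0x7)

  def main(args: Array[String]): Unit = {
    // Tags that METRICS in the .proto expects for each declared field.
    val expected = Seq(
      "name"      -> tag(1, LengthDelimited),
      "timestamp" -> tag(2, Varint),
      "dataType"  -> tag(3, LengthDelimited),
      "value"     -> tag(4, Fixed64)
    )
    expected.foreach { case (name, t) => println(f"$name%-10s 0x$t%02X") }

    // Tag bytes that appear inside each metric in the sample payload.
    Seq(0x0A, 0x18, 0x20, 0x65).foreach { t =>
      val (fieldNo, wireType) = split(t)
      println(f"0x$t%02X -> field $fieldNo, wire type $wireType")
    }
  }
}
```

In the sample rows, the byte after each metric name is `0x18` (field 3, varint) rather than the `0x10` this .proto expects for `timestamp`, and `0x20`/`0x65` do not match `0x1A`/`0x21` either. Generated parsers typically skip tags they do not recognise as unknown fields, which would be consistent with those columns coming out empty.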
The result I get:
+-------------+--------------------+---+
|timestamp    |metrics             |seq|
+-------------+--------------------+---+
|1567158851979|[[T05,,,], [T06,,,]]|54 |
+-------------+--------------------+---+
But the expected result is:
+-------------+-----------------------------------------------------------------+---+
|timestamp    |metrics                                                          |seq|
+-------------+-----------------------------------------------------------------+---+
|1567158851979|[[T05,1566920552229,Float,34.56], [T06,1566920552229,Float,32.5]]|54 |
+-------------+-----------------------------------------------------------------+---+
Update 1
Before serialization, the incoming payload message looks like this:
{
"metrics" : [{
"name" : "T05",
"timestamp" : 1566920552229,
"dataType" : "Float",
"value" : 34.56
},
{
"name" : "T06",
"timestamp" : 1566920552229,
"dataType" : "Float",
"value" : 32.5
}]
}
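The JSON above labels the values as "Float", while the .proto declares `optional double value = 4`. In the protobuf wire format these are different encodings: a `float` is 4 little-endian bytes (wire type 5) and a `double` is 8 little-endian bytes (wire type 1). A minimal sketch using `java.nio.ByteBuffer` (the object `FixedWidth` and its methods are illustrative names, not part of scalapb) that shows both layouts:

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Sketch: protobuf writes `float` as 4 little-endian bytes (wire type 5)
// and `double` as 8 little-endian bytes (wire type 1).
object FixedWidth {
  // Read a 4-byte little-endian float starting at `offset`.
  def readFloat(bytes: Array[Byte], offset: Int): Float =
    ByteBuffer.wrap(bytes, offset, 4).order(ByteOrder.LITTLE_ENDIAN).getFloat

  // Read an 8-byte little-endian double starting at `offset`.
  def readDouble(bytes: Array[Byte], offset: Int): Double =
    ByteBuffer.wrap(bytes, offset, 8).order(ByteOrder.LITTLE_ENDIAN).getDouble

  // Encode a float the way it appears after a wire-type-5 tag.
  def writeFloat(v: Float): Array[Byte] =
    ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putFloat(v).array()

  def main(args: Array[String]): Unit = {
    val encoded = writeFloat(34.56f)
    println(encoded.map(b => f"${b & 0xFF}%02X").mkString(" "))
    println(readFloat(encoded, 0))
  }
}
```

Because the tag combines the field number with the wire type, a value the producer writes as a 4-byte float would typically not be read into a field declared as `double`, even if the field numbers matched.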
The MQTT server uses the Eclipse Tahu project, which serializes the payload with protobuf.
Update 2
Here is the code:
val lines = spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("topic", topic)
.option("username", username)
.option("password", password)
.load(brokerUrl)
import spark.implicits._
val parseLines = lines.select("payload").as[Array[Byte]].map(ParseData.parseFrom(_))
lines.printSchema()
val data = parseLines.writeStream
.outputMode("append")
.format("console")
.option("truncate", false)
.start()
data.awaitTermination()
Schema and a sample of the streaming data:
root
|-- id: integer (nullable = true)
|-- topic: string (nullable = true)
|-- payload: binary (nullable = true)
|-- timestamp: timestamp (nullable = true)
+---+--------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
|id |topic |payload |timestamp |
+---+--------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
|0 |spBv1.0/XYZ/DDATA/Tahu/ABC|[08 83 E8 99 99 CE 2D 12 53 0A 3F 46 69 6E 69 73 68 69 6E 67 2F 54 72 69 6F 2F 66 61 63 65 53 43 36 30 30 2F 4D 45 53 50 72 6F 63 65 73 73 54 61 67 73 2F 68 6F 6C 65 43 6F 72 72 65 63 74 69 6F 6E 4C 6F 63 61 74 69 6F 6E 31 18 D2 AE 99 99 CE 2D 20 09 38 00 4A 00 65 9A D9 19 43 12 57 0A 43 46 69 6E 69 73 68 69 6E 67 2F 54 72 69 6F 2F 66 61 63 65 53 43 36 30 30 2F 4D 45 53 50 72 6F 63 65 73 73 54 61 67 73 2F 68 6F 6C 65 43 6F 72 72 65 63 74 69 6F 6E 4C 6F 63 61 74 69 6F 6E 44 65 6C 74 61 18 EA AF 99 99 CE 2D 20 09 38 00 4A 00 65 9A D9 19 43 12 49 0A 35 4D 69 78 50 72 65 70 61 72 61 74 69 6F 6E 2F 42 6F 6E 64 4D 69 78 69 6E 67 2F 73 69 67 6E 61 6C 73 4D 45 53 2F 6D 69 78 65 72 54 69 6D 65 45 6C 61 70 73 65 64 18 BA A9 99 99 CE 2D 20 03 38 00 4A 00 50 CD E2 82 11 12 4B 0A 37 4D 69 78 50 72 65 70 61 72 61 74 69 6F 6E 2F 42 6F 6E 64 4D 69 78 69 6E 67 2F 73 69 67 6E 61 6C 73 4D 45 53 2F 6D 69 78 65 72 54 69 6D 65 52 65 6D 61 69 6E 69 6E 67 18 EB A9 99 99 CE 2D 20 03 38 00 4A 00 50 F1 D6 82 11 12 4C 0A 38 46 69 6E 69 73 68 69 6E 67 2F 54 72 69 6F 2F 31 73 74 43 6F 6E 74 72 6F 6C 2F 53 69 67 6E 61 6C 73 2F 46 49 52 53 54 5F 43 4F 4E 54 52 4F 4C 5F 54 45 4D 50 5F 45 4E 47 18 EE D2 F5 9B CE 2D 20 09 38 00 4A 00 65 9D A2 02 42 12 47 0A 33 46 69 6E 69 73 68 69 6E 67 2F 54 72 69 6F 2F 66 61 63 65 53 43 36 30 30 2F 73 69 67 6E 61 6C 73 2F 4D 6F 74 6F 72 5F 43 75 72 72 65 6E 74 5F 41 6D 70 73 18 EE D2 F5 9B CE 2D 20 09 38 00 4A 00 65 22 B7 47 41 18 17]|2019-08-30 17:30:43|
|0 |spBv1.0/XYZ/DDATA/Tahu/ABC|[08 EB EF 99 99 CE 2D 12 49 0A 35 4D 69 78 50 72 65 70 61 72 61 74 69 6F 6E 2F 42 6F 6E 64 4D 69 78 69 6E 67 2F 73 69 67 6E 61 6C 73 4D 45 53 2F 6D 69 78 65 72 54 69 6D 65 45 6C 61 70 73 65 64 18 A3 B1 99 99 CE 2D 20 03 38 00 4A 00 50 CE E2 82 11 12 4B 0A 37 4D 69 78 50 72 65 70 61 72 61 74 69 6F 6E 2F 42 6F 6E 64 4D 69 78 69 6E 67 2F 73 69 67 6E 61 6C 73 4D 45 53 2F 6D 69 78 65 72 54 69 6D 65 52 65 6D 61 69 6E 69 6E 67 18 D3 B1 99 99 CE 2D 20 03 38 00 4A 00 50 F2 D6 82 11 18 18] |2019-08-30 17:30:44|
+---+--------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
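To see which field numbers these payloads actually carry, and compare them with the .proto above, a small hand-written wire-format walker can help. This is a sketch, not a scalapb or protobuf-java API: the object `WireWalk` and its helpers are hypothetical names, it handles only varint, fixed32, fixed64 and length-delimited fields, and it uses the second sample row as input:

```scala
// Sketch: list (fieldNumber, wireType) pairs of an encoded protobuf message
// by following the protobuf wire-format rules by hand.
object WireWalk {
  // Second payload row from the sample above.
  val sample: String = """
    08 EB EF 99 99 CE 2D 12 49 0A 35 4D 69 78 50 72 65 70 61 72 61 74 69 6F 6E 2F
    42 6F 6E 64 4D 69 78 69 6E 67 2F 73 69 67 6E 61 6C 73 4D 45 53 2F 6D 69 78 65
    72 54 69 6D 65 45 6C 61 70 73 65 64 18 A3 B1 99 99 CE 2D 20 03 38 00 4A 00 50
    CE E2 82 11 12 4B 0A 37 4D 69 78 50 72 65 70 61 72 61 74 69 6F 6E 2F 42 6F 6E
    64 4D 69 78 69 6E 67 2F 73 69 67 6E 61 6C 73 4D 45 53 2F 6D 69 78 65 72 54 69
    6D 65 52 65 6D 61 69 6E 69 6E 67 18 D3 B1 99 99 CE 2D 20 03 38 00 4A 00 50 F2
    D6 82 11 18 18"""

  def hex(s: String): Array[Byte] =
    s.trim.split("\\s+").map(h => Integer.parseInt(h, 16).toByte)

  // Read a base-128 varint starting at `start`; return (value, nextPos).
  def readVarint(b: Array[Byte], start: Int): (Long, Int) = {
    var result = 0L
    var shift = 0
    var pos = start
    var more = true
    while (more) {
      val byte = b(pos) & 0xFF
      result |= (byte & 0x7F).toLong << shift
      shift += 7
      pos += 1
      more = (byte & 0x80) != 0
    }
    (result, pos)
  }

  // (fieldNumber, wireType, valueStart, valueEnd) for each field in b[from, until).
  def fields(b: Array[Byte], from: Int = 0, until: Int = -1): List[(Int, Int, Int, Int)] = {
    val end = if (until < 0) b.length else until
    val out = List.newBuilder[(Int, Int, Int, Int)]
    var pos = from
    while (pos < end) {
      val (tag, afterTag) = readVarint(b, pos)
      val field = (tag >>> 3).toInt
      val wire = (tag & 0x7).toInt
      val (start, next) = wire match {
        case 0 => (afterTag, readVarint(b, afterTag)._2)
        case 1 => (afterTag, afterTag + 8)
        case 2 =>
          val (len, p) = readVarint(b, afterTag)
          (p, p + len.toInt)
        case 5 => (afterTag, afterTag + 4)
        case w => throw new IllegalArgumentException(s"unsupported wire type $w at $pos")
      }
      out += ((field, wire, start, next))
      pos = next
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val payload = hex(sample)
    for ((f, w, s, e) <- fields(payload)) {
      println(s"field $f, wire type $w")
      // Field 2 is declared as the embedded METRICS message in the .proto.
      if (f == 2 && w == 2)
        for ((mf, mw, _, _) <- fields(payload, s, e)) println(s"  metric field $mf, wire type $mw")
    }
  }
}
```

On that row the top level decodes as fields 1, 2, 2, 3, which lines up with `timestamp`, `metrics` and `seq`. Inside each metric, however, it reports fields 1, 3, 4, 7, 9 and 10, whereas METRICS in the .proto declares fields 1 to 4, and only field 1 (`name`, length-delimited) matches in both number and wire type. That would be consistent with `name` being the only populated column.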
Thanks for any input.