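I am using ScalaPB in Spark (Scala) to parse a Bahir MQTT payload that was serialized with protobuf, but the parsed output contains only the first field (name) of each metrics entry; the other fields are empty.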

  • Spark version: 2.3.0
  • Scala version: 2.11.8
  • Protobuf version: 2
  • sparksql-scalapb version: 0.8.0

import spark.implicits._
val parsedData = lines.select("payload").as[Array[Byte]].map(ParseData.parseFrom(_))

Proto file

syntax = "proto2";
option java_package = "protobuf";

message ParseData {
    required int64 timestamp = 1;
    message METRICS {
        required string name = 1;
        optional int64 timestamp = 2;
        optional string dataType = 3;
        optional double value = 4;
    }
    repeated METRICS metrics = 2;
    required int32 seq = 3;
}

The result I get

+-------------+--------------------+---+
|timestamp    |metrics             |seq|
+-------------+--------------------+---+
|1567158851979|[[T05,,,], [T06,,,]]|54 |
+-------------+--------------------+---+

But the expected result is

+-------------+-----------------------------------------------------------------+---+
|timestamp    |metrics                                                          |seq|
+-------------+-----------------------------------------------------------------+---+
|1567158851979|[[T05,1566920552229,Float,34.56], [T06,1566920552229,Float,32.5]]|54 |
+-------------+-----------------------------------------------------------------+---+

Update 1

Before serialization, the incoming message payload looks like this:

{
"metrics" : [{
"name" : "T05",
"timestamp" : 1566920552229,
"dataType" : "Float",
"value" : 34.56
},
{
"name" : "T06",
"timestamp" : 1566920552229,
"dataType" : "Float",
"value" : 32.5
}]
}

The MQTT server uses the Eclipse Tahu project, which serializes the payload with protobuf.

Update 2

Here is the code:

val lines = spark.readStream
      .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
      .option("topic", topic)
      .option("username", username)
      .option("password", password)
      .load(brokerUrl)

import spark.implicits._
val parseLines = lines.select("payload").as[Array[Byte]].map(ParseData.parseFrom(_))

lines.printSchema()

val data = parseLines.writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", false)
      .start()


data.awaitTermination()

Schema and sample of the streaming data


root
 |-- id: integer (nullable = true)
 |-- topic: string (nullable = true)
 |-- payload: binary (nullable = true)
 |-- timestamp: timestamp (nullable = true)


|id |topic                     |payload|timestamp          |

|0  |spBv1.0/XYZ/DDATA/Tahu/ABC||2019-08-30 17:30:43|
|0  |spBv1.0/XYZ/DDATA/Tahu/ABC|[08 EB EF 99 99 CE 2D 12 49 0A 35 4D 69 78 50 72 65 70 61 72 61 74 69 6F 6E 2F 42 6F 6E 64 4D 69 78 69 6E 67 2F 73 69 67 6E 61 6C 73 4D 45 53 2F 6D 69 78 65 72 54 69 6D 65 45 6C 61 70 73 65 64 18 A3 B1 99 99 CE 2D 20 03 38 00 4A 00 50 CE E2 82 11 12 4B 0A 37 4D 69 78 50 72 65 70 61 72 61 74 69 6F 6E 2F 42 6F 6E 64 4D 69 78 69 6E 67 2F 73 69 67 6E 61 6C 73 4D 45 53 2F 6D 69 78 65 72 54 69 6D 65 52 65 6D 61 69 6E 69 6E 67 18 D3 B1 99 99 CE 2D 20 03 38 00 4A 00 50 F2 D6 82 11 18 18]|2019-08-30 17:30:44|


Any input is appreciated.

1 Answer

Let's try to isolate the problem. We can start by taking Spark out of the picture.

If we take the payload you provided and try to parse it:

val b: Array[Byte] = Array(0x08, 0x83, 0xE8, 0x99, 0x99, 0xCE, 0x2D, 0x12, 0x53, 0x0A, 0x3F, 0x46, 0x69, 0x6E, 0x69, 0x73, 0x68, 0x69, 0x6E, 0x67, 0x2F, 0x54, 0x72, 0x69, 0x6F, 0x2F, 0x66, 0x61, 0x63, 0x65, 0x53, 0x43, 0x36, 0x30, 0x30, 0x2F, 0x4D, 0x45, 0x53, 0x50, 0x72, 0x6F, 0x63, 0x65, 0x73, 0x73, 0x54, 0x61, 0x67, 0x73, 0x2F, 0x68, 0x6F, 0x6C, 0x65, 0x43, 0x6F, 0x72, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6F, 0x6E, 0x4C, 0x6F, 0x63, 0x61, 0x74, 0x69, 0x6F, 0x6E, 0x31, 0x18, 0xD2, 0xAE, 0x99, 0x99, 0xCE, 0x2D, 0x20, 0x09, 0x38, 0x00, 0x4A, 0x00, 0x65, 0x9A, 0xD9, 0x19, 0x43, 0x12, 0x57, 0x0A, 0x43, 0x46, 0x69, 0x6E, 0x69, 0x73, 0x68, 0x69, 0x6E, 0x67, 0x2F, 0x54, 0x72, 0x69, 0x6F, 0x2F, 0x66, 0x61, 0x63, 0x65, 0x53, 0x43, 0x36, 0x30, 0x30, 0x2F, 0x4D, 0x45, 0x53, 0x50, 0x72, 0x6F, 0x63, 0x65, 0x73, 0x73, 0x54, 0x61, 0x67, 0x73, 0x2F, 0x68, 0x6F, 0x6C, 0x65, 0x43, 0x6F, 0x72, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6F, 0x6E, 0x4C, 0x6F, 0x63, 0x61, 0x74, 0x69, 0x6F, 0x6E, 0x44, 0x65, 0x6C, 0x74, 0x61, 0x18, 0xEA, 0xAF, 0x99, 0x99, 0xCE, 0x2D, 0x20, 0x09, 0x38, 0x00, 0x4A, 0x00, 0x65, 0x9A, 0xD9, 0x19, 0x43, 0x12, 0x49, 0x0A, 0x35, 0x4D, 0x69, 0x78, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x61, 0x74, 0x69, 0x6F, 0x6E, 0x2F, 0x42, 0x6F, 0x6E, 0x64, 0x4D, 0x69, 0x78, 0x69, 0x6E, 0x67, 0x2F, 0x73, 0x69, 0x67, 0x6E, 0x61, 0x6C, 0x73, 0x4D, 0x45, 0x53, 0x2F, 0x6D, 0x69, 0x78, 0x65, 0x72, 0x54, 0x69, 0x6D, 0x65, 0x45, 0x6C, 0x61, 0x70, 0x73, 0x65, 0x64, 0x18, 0xBA, 0xA9, 0x99, 0x99, 0xCE, 0x2D, 0x20, 0x03, 0x38, 0x00, 0x4A, 0x00, 0x50, 0xCD, 0xE2, 0x82, 0x11, 0x12, 0x4B, 0x0A, 0x37, 0x4D, 0x69, 0x78, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x61, 0x74, 0x69, 0x6F, 0x6E, 0x2F, 0x42, 0x6F, 0x6E, 0x64, 0x4D, 0x69, 0x78, 0x69, 0x6E, 0x67, 0x2F, 0x73, 0x69, 0x67, 0x6E, 0x61, 0x6C, 0x73, 0x4D, 0x45, 0x53, 0x2F, 0x6D, 0x69, 0x78, 0x65, 0x72, 0x54, 0x69, 0x6D, 0x65, 0x52, 0x65, 0x6D, 0x61, 0x69, 0x6E, 0x69, 0x6E, 0x67, 0x18, 0xEB, 0xA9, 0x99, 0x99, 0xCE, 0x2D, 0x20, 0x03, 0x38, 0x00, 0x4A, 0x00, 
0x50, 0xF1, 0xD6, 0x82, 0x11, 0x12, 0x4C, 0x0A, 0x38, 0x46, 0x69, 0x6E, 0x69, 0x73, 0x68, 0x69, 0x6E, 0x67, 0x2F, 0x54, 0x72, 0x69, 0x6F, 0x2F, 0x31, 0x73, 0x74, 0x43, 0x6F, 0x6E, 0x74, 0x72, 0x6F, 0x6C, 0x2F, 0x53, 0x69, 0x67, 0x6E, 0x61, 0x6C, 0x73, 0x2F, 0x46, 0x49, 0x52, 0x53, 0x54, 0x5F, 0x43, 0x4F, 0x4E, 0x54, 0x52, 0x4F, 0x4C, 0x5F, 0x54, 0x45, 0x4D, 0x50, 0x5F, 0x45, 0x4E, 0x47, 0x18, 0xEE, 0xD2, 0xF5, 0x9B, 0xCE, 0x2D, 0x20, 0x09, 0x38, 0x00, 0x4A, 0x00, 0x65, 0x9D, 0xA2, 0x02, 0x42, 0x12, 0x47, 0x0A, 0x33, 0x46, 0x69, 0x6E, 0x69, 0x73, 0x68, 0x69, 0x6E, 0x67, 0x2F, 0x54, 0x72, 0x69, 0x6F, 0x2F, 0x66, 0x61, 0x63, 0x65, 0x53, 0x43, 0x36, 0x30, 0x30, 0x2F, 0x73, 0x69, 0x67, 0x6E, 0x61, 0x6C, 0x73, 0x2F, 0x4D, 0x6F, 0x74, 0x6F, 0x72, 0x5F, 0x43, 0x75, 0x72, 0x72, 0x65, 0x6E, 0x74, 0x5F, 0x41, 0x6D, 0x70, 0x73, 0x18, 0xEE, 0xD2, 0xF5, 0x9B, 0xCE, 0x2D, 0x20, 0x09, 0x38, 0x00, 0x4A, 0x00, 0x65, 0x22, 0xB7, 0x47, 0x41, 0x18, 0x17).map(_.toByte)

println(ParseData.parseFrom(b).toProtoString)

Then the output is:

timestamp: 1567179043843
metrics {
  name: "Finishing/Trio/faceSC600/MESProcessTags/holeCorrectionLocation1"
}
metrics {
  name: "Finishing/Trio/faceSC600/MESProcessTags/holeCorrectionLocationDelta"
}
metrics {
  name: "MixPreparation/BondMixing/signalsMES/mixerTimeElapsed"
}
metrics {
  name: "MixPreparation/BondMixing/signalsMES/mixerTimeRemaining"
}
metrics {
  name: "Finishing/Trio/1stControl/Signals/FIRST_CONTROL_TEMP_ENG"
}
metrics {
  name: "Finishing/Trio/faceSC600/signals/Motor_Current_Amps"
}
seq: 23

This shows that only the name field inside each metrics entry is set; none of the other fields are.

This tells us that the data you are passing to parseFrom does not contain the fields you expect.
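
One way to check which fields the payload actually carries is to list the field numbers and wire types found in the bytes, without relying on any generated class. The sketch below is a minimal hand-rolled walker, not part of ScalaPB; the names WireTags, readVarint, listFields and sampleMetricTail are made up for illustration. It assumes well-formed input and handles only wire types 0, 1, 2 and 5. The sample bytes are the part of the first metrics entry that follows its name, copied from the array above.

```scala
object WireTags {
  // Reads a base-128 varint starting at `pos`; returns (value, next position).
  def readVarint(bytes: Array[Byte], pos: Int): (Long, Int) = {
    var result = 0L
    var shift = 0
    var i = pos
    var more = true
    while (more) {
      val b = bytes(i) & 0xFF
      result |= (b & 0x7FL) << shift
      shift += 7
      i += 1
      more = (b & 0x80) != 0 // high bit set means another byte follows
    }
    (result, i)
  }

  // Lists (fieldNumber, wireType) for every field at the top level of `bytes`.
  def listFields(bytes: Array[Byte]): List[(Int, Int)] = {
    val out = List.newBuilder[(Int, Int)]
    var pos = 0
    while (pos < bytes.length) {
      val (tag, afterTag) = readVarint(bytes, pos)
      val fieldNumber = (tag >>> 3).toInt // upper bits of the tag
      val wireType = (tag & 7).toInt      // lower three bits of the tag
      out += ((fieldNumber, wireType))
      // Skip over the field's value according to its wire type.
      pos = wireType match {
        case 0 => readVarint(bytes, afterTag)._2 // varint
        case 1 => afterTag + 8                   // 64-bit fixed
        case 2 =>                                // length-delimited
          val (len, afterLen) = readVarint(bytes, afterTag)
          afterLen + len.toInt
        case 5 => afterTag + 4                   // 32-bit fixed
        case other =>
          throw new IllegalArgumentException(s"unsupported wire type $other")
      }
    }
    out.result()
  }

  // Bytes after the name field of the first metrics entry in the payload above.
  val sampleMetricTail: Array[Byte] = Array(
    0x18, 0xD2, 0xAE, 0x99, 0x99, 0xCE, 0x2D, 0x20, 0x09,
    0x38, 0x00, 0x4A, 0x00, 0x65, 0x9A, 0xD9, 0x19, 0x43
  ).map(_.toByte)

  def main(args: Array[String]): Unit =
    println(listFields(sampleMetricTail)) // prints List((3,0), (4,0), (7,0), (9,2), (12,5))
}
```

For these bytes the walker lists field 3 (varint), 4 (varint), 7 (varint), 9 (length-delimited) and 12 (32-bit). The METRICS message in the question declares field 2 as the varint timestamp, field 3 as a string and field 4 as a double, so none of those declarations line up with what the payload carries, and a protobuf parser skips fields it cannot match. That would be consistent with the producer using a different schema than ParseData; comparing against the .proto that the producer actually uses (for Eclipse Tahu, presumably its Sparkplug B definition) is a reasonable next check.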

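The next step is to investigate why the producer of these messages is not setting those fields in the protos before serializing. The problem appears to be on the producer side.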

answered 2019-09-02T18:17:20.870