我正在尝试创建一个使用 ProtoBuf 编码的 Kafka 消息的 Spark Streaming。
这是我最近几天尝试的
import spark.implicits._
def parseLine (str: Array[Byte]): ProtoSchema = ProtoSchema.parseFrom(str)
val storageLoc: String = "/tmp/avl/output"
val checkpointLoc: String = "/tmp/avl/checkpoint"
val dfStreamReader: DataFrame = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("failOnDataLoss", value = false)
.option("subscribe", topics)
.load()
val dfStreamReaderValues: Dataset[Array[Byte]] = dfStreamReader.map(row => row.getAs[Array[Byte]]("value"))
val rddProtoSchema: Dataset[ProtoSchema] = dfStreamReaderValues.map(str => parseLine(str))
val dfRaw: DataFrame = spark.sqlContext.protoToDataFrame(rddProtoSchema.rdd)
val streamWriterAirline: StreamingQuery = dfRaw.writeStream
.format("parquet")
.option("path", storageLoc)
.option("checkpointLocation", checkpointLoc)
.outputMode(Append)
.trigger(ProcessingTime("2 seconds"))
.start()
spark.streams.awaitAnyTermination(20000)
使用 scalapb,我设法解码二进制 proto 文件并转换为数据帧。但是对于流式传输,我在解析行的编译时得到了这个异常:
val rddProtoSchema: Dataset[ProtoSchema] = dfStreamReaderValues.map(str => parseLine(str))
>>>>>
scala.ScalaReflectionException: <none> is not a term
谁能给点提示?