我正在尝试使用 Spark Streaming 读取 Kafka 消息,进行一些计算并将结果发送到另一个进程。
val jsonObject = new JSONObject
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
stream.foreachRDD { rdd => {
val jsonDF = spark.read.json(rdd.map(_._2))
val res = jsonDF.groupBy("artist").count.sort(col("count").desc).take(10)
/*Some Transformations => create jsonArray*/
jsonObject.put("Key", jsonArray)
}}
ssc.start()
我需要为我的要求积累 JSONObject(全局变量)。put
操作引发 NotSerializable 异常。
java.io.NotSerializableException:org.apache.spark.streaming.kafka.DirectKafkaInputDStream$MappedDStream 的对象可能作为 RDD 操作闭包的一部分被序列化。这是因为 DStream 对象是从闭包中引用的。请重写此 DStream 中的 RDD 操作以避免这种情况。这是为了避免 Spark 任务因不必要的对象而膨胀。
是否可以将此 jsonArray 发送出此 forahRDD 块?我不想写入文件或数据库。