0

我正在尝试使用 Spark Streaming 读取 Kafka 消息,进行一些计算并将结果发送到另一个进程。

val jsonObject = new JSONObject

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)

stream.foreachRDD { rdd => {
  val jsonDF = spark.read.json(rdd.map(_._2))
  val res = jsonDF.groupBy("artist").count.sort(col("count").desc).take(10)
  /*Some Transformations => create jsonArray*/
  jsonObject.put("Key", jsonArray)
}}

ssc.start()

我需要为我的要求积累 JSONObject(全局变量)。put操作引发 NotSerializable 异常。

java.io.NotSerializableException:org.apache.spark.streaming.kafka.DirectKafkaInputDStream$MappedDStream 的对象可能作为 RDD 操作闭包的一部分被序列化。这是因为 DStream 对象是从闭包中引用的。请重写此 DStream 中的 RDD 操作以避免这种情况。这是为了避免 Spark 任务因不必要的对象而膨胀。

是否可以将此 jsonArray 发送出此 forahRDD 块?我不想写入文件或数据库。

4

0 回答 0