我正在尝试使用 Spark Streaming 从 Kafka 主题中获取信息,然后解析我在该主题中获得的 json。为了在 DStream 中获取主题,我使用 stringReader,然后使用 foreach 从 DStream 中获取每个 RDD:
myRDD.collect().foreach(println)
为了将 myRDD 转换为 json (当我打印 myRDD 时,json 的格式是正确的)并提取我需要的两个字段,我尝试使用 json4s 并尝试了这个:
val jsonEvent = Json.parse(myRDD)
val srcIp = (jsonEvent / "src_ip")
val dstIp = (jsonEvent / "dst_ip")
我也尝试过以这种方式使用 json4s:
val jsonEvent = parse(myRDD).asInstanceOf[JObject]
val srcIp = jsonEvent / "src_ip"
但它也不能正常工作。
这是输出:
java.lang.NoSuchMethodError: rg.json4s.jackson.JsonMethods$.parse$default$3()Z
这些是我正在使用的版本:
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-native_2.10</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-jackson_2.10</artifactId>
<version>3.5.1</version>
</dependency>
我认为问题在于我不明白如何将 RDD 中的每条记录转换为 json 对象来解析它。有人可以更深入地向我解释一下,以便我了解它是如何工作的吗?我的代码正确吗?
谢谢你。