2

我正在尝试使用 Spark Streaming 从 Kafka 主题中获取信息,然后解析我在该主题中获得的 json。为了在 DStream 中获取主题,我使用 stringReader,然后使用 foreach 从 DStream 中获取每个 RDD:

myRDD.collect().foreach(println)

为了将 myRDD 转换为 json (当我打印 myRDD 时,json 的格式是正确的)并提取我需要的两个字段,我尝试使用 json4s 并尝试了这个:

val jsonEvent = Json.parse(myRDD)
val srcIp = (jsonEvent / "src_ip")
val dstIp =  (jsonEvent / "dst_ip")

我也尝试过以这种方式使用 json4s:

val jsonEvent = parse(myRDD).asInstanceOf[JObject]
val srcIp = jsonEvent / "src_ip"

但它也不能正常工作。

这是输出:

java.lang.NoSuchMethodError: rg.json4s.jackson.JsonMethods$.parse$default$3()Z

这些是我正在使用的版本:

<dependency>
    <groupId>org.json4s</groupId>
    <artifactId>json4s-native_2.10</artifactId>
    <version>3.5.1</version>
</dependency>
<dependency>
    <groupId>org.json4s</groupId>
    <artifactId>json4s-jackson_2.10</artifactId>
    <version>3.5.1</version>
</dependency>

我认为问题在于我不明白如何将 RDD 中的每条记录转换为 json 对象来解析它。有人可以更深入地向我解释一下,以便我了解它是如何工作的吗?我的代码正确吗?

谢谢你。

4

1 回答 1

1

Spark 为您提供了将输入 JSON 读取到数据集/数据帧的 API。

如果您正在从文件中读取 JSON。您可以使用该SparkSession read().json()方法阅读

SparkSession spark = SparkSession.builder().appName("Test App").master("local[*]")
                .getOrCreate();

Dataset<Row> inputfileDataset = spark.read().option("multiLine", true).option("mode", "PERMISSIVE")
            .json("C:\path-to-file\inputfile.json"); 

如果您正在阅读 KAFKA Streams。List您可以通过将它们转换为of s来遍历每个 RDD String,然后将它们转换为 Dataset/Dataframe

JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, String.class,
                String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

JavaDStream<String> inputJsonStream = directKafkaStream.map(rdd -> {
    return rdd._2;
});

inputJsonStream.foreachRDD(inputRDD -> {
    SparkSession spark = JavaSparkSessionSingleton.getInstance(inputRDD.context().getConf());
    List<String> strings = inputRDD.collect();
    strings.forEach(x -> {
        Dataset<Row> inputDataset = spark.read().option("multiLine",true).option("mode", "PERMISSIVE").json(inputRDD);
        inputDataset.printSchema();
});

为了查询数据集/数据框,您可以使用Select()数据集上的函数,然后将其转换为您想要的数据类型。

于 2018-04-17T18:05:02.307 回答