1

我正在使用以下代码来存储 to 的Spark-Streaming输出ElasticSearch。我想将 spark-streaming 的输出映射到正确的 name i.e (Key, OsName, PlatFormName, Mobile, BrowserName, Count)。但正如你所看到的,它目前被映射到 ES 中,如 _1 或 _2 等。此外,我想(if PlatFormName = "ubuntu" then index the data)在 ES 中索引数据之前放置一些过滤器,即。那么,我该怎么做呢?

 val realTimeAgg = lines.map{ x => ((x.key, x.os, x.platform, x.mobile, x.browser), 1)}.reduceByKey(_+_)

            val pageCounts = realTimeAgg.map    
            pageCounts.foreachRDD{ x => 
                    if (x.toLocalIterator.nonEmpty) {       
                        EsSpark.saveToEs(x, "spark/ElasticSearch")
                    }
                }   

            ssc.start()
            ssc.awaitTermination()

ElasticSearch 中的输出:

{
            "_index": "spark",
            "_type": "ElasticSearch",
            "_id": "AVTH0JPgzgtrAOUg77qq",
            "_score": 1,
            "_source": {
               "_1": {
                  "_3": "Amiga",
                  "_2": "AmigaOS 1.3",
                  "_6": "SeaMonkey",
                  "_1": "Usedcar",
                  "_4": 0,
                  "_5": 0
               },
               "_2": 1013
            }
         }
4

1 回答 1

1

弹性搜索文档的键是 _1、_2 等,因为您正在存储具有 (Tuple6, Long) 数据类型的 PairRDD。

要保留键,您应该使用案例类作为键。

val realTimeAgg = lines.map{ x => (x, 1)}.reduceByKey(_+_)

我假设对象 x 的类是一个案例类,并且您想使用该类的所有字段来进行归约(即检查 2 个案例类实例的相等性)。如果该类的所有字段都不是用于相等的类的自然键,那么您有两个选择 -

  1. 覆盖案例类的 equals 和 hashCode
  2. 创建另一个仅具有关键字段的案例类(您在元组中使用的字段 - (x.key, x.os, x.platform, x.mobile, x.browser))并映射到该案例类而不是第一行lines.map { x => ...}中的元组。

您可以在写入 ElasticSearch 之前添加所需的过滤器。

pageCounts.foreachRDD { x => 
                        if (x.toLocalIterator.nonEmpty) {
                            val y = x.filter(z => z._1.platform == "ubuntu")       
                            EsSpark.saveToEs(y, "spark/ElasticSearch")
                    }
                }  

PS:如果您正在使用 (case class, Long) case class 作为键来测试 RDD 对,就像我建议的 lines.map(x => (x, 1)).reduceByKey(_ + _) 一样。有一个与 Spark Shell 相关的错误,即案例类不能正确地作为减少操作的关键类 - jira 问题

于 2016-05-21T10:42:05.493 回答