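I am using the following code to store the output of Spark-Streaming to ElasticSearch. I want to map the output of spark-streaming to the proper names, i.e. (Key, OsName, PlatFormName, Mobile, BrowserName, Count). But as you can see, it is currently mapped in ES as _1, _2, etc. I also want to apply a filter before indexing the data in ES, i.e. (if PlatFormName = "ubuntu" then index the data). So, how can I do that?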
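import org.elasticsearch.spark.rdd.EsSpark

// Count occurrences of each (key, os, platform, mobile, browser) combination per batch.
val realTimeAgg = lines.map { x => ((x.key, x.os, x.platform, x.mobile, x.browser), 1) }.reduceByKey(_ + _)
// The original `.map` call here was incomplete; the output below matches saving realTimeAgg as-is.
val pageCounts = realTimeAgg
pageCounts.foreachRDD { x =>
  // isEmpty() only needs to find one element, unlike iterating via toLocalIterator.
  if (!x.isEmpty()) {
    EsSpark.saveToEs(x, "spark/ElasticSearch")
  }
}
ssc.start()
ssc.awaitTermination()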
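A minimal sketch of the two transformations being asked about (named fields plus a platform filter), assuming each element of `realTimeAgg` has the shape `((key, os, platform, mobile, browser), count)` with `String` fields and an `Int` mobile flag. The types are guesses, so adjust them to the real `lines` records. `PageCount`, `toPageCount` and `isUbuntu` are hypothetical names, and the logic runs on a plain Scala `List` so it can be tried without Spark; the sample rows are made up for illustration.

```scala
// Hypothetical record type: when a case class is indexed, its field names become the JSON keys.
// Field types are assumptions based on the tuple built in realTimeAgg.
case class PageCount(
    key: String,
    osName: String,
    platformName: String,
    mobile: Int,
    browserName: String,
    count: Int
)

// Convert one ((key, os, platform, mobile, browser), count) pair into a named record.
def toPageCount(pair: ((String, String, String, Int, String), Int)): PageCount = {
  val ((key, os, platform, mobile, browser), count) = pair
  PageCount(key, os, platform, mobile, browser, count)
}

// Keep only records whose platform is exactly "ubuntu", as in the question.
def isUbuntu(p: PageCount): Boolean = p.platformName == "ubuntu"

// Made-up sample pairs standing in for one batch of realTimeAgg.
val sample = List(
  (("Usedcar", "AmigaOS 1.3", "Amiga", 0, "SeaMonkey"), 1013),
  (("Usedcar", "Ubuntu 16.04", "ubuntu", 0, "Firefox"), 7)
)

// Name the fields first, then drop everything that is not ubuntu.
val indexed = sample.map(toPageCount).filter(isUbuntu)
```

In the streaming job the same functions could be applied to the DStream before saving, e.g. `pageCounts.map(toPageCount).filter(isUbuntu)` followed by the existing `foreachRDD`/`saveToEs` loop; the elasticsearch-hadoop documentation shows case-class RDDs being written with their field names as document keys. Keys come out exactly as declared (`osName`, not `OsName`), and defining the case class and functions in a top-level `object` keeps the Spark closures serializable.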
Output in ElasticSearch:
{
  "_index": "spark",
  "_type": "ElasticSearch",
  "_id": "AVTH0JPgzgtrAOUg77qq",
  "_score": 1,
  "_source": {
    "_1": {
      "_3": "Amiga",
      "_2": "AmigaOS 1.3",
      "_6": "SeaMonkey",
      "_1": "Usedcar",
      "_4": 0,
      "_5": 0
    },
    "_2": 1013
  }
}
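The `_1`/`_2` keys are the accessor names of Scala tuples: each element of `realTimeAgg` is a `Tuple2` whose `_1` is the inner tuple and whose `_2` is the count, and that nesting is what appears under `_source`. A minimal sketch, assuming the tuple shape and types from the `realTimeAgg` line, that builds a `Map` with the exact key spellings from the question instead; `toDoc` is a hypothetical helper and the element types are assumptions.

```scala
// Build a flat document with explicit keys, so the index gets the names from the question
// instead of the tuple accessors _1, _2, ...
// Key spellings follow the question; the element types are assumptions.
def toDoc(pair: ((String, String, String, Int, String), Int)): Map[String, Any] = {
  val ((key, os, platform, mobile, browser), count) = pair
  Map(
    "Key" -> key,
    "OsName" -> os,
    "PlatFormName" -> platform,
    "Mobile" -> mobile,
    "BrowserName" -> browser,
    "Count" -> count
  )
}

// One sample pair, using values from the output above.
val doc = toDoc((("Usedcar", "AmigaOS 1.3", "Amiga", 0, "SeaMonkey"), 1013))
```

The elasticsearch-hadoop Scala documentation shows `Map` values being saved with `saveToEs`, so filtering each batch on the platform and then mapping it with `toDoc` before `EsSpark.saveToEs` should yield these field names. Treat that as an expectation to check against your connector version rather than a guarantee.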