我的 ElasticSearch 集群中有一些与销售相关的 JSON 数据,我想使用 Spark Streaming(使用 Spark 1.4.1)通过 Kafka 动态聚合来自我的电子商务网站的传入销售事件,以获取当前用户总数的视图销售额(就收入和产品而言)。
从我阅读的文档中我不太清楚的是如何在 Spark 应用程序启动时从 ElasticSearch 加载历史数据,并计算例如每个用户的总收入(基于历史记录和来自卡夫卡)。
我有以下(工作)代码来连接到我的 Kafka 实例并接收 JSON 文档:
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext
object ReadFromKafka {
def main(args: Array[String]) {
val checkpointDirectory = "/tmp"
val conf = new SparkConf().setAppName("Read Kafka JSONs").setMaster("local[2]")
val topicsSet = Array("tracking").toSet
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
// Create direct kafka stream with brokers and topics
val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
//Iterate
messages.foreachRDD { rdd =>
//If data is present, continue
if (rdd.count() > 0) {
//Create SQLContect and parse JSON
val sqlContext = new SQLContext(sc)
val trackingEvents = sqlContext.read.json(rdd.values)
//Sample aggregation of incoming data
trackingEvents.groupBy("type").count().show()
}
}
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
我知道有一个 ElasticSearch 插件(https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html#spark-read),但我不清楚如何整合阅读启动时,流计算过程将历史数据与流数据聚合。
非常感谢帮助!提前致谢。