3

我的 ElasticSearch 集群中有一些与销售相关的 JSON 数据,我想使用 Spark Streaming(使用 Spark 1.4.1)通过 Kafka 动态聚合来自我的电子商务网站的传入销售事件,以获取当前用户总数的视图销售额(就收入和产品而言)。

从我阅读的文档中我不太清楚的是如何在 Spark 应用程序启动时从 ElasticSearch 加载历史数据,并计算例如每个用户的总收入(基于历史记录和来自卡夫卡)。

我有以下(工作)代码来连接到我的 Kafka 实例并接收 JSON 文档:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext

object ReadFromKafka {
  def main(args: Array[String]) {

    val checkpointDirectory = "/tmp"
    val conf = new SparkConf().setAppName("Read Kafka JSONs").setMaster("local[2]")
    val topicsSet = Array("tracking").toSet

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))

    // Create direct kafka stream with brokers and topics
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    //Iterate
    messages.foreachRDD { rdd =>

      //If data is present, continue
      if (rdd.count() > 0) {

        //Create SQLContect and parse JSON
        val sqlContext = new SQLContext(sc)
        val trackingEvents = sqlContext.read.json(rdd.values)

        //Sample aggregation of incoming data
        trackingEvents.groupBy("type").count().show()

      }

    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

我知道有一个 ElasticSearch 插件(https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html#spark-read),但我不清楚如何整合阅读启动时,流计算过程将历史数据与流数据聚合。

非常感谢帮助!提前致谢。

4

1 回答 1

1

RDD 是不可变的,因此在创建它们之后,您无法向它们添加数据,例如使用新事件更新收入。

您可以做的是将现有数据与新事件合并以创建新的 RDD,然后您可以将其用作当前总数。例如...

var currentTotal: RDD[(Key, Value)] = ... //read from ElasticSearch
messages.foreachRDD { rdd =>
    currentTotal = currentTotal.union(rdd)
}

在这种情况下,我们创建currentTotal一个var,因为当它与传入的数据联合时,它将被对新 RDD 的引用替换。

在联合之后,您可能希望执行一些进一步的操作,例如减少属于同一 Key 的值,但您得到了图片。

如果您使用这种技术,请注意您的 RDD 的血统会增长,因为每个新创建的 RDD 都会引用其父级。这可能会导致堆栈溢出样式沿袭问题。要解决此问题,您可以checkpoint()定期调用 RDD。

于 2015-07-27T11:15:49.917 回答