0

我有读取水槽的 Flume Avro 水槽和 SparkStreaming 程序。CDH 5.1、Flume 1.5.0、Spark 1.0,使用 Scala 作为 Spark 上的程序语言

我能够制作 Spark 示例并计算 Flume Avro 事件。

但是我无法将 Flume Avro 事件序列化为字符串\文本,然后解析结构行。

有没有人有一个如何使用 Scala 的例子?

4

3 回答 3

1

您可以使用以下代码反序列化水槽事件:

val eventBody = stream.map(e => new String(e.event.getBody.array))

下面是一个 spark 流应用程序示例,用于分析来自 twitter 的流行主题标签,使用 flume twitter 源和 avro sink 将事件推送到 spark:

import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume._

object PopularHashTags {

val conf = new SparkConf().setMaster("local[4]").setAppName("PopularHashTags").set("spark.executor.memory", "1g")
val sc = new SparkContext(conf)

def main(args: Array[String]) {

sc.setLogLevel("WARN")

System.setProperty("twitter4j.oauth.consumerKey", <consumerKey>)
System.setProperty("twitter4j.oauth.consumerSecret", <consumerSecret>)
System.setProperty("twitter4j.oauth.accessToken", <accessToken>)
System.setProperty("twitter4j.oauth.accessTokenSecret", <accessTokenSecret>)

val ssc = new StreamingContext(sc, Seconds(5))
val filter = args.takeRight(args.length)
val stream = FlumeUtils.createStream(ssc, <hostname>, <port>)

val tweets = stream.map(e => new String(e.event.getBody.array))

val hashTags = tweets.flatMap(status => status.split(" ").filter(_.startsWith("#")))

val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
  .map { case (topic, count) => (count, topic) }
  .transform(_.sortByKey(false))

// Print popular hashtags
topCounts60.foreachRDD(rdd => {
  val topList = rdd.take(10)
  println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
  topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
})

stream.count().map(cnt => "Received " + cnt + " flume events.").print()

ssc.start()
ssc.awaitTermination()
    }

}
于 2016-08-28T13:59:49.890 回答
0

试试下面的代码:

stream.map(e => "Event:header:" + e.event.get(0).toString
                + "body: " + new String(e.event.getBody.array)).print
于 2014-10-31T03:56:06.593 回答
0

您可以实现自定义解码器以进行反序列化。提供预期的类型信息。

于 2014-10-11T03:21:06.497 回答