我有读取水槽的 Flume Avro 水槽和 SparkStreaming 程序。CDH 5.1、Flume 1.5.0、Spark 1.0,使用 Scala 作为 Spark 上的程序语言
我能够制作 Spark 示例并计算 Flume Avro 事件。
但是我无法将 Flume Avro 事件序列化为字符串\文本,然后解析结构行。
有没有人有一个如何使用 Scala 的例子?
我有读取水槽的 Flume Avro 水槽和 SparkStreaming 程序。CDH 5.1、Flume 1.5.0、Spark 1.0,使用 Scala 作为 Spark 上的程序语言
我能够制作 Spark 示例并计算 Flume Avro 事件。
但是我无法将 Flume Avro 事件序列化为字符串\文本,然后解析结构行。
有没有人有一个如何使用 Scala 的例子?
您可以使用以下代码反序列化水槽事件:
val eventBody = stream.map(e => new String(e.event.getBody.array))
下面是一个 spark 流应用程序示例,用于分析来自 twitter 的流行主题标签,使用 flume twitter 源和 avro sink 将事件推送到 spark:
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume._
object PopularHashTags {
val conf = new SparkConf().setMaster("local[4]").setAppName("PopularHashTags").set("spark.executor.memory", "1g")
val sc = new SparkContext(conf)
def main(args: Array[String]) {
sc.setLogLevel("WARN")
System.setProperty("twitter4j.oauth.consumerKey", <consumerKey>)
System.setProperty("twitter4j.oauth.consumerSecret", <consumerSecret>)
System.setProperty("twitter4j.oauth.accessToken", <accessToken>)
System.setProperty("twitter4j.oauth.accessTokenSecret", <accessTokenSecret>)
val ssc = new StreamingContext(sc, Seconds(5))
val filter = args.takeRight(args.length)
val stream = FlumeUtils.createStream(ssc, <hostname>, <port>)
val tweets = stream.map(e => new String(e.event.getBody.array))
val hashTags = tweets.flatMap(status => status.split(" ").filter(_.startsWith("#")))
val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
.map { case (topic, count) => (count, topic) }
.transform(_.sortByKey(false))
// Print popular hashtags
topCounts60.foreachRDD(rdd => {
val topList = rdd.take(10)
println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
})
stream.count().map(cnt => "Received " + cnt + " flume events.").print()
ssc.start()
ssc.awaitTermination()
}
}
试试下面的代码:
stream.map(e => "Event:header:" + e.event.get(0).toString
+ "body: " + new String(e.event.getBody.array)).print
您可以实现自定义解码器以进行反序列化。提供预期的类型信息。