We are building an application that ingests data from a source system through Flume, pushes it into Kafka, and then triggers Spark Streaming for in-memory processing; after shaping the data into DataFrames we load it into Hive tables. The flow is: Source System -> Flume -> Kafka -> Spark Streaming -> Hive. Is this the right flow, or do we need to review it?
We take the discretized stream (DStream) and convert it to DataFrames for SQL compatibility. We now have 14 tables in Hive, and data must be loaded into them based on a code type. As the code below shows, we filter the DStream before feeding it to a dedicated foreachRDD, so for 14 code types we end up filtering 14 times and maintaining 14 separate foreachRDD bodies:
val fil_sms = lines.filter(_.startsWith("1|"))
val fil_calls = lines.filter(_.startsWith("7|"))
Can we handle all of this inside a single foreachRDD body? I have already tried, but it only filtered one row. Am I following the right approach, and can anyone help make this code better in terms of performance and implementation? The full program is below, and a sketch of the single-foreachRDD variant follows it. I hope the question is clear.
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka._
import org.apache.spark.sql.hive.HiveContext
object test1 {

  // Schema for SMS CDRs (code type "1|", 10 fields).
  case class cdrsms(col1: String, col2: String, col3: String, col4: String,
                    col5: String, col6: String, col7: String, col8: String,
                    col9: String, col10: String)

  // Schema for call CDRs (code type "7|", 13 fields).
  case class cdrcalls(col1: String, col2: String, col3: String, col4: String,
                      col5: String, col6: String, col7: String, col8: String,
                      col9: String, col10: String, col11: String, col12: String, col13: String)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("hwtest").setMaster("local[*]")
    val topicsSet = "hwkf01".split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "192.168.2.210:9092")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(20))

    // Direct (receiver-less) Kafka stream; each message is a (key, value) pair.
    val messages: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicsSet)
    val lines = messages.map(_._2)

    // One filtered DStream per code type -- for 14 tables this is repeated 14 times.
    val fil_sms = lines.filter(_.startsWith("1|"))
    val fil_calls = lines.filter(_.startsWith("7|"))

    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._
    fil_sms.foreachRDD { rdd =>
      if (!rdd.isEmpty) {
        rdd.map(_.split('|'))
          .map(p => cdrsms(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9)))
          .toDF().registerTempTable("cdr_sms")
        val tbl1 = sqlContext.sql("SELECT * FROM cdr_sms")
        tbl1.foreach(println) // debug output of the current batch
        sqlContext.sql("insert into table sms select * from cdr_sms")
      }
    }
    fil_calls.foreachRDD { rdd =>
      if (!rdd.isEmpty) {
        rdd.map(_.split('|'))
          .map(p => cdrcalls(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10), p(11), p(12)))
          .toDF().registerTempTable("cdr_calls")
        val tbl1 = sqlContext.sql("SELECT * FROM cdr_calls")
        tbl1.foreach(println) // debug output of the current batch
        sqlContext.sql("insert into table calls select * from cdr_calls")
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
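
For reference, here is a minimal sketch of the single-foreachRDD variant I am asking about (hypothetical, reusing the case classes, sqlContext, and implicits from the listing above). Each code type is filtered independently from the same cached RDD, so one filter does not shadow another:

lines.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    rdd.cache() // reused by every per-code-type filter below

    val sms = rdd.filter(_.startsWith("1|"))
    if (!sms.isEmpty) {
      sms.map(_.split('|'))
        .map(p => cdrsms(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9)))
        .toDF().registerTempTable("cdr_sms")
      sqlContext.sql("insert into table sms select * from cdr_sms")
    }

    val calls = rdd.filter(_.startsWith("7|"))
    if (!calls.isEmpty) {
      calls.map(_.split('|'))
        .map(p => cdrcalls(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10), p(11), p(12)))
        .toDF().registerTempTable("cdr_calls")
      sqlContext.sql("insert into table calls select * from cdr_calls")
    }

    rdd.unpersist()
  }
}

Is this the pattern to extend to all 14 code types, or is there a better way?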