
We are building an application that ingests data from a source system via Flume, then uses Kafka as the messaging layer to feed Spark Streaming for in-memory processing; after the data is processed into DataFrames, we load it into Hive tables. The flow would be: Source System -> Flume -> Kafka -> Spark Streaming -> Hive. Is this the right flow, or do we need to review it?

We take the discretized stream (DStream) and convert it to DataFrames so we can use SQL. We currently have 14 tables in Hive, and data has to be loaded into each of them based on a record code type. As the code below shows, we filter the DStream before handing it to a type-specific foreachRDD, so with 14 code types we end up filtering 14 times and maintaining 14 separate foreachRDD bodies:

val fil_sms = lines.filter(_.startsWith("1|"))
    val fil_calls = lines.filter(_.startsWith("7|"))
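
If the per-type filters are kept, one way to avoid re-reading each batch from Kafka for every filtered branch is to persist the DStream first; a minimal sketch (the persist call is an addition for illustration, not part of the code above):

lines.persist()                                   // cache each micro-batch once; all filter branches reuse it
val fil_sms   = lines.filter(_.startsWith("1|"))  // record type 1 = SMS
val fil_calls = lines.filter(_.startsWith("7|"))  // record type 7 = calls
// ...the remaining 12 type filters would follow the same pattern...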

Can this be handled inside a single foreachRDD body? I have tried, but it only filtered one line. Am I following the right approach, and can anyone help improve this code in terms of performance and implementation? I hope my question is clear.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka._
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.fs.FileSystem
import java.net.URI
    object test1 {

      case class cdrsms(col1: String , col2: String , col3: String , col4: String ,
                       col5: String , col6: String , col7: String , col8: String,
                       col9: String ,  col10: String)


      case class cdrcalls(col1: String , col2: String , col3: String , col4: String ,
                        col5: String , col6: String , col7: String , col8: String,
                        col9: String ,  col10: String , col11: String ,col12 : String , col13 : String)

      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("hwtest").setMaster("local[*]")
        val topicsSet = "hwkf01".split(",").toSet
        val kafkaParams = Map[String, String]("metadata.broker.list" -> "192.168.2.210:9092")
        val sc = new SparkContext(sparkConf)
        val ssc = new StreamingContext(sc, Seconds(20))
        val messages: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topicsSet)
        val lines = messages.map(_._2)
        val fil_sms = lines.filter(_.startsWith("1|"))
        val fil_calls = lines.filter(_.startsWith("7|"))
        val sqlContext = new HiveContext(sc)
        import sqlContext.implicits._

        // SMS records: build a DataFrame from each micro-batch, register it as a temp table, then load it into Hive
        fil_sms.foreachRDD(rdd => if (!rdd.isEmpty) {
          //val sms = rdd.filter(_.startsWith("1|"))
          rdd.map(_.split('|')).map(p => cdrsms(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9))).toDF().registerTempTable("cdr_sms")
          val tbl1 = sqlContext.sql("SELECT * FROM cdr_sms")
          tbl1.foreach(println)
          sqlContext.sql("insert into table sms select * from cdr_sms")
        })

        // Call records: same pattern, loading into the calls table
        fil_calls.foreachRDD(rdd => if (!rdd.isEmpty) {
          rdd.map(_.split('|')).map(p => cdrcalls(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10), p(11), p(12))).toDF().registerTempTable("cdr_calls")
          val tbl1 = sqlContext.sql("SELECT * FROM cdr_calls")
          tbl1.foreach(println)
          sqlContext.sql("insert into table calls select * from cdr_calls")
        })
        ssc.start()
        ssc.awaitTermination()
      }
    }

1 Answer


While waiting for an expert answer, I have tried several approaches and changed my code to the following, using a single foreachRDD. Please advise whether this is a good approach. Both versions run fine and produce the same end result, but I am looking for the best practice for achieving it. I have removed the temp table creation and now insert directly into the Hive tables, which should save memory and processing time.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka._
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.fs.FileSystem
import java.net.URI

object test1 {

  case class cdrsms(col1: String , col2: String , col3: String , col4: String ,
                   col5: String , col6: String , col7: String , col8: String,
                   col9: String ,  col10: String)


  case class cdrcalls(col1: String , col2: String , col3: String , col4: String ,
                    col5: String , col6: String , col7: String , col8: String,
                    col9: String ,  col10: String , col11: String ,col12 : String , col13 : String)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("hwtest").setMaster("local[*]")
    val topicsSet = "hwkf01".split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "192.168.2.210:9092")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(20))
    val messages: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    val lines = messages.map(_._2)

    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._

    // One foreachRDD per batch: split the records by code type, then append each subset to its Hive table
    lines.foreachRDD(rdd => if (!rdd.isEmpty) {
      val sms = rdd.filter(_.startsWith("1|"))
      val calls = rdd.filter(_.startsWith("7|"))

      sms.map(_.split('|'))
        .map(p => cdrsms(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9)))
        .toDF()
        .write.mode("append")
        .insertInto("sms_cdr")

      calls.map(_.split('|'))
        .map(p => cdrcalls(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10), p(11), p(12)))
        .toDF()
        .write.mode("append")
        .insertInto("calls_cdr")
    })


    ssc.start()
    ssc.awaitTermination()
  }
}
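
As a follow-up on scaling this to all 14 code types without 14 near-identical blocks, here is a minimal sketch that drives the same single-foreachRDD pass from a prefix-to-table map and caches each batch so it is only read from Kafka once per interval. The table names, column counts, and the generated col1..colN schema are illustrative assumptions, not the real Hive schema:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical mapping: record-type prefix -> (Hive table name, number of columns)
val tablesByPrefix = Map(
  "1|" -> ("sms_cdr", 10),
  "7|" -> ("calls_cdr", 13)
  // ...the remaining 12 code types would be added here...
)

lines.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    rdd.cache()  // the batch is scanned once per record type below, so cache it
    tablesByPrefix.foreach { case (prefix, (table, numCols)) =>
      // build a simple all-string schema matching the expected column count
      val schema = StructType((1 to numCols).map(i => StructField(s"col$i", StringType)))
      val rows = rdd.filter(_.startsWith(prefix))
        .map(line => Row.fromSeq(line.split('|').padTo(numCols, "").take(numCols)))
      sqlContext.createDataFrame(rows, schema)
        .write.mode("append")
        .insertInto(table)
    }
    rdd.unpersist()
  }
}

Whether the map-driven version or the explicit per-type version reads better is a style choice; the cache/unpersist pair is the part that matters for performance once the number of code types grows.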
answered 2018-04-04T09:44:42.200