11

这是在 Spark Streaming 上运行简单 SQL 查询的代码。

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.Duration

object StreamingSQL {

  case class Persons(name: String, age: Int)

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
    val sc = new SparkContext(sparkConf)
    // Create the context
    val ssc = new StreamingContext(sc, Seconds(2))

    val lines = ssc.textFileStream("C:/Users/pravesh.jain/Desktop/people/")
    lines.foreachRDD(rdd=>rdd.foreach(println))

    val sqc = new SQLContext(sc);
    import sqc.createSchemaRDD

    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created

    lines.foreachRDD(rdd=>{
      rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data")
      val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
      teenagers.foreach(println)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

如您所见,要在流上运行 SQL,必须在 foreachRDD 方法中进行查询。我想对从两个不同流接收的数据运行 SQL 连接。有什么办法可以做到吗?

4

2 回答 2

8

好吧,我想总结一下我们在Spiro的回答中讨论后得出的解决方法。他建议首先创建一个空表,然后将 RDD 插入其中。唯一的问题是Spark 还不允许插入到表中。这是可以做的:

首先,创建一个与您期望的流具有相同架构的 RDD:

import sqlContext.createSchemaRDD
val d1=sc.parallelize(Array(("a",10),("b",3))).map(e=>Rec(e._1,e._2))

然后将其保存为Parquet 文件

d1.saveAsParquetFile("/home/p1.parquet")

现在,加载 parquet 文件并使用registerAsTable()方法将其注册为表。

val parquetFile = sqlContext.parquetFile("/home/p1.parquet")
parquetFile.registerAsTable("data")

现在,当您收到流时,只需在流上应用foreachRDD()并继续使用insertInto()方法在上面创建的表中插入各个 RDD

dStream.foreachRDD(rdd=>{
rdd.insertInto("data")
})

这个 insertInto() 工作正常,并允许将数据收集到表中。现在您可以对任意数量的流执行相同的操作,然后运行您的查询。

于 2014-09-03T06:36:30.770 回答
5

按照您编写代码的方式,每次运行 SQL 查询时,最终都会生成一系列小 SchemaRDD。诀窍是将这些中的每一个保存到累积 RDD 或累积表中。

首先,表格方法,使用insertInto

对于每个流,首先创建一个注册为表的 emty RDD,获得一个空表。对于您的示例,假设您将其称为“allTeenagers”。

然后,对于您的每个查询,使用 SchemaRDD 的insertInto方法将结果添加到该表:

teenagers.insertInto("allTeenagers")

如果您对两个流都执行此操作,创建两个单独的累积表,然后您可以使用普通的旧 SQL 查询将它们连接起来。

(注意:我实际上并没有让他工作,稍微搜索一下让我怀疑其他人有,但我很确定我已经理解 的设计意图insertInto,所以我认为这个解决方案是值得的记录。)

其次unionAll方法(还有一种union方法,但这使得获得正确的类型变得更加棘手):

这涉及到创建一个初始的 RDD——我们再一次称它为allTeenagers

// create initial SchemaRDD even if it's empty, so the types work out right
var allTeenagers = sqc.sql("SELECT ...")

然后,每次:

val teenagers = sqc.sql("SELECT ...")
allTeenagers = allTeenagers.unionAll(teenagers)

也许不用说您需要列匹配。

于 2014-08-25T22:53:14.660 回答