
There are some examples of using SQL over Spark Streaming inside foreachRDD(). But what if I want to use SQL inside transform()?

case class AlertMsg(host:String, count:Int, sum:Double)
val lines = ssc.socketTextStream("localhost", 8888)
lines.transform( rdd => {
  if (rdd.count > 0) {
    val t = sqc.jsonRDD(rdd)
    t.registerTempTable("logstash")
    val sqlreport = sqc.sql("SELECT host, COUNT(host) AS host_c, AVG(lineno) AS line_a FROM logstash WHERE path = '/var/log/system.log' AND lineno > 70 GROUP BY host ORDER BY host_c DESC LIMIT 100")
    sqlreport.map(r => AlertMsg(r(0).toString,r(1).toString.toInt,r(2).toString.toDouble))
  } else {
    rdd
  }
}).print()

I get an error like this:

[error] /Users/raochenlin/Downloads/spark-1.2.0-bin-hadoop2.4/logstash/src/main/scala/LogStash.scala:52: no type parameters for method transform: (transformFunc: org.apache.spark.rdd.RDD[String] => org.apache.spark.rdd.RDD[U])(implicit evidence$5: scala.reflect.ClassTag[U])org.apache.spark.streaming.dstream.DStream[U] exist so that it can be applied to arguments (org.apache.spark.rdd.RDD[String] => org.apache.spark.rdd.RDD[_ >: LogStash.AlertMsg with String <: java.io.Serializable])
[error]  --- because ---
[error] argument expression's type is not compatible with formal parameter type;
[error]  found   : org.apache.spark.rdd.RDD[String] => org.apache.spark.rdd.RDD[_ >: LogStash.AlertMsg with String <: java.io.Serializable]
[error]  required: org.apache.spark.rdd.RDD[String] => org.apache.spark.rdd.

It seems it only works correctly if I use sqlreport.map(r => r.toString)?


1 Answer


dstream.transform takes a function transformFunc: (RDD[T]) ⇒ RDD[U]. This means that both branches of the if expression must evaluate to the same type, which is not the case here:

if (count == 0) => RDD[String]
if (count > 0) => RDD[AlertMsg]
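Because the branch types differ, the compiler unifies them to their least upper bound, which is why the error mentions the existential type `_ >: LogStash.AlertMsg with String <: java.io.Serializable`. A plain-Scala illustration of the same unification (using a standalone case class, no Spark needed):

```scala
// A case class extends Serializable; String also implements
// java.io.Serializable, so the least upper bound of the two
// branch types is java.io.Serializable, not either concrete type.
case class AlertMsg(host: String, count: Int, sum: Double)

val cond = true
val branch = if (cond) AlertMsg("host1", 1, 2.0) else "a plain string"
// branch is inferred as java.io.Serializable: the static type is
// too weak to call AlertMsg members on it without a cast.
val isSerializable = branch.isInstanceOf[java.io.Serializable]  // true
```

Spark's transform needs a single concrete U (with a ClassTag), so such a widened existential type cannot satisfy it.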

Given that, drop the if rdd.count ... optimization so that you have a single transformation path.
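A minimal sketch of the single-typed version, assuming the same `ssc` (StreamingContext) and `sqc` (SQLContext) as in the question. Note that jsonRDD cannot infer a schema from an empty RDD, so instead of returning the raw rdd in the empty case, this sketch returns an empty RDD[AlertMsg] so both branches share one type:

```scala
// Both branches now produce RDD[AlertMsg], so transform's type
// parameter U resolves cleanly to AlertMsg.
case class AlertMsg(host: String, count: Int, sum: Double)

val lines = ssc.socketTextStream("localhost", 8888)
lines.transform { rdd =>
  if (rdd.count > 0) {
    val t = sqc.jsonRDD(rdd)
    t.registerTempTable("logstash")
    val sqlreport = sqc.sql(
      "SELECT host, COUNT(host) AS host_c, AVG(lineno) AS line_a " +
      "FROM logstash WHERE path = '/var/log/system.log' AND lineno > 70 " +
      "GROUP BY host ORDER BY host_c DESC LIMIT 100")
    sqlreport.map(r => AlertMsg(r(0).toString, r(1).toString.toInt, r(2).toString.toDouble))
  } else {
    // Same element type as the other branch, so the if unifies to RDD[AlertMsg].
    rdd.sparkContext.emptyRDD[AlertMsg]
  }
}.print()
```

The key point is only the shared branch type; whether you guard against empty batches this way or skip the guard entirely is a separate design choice.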

answered 2015-02-15T21:34:59.220