4
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.eventhubs.EventHubsUtils
import sqlContext.implicits._

val ehParams = Map[String, String](
    "eventhubs.policyname" -> "Full",
...
)

val ssc = new StreamingContext(sc, Seconds(2))
val stream = EventHubsUtils.createUnionStream(ssc, ehParams)
val cr = stream.window(Seconds(6))

case class Message(msg: String)
stream.map(msg=>Message(new String(msg))).foreachRDD(rdd=>rdd.toDF().registerTempTable("temp"))

stream.print
ssc.start

以上开始并运行良好,但我似乎无法阻止它。对 %sql show tables 的任何调用都将冻结。

我如何停止上面的 StreamingContext ?

4

2 回答 2

6

ssc.stop还会杀死 Spark 上下文,需要重新启动解释器。

改为使用ssc.stop(stopSparkContext=false, stopGracefully=true)

于 2016-07-14T00:38:14.917 回答
1

ssc.stop在一个新的段落中应该停止它

dev@ 邮件列表中还在讨论如何改进与流媒体平台的集成。

于 2015-09-07T02:58:14.497 回答