我们正在使用自定义 spark 接收器,它从提供的 http 链接读取流数据。如果提供的 http 链接不正确,则接收器失败。问题是spark会不断重启receiver,应用永远不会终止。问题是如果接收器失败,如何告诉 Spark 终止应用程序。
这是我们自定义接收器的摘录:
def onStart() {
// Start the thread that receives data over a connection
new Thread("Receiver") {
override def run() { receive() }
}.start()
}
private def receive(): Unit = {
....
val response: CloseableHttpResponse = httpclient.execute(req)
try {
val sl = response.getStatusLine()
if (sl.getStatusCode != 200){
val errorMsg = "Error: " + sl.getStatusCode
val thrw = new RuntimeException(errorMsg)
stop(errorMsg, thrw)
} else {
...
store(doc)
}
我们有一个使用此接收器的 spark 流应用程序:
val ssc = new StreamingContext(sparkConf, duration)
val changes = ssc.receiverStream(new CustomReceiver(...
...
ssc.start()
ssc.awaitTermination()
如果接收器没有错误,一切都会按预期工作。如果接收器失败(例如使用错误的 http 链接),spark 将不断地重新启动它,并且应用程序将永远不会终止。
16/05/31 17:03:38 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
16/05/31 17:03:38 ERROR ReceiverTracker: Receiver has been stopped. Try to restart it.
如果接收器失败,我们只想终止整个应用程序。