0

我们正在使用自定义 spark 接收器,它从提供的 http 链接读取流数据。如果提供的 http 链接不正确,则接收器失败。问题是spark会不断重启receiver,应用永远不会终止。问题是如果接收器失败,如何告诉 Spark 终止应用程序。

这是我们自定义接收器的摘录:

 def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Receiver") {
      override def run() { receive() }
    }.start()
  }

  private def receive(): Unit = {
    ....
    val response: CloseableHttpResponse = httpclient.execute(req)
    try {
      val sl = response.getStatusLine()
      if (sl.getStatusCode != 200){
        val errorMsg = "Error: " + sl.getStatusCode 
        val thrw = new RuntimeException(errorMsg)
        stop(errorMsg, thrw)
      } else {
      ...
        store(doc)
      }

我们有一个使用此接收器的 spark 流应用程序:

val ssc = new StreamingContext(sparkConf, duration)
val changes = ssc.receiverStream(new CustomReceiver(...
...
ssc.start()
ssc.awaitTermination()

如果接收器没有错误,一切都会按预期工作。如果接收器失败(例如使用错误的 http 链接),spark 将不断地重新启动它,并且应用程序将永远不会终止。

16/05/31 17:03:38 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
16/05/31 17:03:38 ERROR ReceiverTracker: Receiver has been stopped. Try to restart it.

如果接收器失败,我们只想终止整个应用程序。

4

2 回答 2

2

有一种方法可以控制基于自定义接收器的火花流应用程序的生命周期。为您的应用程序定义工作进度监听器并跟踪正在发生的事情。

class CustomReceiverListener extends StreamingJobProgressListener {
    private boolean receiverStopped = false;

    public CustomReceiverListener(StreamingContext ssc) { super(ssc);}

    public boolean isReceiverStopped() {
        return receiverStopped;
    }
    @Override
    public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {
        LOG.info("Update the flag field");
        this.receiverStopped = true;
    }
}

并在您的驱动程序中,初始化一个线程来监视receiverStopped标志的状态。当这个线程完成时,驱动程序将停止流应用程序。(更好的方法是定义一个由驱动程序定义的回调方法,它将停止流应用程序)。

CustomReceiverListener listener = new CustomReceiverListener(ssc);
ssc.addStreamingListener(listener);
ssc.start();
Thread thread = new Thread(() -> {
    while (!listener.isReceiverStopped()) {
        LOG.info("Sleepy head...");
        try {
            Thread.sleep(2 * 1000); /*check after 2 seconds*/
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});
thread.start();
thread.join();
LOG.info("Listener asked to die! Going to commit suicide :(");
ssc.stop(true, false);

注意:如果您的接收器有多个实例,请更改实现CustomReceiverListener以确保所有接收器实例都已停止。

于 2016-12-14T12:29:22.150 回答
0

似乎 Spark Streaming 中的调度以这样一种方式工作,即 ReceiverTracker 将继续重新启动失败的接收器,直到 ReceiverTracker 本身没有停止。

https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L618

要停止 ReceiverTracker,我们需要停止整个应用程序。因此,似乎没有办法从接收器本身控制这个过程。

于 2016-06-02T01:22:50.913 回答