2

换句话说,我不想将 Spark 流上下文中的“持续时间”设置为一个值,而是将其设置为(套接字关闭时间 - 套接字打开时间)

4

2 回答 2

4

您可以使用StreamingListner接口来侦听正在断开连接的接收器,然后关闭流上下文。

这用作

// define listener
class MyListener extends StreamingListener {
  override def onReceiverStopped(...) {
    streamingContext.stop()
  }
} 

// attach listener
streamingContext. addStreamingListener(new MyListener())
于 2014-06-24T23:49:03.383 回答
0

通过在流式侦听器中执行 ssc.stop(),当您的流式传输出错或终止时,此函数将触发停止。还有另一个触发器,比如批处理开始、完成等。

    ssc.addStreamingListener(new StreamingListener() {

    @Override
      public void onReceiverError(StreamingListenerReceiverError receiverError) {
      System.out.println("Do what u want");
      ssc.stop();
      }

    @Override
      public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {
      System.out.println("Do what u want");
      ssc.stop(true,true); 
      }
    }

    ssc.start();
于 2015-11-16T08:38:56.050 回答