换句话说,我不想将 Spark 流上下文中的“持续时间”设置为一个值,而是将其设置为(套接字关闭时间 - 套接字打开时间)
问问题
2810 次
2 回答
4
您可以使用StreamingListner接口来侦听正在断开连接的接收器,然后关闭流上下文。
这用作
// define listener
class MyListener extends StreamingListener {
override def onReceiverStopped(...) {
streamingContext.stop()
}
}
// attach listener
streamingContext. addStreamingListener(new MyListener())
于 2014-06-24T23:49:03.383 回答
0
通过在流式侦听器中执行 ssc.stop(),当您的流式传输出错或终止时,此函数将触发停止。还有另一个触发器,比如批处理开始、完成等。
ssc.addStreamingListener(new StreamingListener() {
@Override
public void onReceiverError(StreamingListenerReceiverError receiverError) {
System.out.println("Do what u want");
ssc.stop();
}
@Override
public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {
System.out.println("Do what u want");
ssc.stop(true,true);
}
}
ssc.start();
于 2015-11-16T08:38:56.050 回答