我正在使用火花流从 AMQ 读取。我希望在消息队列中没有数据时停止流式传输。我创建了一个自定义接收器,它连接到 AMQ 主题并开始读取数据,但是工作人员如何告诉驱动程序没有剩余数据,以便它可以停止流式传输。
class CustomReceiver(brokerURL, topic, ...){
def onStart() {
new Thread("AMQ Receiver") {
override def run() { receive() }
}.start()
}
def onStop() {}
private def receive() {
activeMQStream = new ActiveMQStream(broker, topic, ...)
val topicSubscriber = activeMQStream.getTopicSubscriber()
while(!isStopped && !ActiveMQReceiver.stop){
val message = topicSubscriber.receive(timeOutInMilliseconds)
if (message != null && message.isInstanceOf[TextMessage]) {
val textMessage = message.asInstanceOf[TextMessage];
val text = textMessage.getText();
store(text)
println("ActiveMQReceiver: there is data from AMQ ....")
} else {
ActiveMQReceiver.stop = true
println("ActiveMQReceiver: No more data from AMQ .....")
}
}
def checkStatus(): Boolean ={
ActiveMQReceiver.stop
}
}
object ActiveMQReceiver{
@volatile var stop: Boolean = false
}
正如您在上面看到的,当没有数据要读取时,我试图将停止标志设置为 true,但是当我运行以下命令时,该标志始终为 False,在搜索后我发现工作人员不共享变量。我试图用累加器替换它,但这也不起作用。
var ssc = new StreamingContext(spark.sparkContext, Seconds(1))
val customReceiver = new CustomReceiver(brokerURL, topic, ...)
val stream: DStream[String] = ssc.receiverStream(customReceiver)
var driverList = List[String]()
stream.foreachRDD { rdd =>
if(rdd.count() > 0){
val fromWorker = rdd.collect().toList
driverList = driverList:::fromWorker
}
}
var stopFlag = false
var isStopped = false
val checkIntervalMillis = 10000
while (!isStopped) {
isStopped = ssc.awaitTerminationOrTimeout(checkIntervalMillis)
println("Check if stop flag was raised")
stopFlag = customReceiver.checkStatus()
if (!isStopped && stopFlag) {
var seq = driverList.toSeq
import spark.implicits._
val df = seq.toDS()
println("Request to stop")
ssc.stop(false, true)
}
}