0

我正在使用火花流从 AMQ 读取。我希望在消息队列中没有数据时停止流式传输。我创建了一个自定义接收器,它连接到 AMQ 主题并开始读取数据,但是工作人员如何告诉驱动程序没有剩余数据,以便它可以停止流式传输。

class CustomReceiver(brokerURL, topic, ...){

    def onStart() {
      new Thread("AMQ Receiver") {
        override def run() { receive() }
      }.start()
    }

    def onStop() {}

    private def receive() {
      activeMQStream = new ActiveMQStream(broker, topic, ...)
      val topicSubscriber = activeMQStream.getTopicSubscriber()

      while(!isStopped && !ActiveMQReceiver.stop){
         val message = topicSubscriber.receive(timeOutInMilliseconds)
         if (message != null && message.isInstanceOf[TextMessage]) {
             val textMessage = message.asInstanceOf[TextMessage];
             val text = textMessage.getText();
             store(text)
             println("ActiveMQReceiver: there is data from AMQ ....")
         } else {
             ActiveMQReceiver.stop = true
             println("ActiveMQReceiver: No more data from AMQ .....")
         }
    }

    def checkStatus(): Boolean ={
        ActiveMQReceiver.stop
    }

}

object ActiveMQReceiver{
  @volatile var stop: Boolean = false
}

正如您在上面看到的,当没有数据要读取时,我试图将停止标志设置为 true,但是当我运行以下命令时,该标志始终为 False,在搜索后我发现工作人员不共享变量。我试图用累加器替换它,但这也不起作用。

var ssc = new StreamingContext(spark.sparkContext, Seconds(1))
val customReceiver = new CustomReceiver(brokerURL, topic, ...)
val stream: DStream[String] = ssc.receiverStream(customReceiver)
var driverList = List[String]()
stream.foreachRDD { rdd =>
  if(rdd.count() > 0){
    val fromWorker = rdd.collect().toList
    driverList = driverList:::fromWorker
  }
} 

var stopFlag = false
var isStopped = false
val checkIntervalMillis = 10000
while (!isStopped) {
  isStopped = ssc.awaitTerminationOrTimeout(checkIntervalMillis)
  println("Check if stop flag was raised")
  stopFlag = customReceiver.checkStatus()

  if (!isStopped && stopFlag) {
    var seq = driverList.toSeq
    import spark.implicits._
    val df = seq.toDS()
    println("Request to stop")
    ssc.stop(false, true)
  }
}
4

1 回答 1

0

依靠receive() 返回null 来表示没有剩余数据在生产中是不可靠的。这种方法消除了任何自我修复和故障转移支持,并引入了一个计时/竞争条件,您可能会变得“不走运”。作为替代方案,请查看使用消息组并将流中最后一条消息的标头设置为使用明确定义的消息发出信号。

消息组

于 2021-09-16T13:58:59.907 回答