1

我有一个流应用程序正在运行到 Databricks 笔记本作业 ( https://docs.databricks.com/jobs.html )。我希望能够使用该stop()方法StreamingQuery返回的类的方法优雅地停止流式传输作业stream.start()。这当然需要访问所提到的流实例或访问正在运行的作业本身的上下文。在第二种情况下,代码可能如下所示:

spark.sqlContext.streams.get("some_streaming_uuid").stop()

上面的代码应该从不同的笔记本作业中执行,stop_streaming_job尽管我无法找到访问作业上下文和执行上述 scala 代码的方法,但我们可以调用它。有什么方法可以通过数据块笔记本实现这一目标吗?

4

1 回答 1

1

解决此问题的一种方法是使用 databricks 文件系统 (dbfs) 或本地文件系统。这个想法是StreamingQuery通过实现一个名为awaitExternalTermination. 该解决方案在给定的 DBFS 目录中创建一个新文件,该文件充当负责流作业生命周期的标志。只要文件存在于给定目录中,作业就会继续运行。接下来是文件观察器的实现,它是StreamingQuery该类的扩展方法并使用 Scala 期货:

object extensions {
  import fs._
  object FileSystemType extends Enumeration {
    val DBFS, LocalFileSystem = Value
  }

  implicit class FileSystemStopStreamingQuery(val self :StreamingQuery) extends AnyVal {
    /**
     * Extension method for StreamingQuery, it waits for an external call to delete the streaming file. When that happens it will call the stop method
     * of the current StreamingQuery instance.
     *
     * @param streamStopDir dir to be watched
     * @param jobName the job unique identifier/the file name
     * @param fsType DFFS or LocalFileSystem
     */
    def awaitExternalTermination(streamStopDir :String, jobName :String, fsType : FileSystemType.Value): Unit ={

      if(streamStopDir == null || streamStopDir.isEmpty)
        throw new IllegalArgumentException("streamStopDir can't be null or empty.")

      if(jobName == null || jobName.isEmpty)
        throw new IllegalArgumentException("jobName can't be null or empty.")

      val fsWrapper :FileSystemWrapper = fsType match {
        case FileSystemType.DBFS => new DbfsWrapper(streamStopDir, jobName)
        case FileSystemType.LocalFileSystem => new LocalFileSystemWrapper(streamStopDir, jobName)
        case _ => throw new IllegalArgumentException("Invalid file system provided.")
      }

      val stopWatchFuture: Future[Boolean] = Future {

        if(!fsWrapper.targetFileExists)
            fsWrapper.createTargetFile(self.id.toString)

        while (self.isActive && fsWrapper.targetFileExists){
          val random: ThreadLocalRandom = ThreadLocalRandom.current()
          val r = random.nextLong(10, 100 + 1) // returns value between 10 and 100
          Thread.sleep(r)
        }

        if(!fsWrapper.targetFileExists){
          self.stop()
          true
        }
        else
          false
      }

      var output = "success"
      stopWatchFuture onComplete {
        case Success(result : Boolean) => if (!result) {
          output = s"failure: file not found."
        }
        case Failure(t) => output = s"failure: ${t.getMessage}."
      }

      self.awaitTermination()
    }
  }
}

以及 DBFS 包装类的实现:

import com.databricks.dbutils_v1.DBUtilsHolder.dbutils

class DbfsWrapper(val stopDir: String, val targetFile: String) extends FileSystemWrapper {
  override def targetFileExists(): Boolean = {
    try {
      dbutils.fs.ls(targetPath).size > 0
    }
    catch {
      case _: java.io.FileNotFoundException => false
    }
  }

  override def createTargetFile(content: String): Unit = {
    dbutils.fs.put(targetPath, content)
  }
}

要停止流式传输作业,只需%fs rm -r your_path在使用 DBFS 或仅rm -r your_path用于本地 FS 时删除提到的文件。

完整的代码可以在这里找到。

于 2020-01-15T22:29:28.920 回答