
I am trying to write a simple test case for Spark Structured Streaming. The code is inspired by holdenk's examples on GitHub.

Here is the CustomSink code:

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode

// A sink that simply applies the given function to each micro-batch
case class CustomSink(func: DataFrame => Unit)
  extends Sink {

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    func(data)
  }
}

class CustomSinkProvider extends StreamSinkProvider {
  def func(df: DataFrame): Unit = {
    df.show(5)
  }

  def createSink(
                  sqlContext: SQLContext,
                  parameters: Map[String, String],
                  partitionColumns: Seq[String],
                  outputMode: OutputMode): CustomSink = {
    new CustomSink(func)
  }
}

I tried to run it in a test case using MemoryStream:

@Test
def demoCustomSink: Unit = {
  // requires an implicit SQLContext and import spark.implicits._ in scope
  val input = MemoryStream[String]
  val doubled = input.toDS().map(x => x + " " + x)

  // input.addData("init")

  val query = doubled.writeStream
    .queryName("testCustomSinkBasic")
    .format("com.knockdata.spark.highcharts.demo.CustomSinkProvider")
    .start()

  input.addData("hi")

  query.processAllAvailable()
}

Without the line input.addData("init") it reports this error:

2016-10-12 03:48:37 ERROR StreamExecution       :91 - Query testCustomSinkBasic terminated with error
java.lang.RuntimeException: No data selected!
  at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.execution.streaming.MemoryStream$$anonfun$getBatch$4.apply(memory.scala:110)
at org.apache.spark.sql.execution.streaming.MemoryStream$$anonfun$getBatch$4.apply(memory.scala:110)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.MemoryStream.getBatch(memory.scala:109)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:332)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:329)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:329)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:194)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:184)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:120)

If I uncomment the line input.addData("init"), the test case runs successfully without reporting an error.

However, the value init never reaches the sink; only the value hi hi is shown.

Why does this happen, and how can I fix it?


1 Answer


There is a checkpointing mechanism working in the background. If the checkpoint directory already contains data from a previous run, the query goes wrong: it tries to resume from the offsets recorded there, which explains both the "No data selected!" error and the data that never reaches the sink.

Create a temporary checkpoint directory and a helper method to clear it:

import java.nio.file.Files

// Temporary directory used as the checkpoint location for the tests
val checkpointPath = Files.createTempDirectory("query")
val checkpointDir = checkpointPath.toFile

checkpointDir.deleteOnExit()

// Recursively delete a file or directory
def deleteRecursively(file: java.io.File): Unit = {
  if (file.isDirectory) {
    file.listFiles().foreach(deleteRecursively)
  }
  file.delete()
}

// Remove everything inside the checkpoint directory, keeping the directory itself
def clearCheckpointDir(): Unit = {
  checkpointDir.listFiles().foreach(deleteRecursively)
}
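As a side note (my addition, not part of the original answer): the same recursive cleanup can also be written with java.nio.file.Files.walkFileTree from JDK 7+. A minimal sketch, which deletes the root directory itself as well:

import java.io.IOException
import java.nio.file.{FileVisitResult, Files, Path, SimpleFileVisitor}
import java.nio.file.attribute.BasicFileAttributes

// Delete every file under `root`, then the directories themselves (including `root`)
def deleteTree(root: Path): Unit = {
  Files.walkFileTree(root, new SimpleFileVisitor[Path] {
    override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
      Files.delete(file)
      FileVisitResult.CONTINUE
    }
    override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
      Files.delete(dir)
      FileVisitResult.CONTINUE
    }
  })
}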

The SparkSession is then built with spark.sql.streaming.checkpointLocation pointing at that directory:

import org.apache.spark.sql.SparkSession

lazy val spark = SparkSession
  .builder()
  .appName("test")
  .master("local[*]")
  // every streaming query started from this session checkpoints into the temp directory
  .config("spark.sql.streaming.checkpointLocation",
    checkpointDir.getAbsolutePath)
  .getOrCreate()

Then in the test case I added the following code, and the custom sink worked as expected:

@Before
def before: Unit = {
  clearCheckpointDir()
}
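For completeness, an alternative that is not part of the original answer: instead of the session-wide spark.sql.streaming.checkpointLocation setting, the checkpoint location can be supplied per query on the writeStream. A sketch, assuming the doubled Dataset from the question and a freshly created temporary directory per run:

import java.nio.file.Files

// Give this query its own, freshly created checkpoint directory
val query = doubled.writeStream
  .queryName("testCustomSinkBasic")
  .format("com.knockdata.spark.highcharts.demo.CustomSinkProvider")
  .option("checkpointLocation",
    Files.createTempDirectory("testCustomSinkBasic").toString)
  .start()

With this approach no @Before cleanup is needed, because every run starts from an empty checkpoint.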
Answered 2016-10-12T20:18:23.543