I am trying to write a simple test case for Spark Structured Streaming. The code is inspired by holdenk's examples on GitHub.
Here is the CustomSink code:
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode

// A sink that applies the given function to every micro-batch it receives.
case class CustomSink(func: DataFrame => Unit) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    func(data)
  }
}

class CustomSinkProvider extends StreamSinkProvider {
  // The function the sink runs on each micro-batch: print up to 5 rows.
  def func(df: DataFrame): Unit = {
    df.show(5)
  }

  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): CustomSink = {
    new CustomSink(func)
  }
}
I tried to run it in a test case using MemoryStream:
@Test
def demoCustomSink(): Unit = {
  val input = MemoryStream[String]
  val doubled = input.toDS().map(x => x + " " + x)
  // input.addData("init")
  val query = doubled.writeStream
    .queryName("testCustomSinkBasic")
    .format("com.knockdata.spark.highcharts.demo.CustomSinkProvider")
    .start()
  input.addData("hi")
  query.processAllAvailable()
}
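For completeness: MemoryStream[String] needs an implicit SQLContext and an Encoder[String] in scope. My setup is roughly the following sketch; the local master and app name are just illustrative placeholders.

import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("CustomSinkDemo")
  .getOrCreate()

// MemoryStream.apply requires an implicit SQLContext and an Encoder for the element type
implicit val sqlContext: SQLContext = spark.sqlContext
import spark.implicits._ // provides Encoder[String]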
Without the line input.addData("init") (i.e., with it commented out as above), it reports this error:
2016-10-12 03:48:37 ERROR StreamExecution:91 - Query testCustomSinkBasic terminated with error
java.lang.RuntimeException: No data selected!
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.execution.streaming.MemoryStream$$anonfun$getBatch$4.apply(memory.scala:110)
at org.apache.spark.sql.execution.streaming.MemoryStream$$anonfun$getBatch$4.apply(memory.scala:110)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.MemoryStream.getBatch(memory.scala:109)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:332)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:329)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:329)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:194)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:184)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:120)
If I uncomment the line input.addData("init"), the test case runs successfully with no error reported. However, the value init never reaches the sink: only hi hi gets displayed.
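To make the two cases concrete, here is the variant with the line uncommented; it runs cleanly, but init still never shows up in the sink (same code as above, repeated only for clarity):

val input = MemoryStream[String]
val doubled = input.toDS().map(x => x + " " + x)
input.addData("init") // added before start(); runs without error, but "init" is never shown
val query = doubled.writeStream
  .queryName("testCustomSinkBasic")
  .format("com.knockdata.spark.highcharts.demo.CustomSinkProvider")
  .start()
input.addData("hi") // only "hi hi" gets displayed by the sink
query.processAllAvailable()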
Why does this happen, and how can I fix it?