
I would like to understand unit testing for Spark Structured Streaming. My scenario is that I get data from Kafka, consume it with Spark Structured Streaming, and apply some transformations on top of the data.

I am not sure how to test this with Scala and Spark. Can someone show me how to write unit tests for Structured Streaming in Scala? I am new to streaming.


2 Answers


tl;dr Use MemoryStream to add events and a memory sink for the output.

The following code should help you get started:

import org.apache.spark.sql.execution.streaming.{LongOffset, MemoryStream}
import scala.concurrent.duration._ // for the 1.second / 2.seconds offsets used below

implicit val sqlCtx = spark.sqlContext
import spark.implicits._
val events = MemoryStream[Event]
val sessions = events.toDS
assert(sessions.isStreaming, "sessions must be a streaming Dataset")

// use sessions event stream to apply required transformations
val transformedSessions = ...

// queryName, checkpointLocation and queryOutputMode are values you define for the test
val streamingQuery = transformedSessions
  .writeStream
  .format("memory")
  .queryName(queryName)
  .option("checkpointLocation", checkpointLocation)
  .outputMode(queryOutputMode)
  .start

// Add events to MemoryStream as if they came from Kafka
val batch = Seq(
  eventGen.generate(userId = 1, offset = 1.second),
  eventGen.generate(userId = 2, offset = 2.seconds))
val currentOffset = events.addData(batch)
streamingQuery.processAllAvailable()
events.commit(currentOffset.asInstanceOf[LongOffset])

// check the output
// The output is in queryName table
// The following code simply shows the result
spark
  .table(queryName)
  .show(truncate = false)
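
Note that Event and eventGen are not defined in the snippet above; they stand in for whatever domain type your Kafka topic carries and for a small test-data generator. A minimal, purely hypothetical version (the names and fields are assumptions, not part of the original answer) could look like this:

import java.sql.Timestamp
import scala.concurrent.duration.FiniteDuration

// Hypothetical domain event; replace with whatever your Kafka records deserialize into.
case class Event(userId: Long, eventTime: Timestamp)

// Hypothetical test-data generator producing events at a given offset from a fixed base time.
object eventGen {
  private val baseTimeMillis = 0L

  def generate(userId: Long, offset: FiniteDuration): Event =
    Event(userId, new Timestamp(baseTimeMillis + offset.toMillis))
}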
Answered 2019-07-05T22:25:01.077

So I tried to implement the answer from @Jacek, but I could not figure out how to create the eventGen object, nor how to test a small streaming application that writes its data to the console. I am also using MemoryStream, and here I show a small working example.

The class I am testing is:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SparkSession, functions}

object StreamingDataFrames {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName(StreamingDataFrames.getClass.getSimpleName)
      .master("local[2]")
      .getOrCreate()

    val lines = readData(spark, "socket")
    val streamingQuery = writeData(lines)
    streamingQuery.awaitTermination()
  }

  def readData(spark: SparkSession, source: String = "socket"): DataFrame = {
    val lines: DataFrame = spark.readStream
      .format(source)
      .option("host", "localhost")
      .option("port", 12345)
      .load()
    lines
  }

  def writeData(df: DataFrame, sink: String = "console", queryName: String = "calleventaggs", outputMode: String = "append"): StreamingQuery = {

    println(s"Is this a streaming data frame: ${df.isStreaming}")

    val shortLines: DataFrame = df.filter(functions.length(col("value")) >= 3)

    val query = shortLines.writeStream
      .format(sink)
      .queryName(queryName)
      .outputMode(outputMode)
      .start()
    query
  }
}

I only test the writeData method. That is why I split the query into two methods. I use a SharedSparkSession class to make opening and closing the Spark session easier, like the one shown here.
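
The SharedSparkSession helper referenced above is not included in this answer. A minimal sketch of what such a trait might contain follows; the package name, the lazy sparkSession, and the sqlImplicits alias are assumptions derived from the imports used in the spec:

package org.github.explore.spark

import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterAll, Suite}

// Hypothetical shared-session trait: one local SparkSession per test suite,
// stopped once all tests have run.
trait SharedSparkSession extends BeforeAndAfterAll { self: Suite =>

  lazy val sparkSession: SparkSession = SparkSession.builder()
    .appName("structured-streaming-tests")
    .master("local[2]")
    .getOrCreate()

  // Stable alias so tests can do import sqlImplicits._ to bring the encoders into scope
  lazy val sqlImplicits = sparkSession.implicits

  override def afterAll(): Unit = {
    try sparkSession.stop()
    finally super.afterAll()
  }
}

With that in place, here is the spec for the test class: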

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.{LongOffset, MemoryStream}
import org.github.explore.spark.SharedSparkSession
import org.scalatest.funsuite.AnyFunSuite

class StreamingDataFramesSpec extends AnyFunSuite with SharedSparkSession {

  test("spark structured streaming can read from memory socket") {

    // We can import sql implicits
    implicit val sqlCtx = sparkSession.sqlContext

    import sqlImplicits._

    val events = MemoryStream[String]
    val queryName: String = "calleventaggs"

    // Add events to MemoryStream as if they came from Kafka
    val batch = Seq(
      "this is a value to read",
      "and this is another value"
    )
    val currentOffset = events.addData(batch)

    val streamingQuery = StreamingDataFrames.writeData(events.toDF(), "memory", queryName)

    streamingQuery.processAllAvailable()
    events.commit(currentOffset.asInstanceOf[LongOffset])

    val result: DataFrame = sparkSession.table(queryName)
    result.show

    streamingQuery.awaitTermination(1000L)
    assertResult(batch.size)(result.count)

    val values = result.take(2)
    assertResult(batch(0))(values(0).getString(0))
    assertResult(batch(1))(values(1).getString(0))
  }
}
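
To compile and run the spec, spark-sql and ScalaTest need to be on the test classpath. A hypothetical build.sbt fragment (the versions are assumptions; pick ones matching your Scala and Spark setup) would be:

// Hypothetical dependency block; adjust versions to your environment.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "3.1.1",
  "org.scalatest"    %% "scalatest" % "3.2.9" % Test
)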
Answered 2021-03-05T16:34:05.363