我正在围绕 Apache Spark Streaming 编写一些自包含的集成测试。我想测试我的代码是否可以在我的模拟测试数据中摄取各种边缘情况。当我使用常规 RDD(不是流式传输)执行此操作时。我可以使用我的内联数据并在其上调用“并行化”以将其转换为 spark RDD。但是,我找不到这样的方法来创建 destreams。理想情况下,我想偶尔调用一些“推送”函数,并让元组神奇地出现在我的 dstream 中。ATM 我正在使用 Apache Kafka 执行此操作:我创建了一个临时队列,然后写入它。但这似乎有点矫枉过正。我宁愿直接从我的测试数据创建 test-dstream,而不必使用 Kafka 作为中介。
问问题
3061 次
3 回答
5
出于测试目的,您可以从 RDD 队列中创建输入流。在队列中推送更多 RDD 将模拟在批处理间隔中处理了更多事件。
val sc = SparkContextHolder.sc
val ssc = new StreamingContext(sc, Seconds(1))
val inputData: mutable.Queue[RDD[Int]] = mutable.Queue()
val inputStream: InputDStream[Int] = ssc.queueStream(inputData)
inputData += sc.makeRDD(List(1, 2)) // Emulate the RDD created during the first batch interval
inputData += sc.makeRDD(List(3, 4)) // 2nd batch interval
// etc
val result = inputStream.map(x => x*x)
result.foreachRDD(rdd => assertSomething(rdd))
ssc.start() // Don't forget to start the streaming context
于 2016-06-07T19:04:16.823 回答
1
除了 Raphael 解决方案之外,我认为您还喜欢一次处理一批或所有可用的方法。您需要在队列流的可选方法参数上相应地设置 oneAtATime 标志,如下所示:
val slideDuration = Milliseconds(100)
val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[8]")
val sparkSession: SparkSession = SparkSession.builder.config(conf).getOrCreate()
val sparkContext: SparkContext = sparkSession.sparkContext
val queueOfRDDs = mutable.Queue[RDD[String]]()
val streamingContext: StreamingContext = new StreamingContext(sparkContext, slideDuration)
val rddOneQueuesAtATimeDS: DStream[String] = streamingContext.queueStream(queueOfRDDs, oneAtATime = true)
val rddFloodOfQueuesDS: DStream[String] = streamingContext.queueStream(queueOfRDDs, oneAtATime = false)
rddOneQueuesAtATimeDS.print(120)
rddFloodOfQueuesDS.print(120)
streamingContext.start()
for (i <- (1 to 10)) {
queueOfRDDs += sparkContext.makeRDD(simplePurchase(i))
queueOfRDDs += sparkContext.makeRDD(simplePurchase((i + 3) * (i + 3)))
Thread.sleep(slideDuration.milliseconds)
}
Thread.sleep(1000L)
于 2017-04-06T16:48:01.917 回答
0
这里的关键是调用“store”命令。用你想要的任何东西替换 store 的内容。
于 2015-10-22T13:10:41.370 回答