4

我正在使用火花流来消耗 kafka 消息。我想从 kafka 获取一些消息作为样本,而不是阅读所有消息。所以我想阅读一批消息,将它们返回给调用者并停止火花流。目前我在 spark 流上下文方法的 awaitTermination 方法中传递 batchInterval 时间。我现在不知道如何将处理后的数据从火花流返回给调用者。这是我目前正在使用的代码

def getsample(params: scala.collection.immutable.Map[String, String]): Unit = {
    if (params.contains("zookeeperQourum"))
      zkQuorum = params.get("zookeeperQourum").get
    if (params.contains("userGroup"))
      group = params.get("userGroup").get
    if (params.contains("topics"))
      topics = params.get("topics").get
    if (params.contains("numberOfThreads"))
      numThreads = params.get("numberOfThreads").get
    if (params.contains("sink"))
      sink = params.get("sink").get
    if (params.contains("batchInterval"))
      interval = params.get("batchInterval").get.toInt
    val sparkConf = new SparkConf().setAppName("KafkaConsumer").setMaster("spark://cloud2-server:7077")
    val ssc = new StreamingContext(sparkConf, Seconds(interval))
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    var consumerConfig = scala.collection.immutable.Map.empty[String, String]
    consumerConfig += ("auto.offset.reset" -> "smallest")
    consumerConfig += ("zookeeper.connect" -> zkQuorum)
    consumerConfig += ("group.id" -> group)
    var data = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, consumerConfig, topicMap, StorageLevel.MEMORY_ONLY).map(_._2)
    val streams = data.window(Seconds(interval), Seconds(interval)).map(x => new String(x))
    streams.foreach(rdd => rdd.foreachPartition(itr => {
      while (itr.hasNext && size >= 0) {
        var msg=itr.next
        println(msg)
        sample.append(msg)
        sample.append("\n")
        size -= 1
      }
    }))
    ssc.start()
    ssc.awaitTermination(5000)
    ssc.stop(true)
  }

因此,我不想将消息保存在名为“sample”的字符串构建器中,而是返回给调用者。

4

2 回答 2

6

你可以实现一个 StreamingListener 然后在里面, onBatchCompleted 你可以调用 ssc.stop()

private class MyJobListener(ssc: StreamingContext) extends StreamingListener {

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {

    ssc.stop(true)

  }

}

这是将 SparkStreaming 附加到 JobListener 的方式:

val listen = new MyJobListener(ssc)
ssc.addStreamingListener(listen)

ssc.start()
ssc.awaitTermination()
于 2015-02-17T10:10:47.840 回答
0

我们可以使用以下代码获取示例消息

var sampleMessages=streams.repartition(1).mapPartitions(x=>x.take(10))

如果我们想在第一批之后停止,那么我们应该实现我们自己的 StreamingListener 接口,并且应该在 onBatchCompleted 方法中停止流式传输。

于 2014-12-04T08:07:26.487 回答