0

最近,作为科学研究的一部分,我一直在开发一个应用程序,它使用 Travis CI 和 GitHub 的 REST API 流式传输(或至少应该)数据。这样做的目的是深入了解提交-构建关系,以便进一步执行大量分析。

为此,我实现了以下 Travis 自定义接收器:

object TravisUtils { 

  def createStream(ctx : StreamingContext, storageLevel: StorageLevel) : ReceiverInputDStream[Build] = new TravisInputDStream(ctx, storageLevel) 

} 

private[streaming] 
class TravisInputDStream(ctx : StreamingContext, storageLevel : StorageLevel) extends ReceiverInputDStream[Build](ctx) { 

  def getReceiver() : Receiver[Build] = new TravisReceiver(storageLevel) 

} 

private[streaming] 
class TravisReceiver(storageLevel: StorageLevel) extends Receiver[Build](storageLevel) with Logging { 

  def onStart() : Unit = { 
     new BuildStream().addListener(new BuildListener { 

       override def onBuildsReceived(numberOfBuilds: Int): Unit = { 

       } 

       override def onBuildRepositoryReceived(build: Build): Unit = { 
         store(build) 
       } 

       override def onException(e: Exception): Unit = { 
         reportError("Exception while streaming travis", e) 
       } 
    }) 
  } 

  def onStop() : Unit = { 

  } 
} 

而接收器使用我定制的 TRAVIS API 库(使用 Apache Async 客户端在 Java 中开发)。但是,问题如下:我应该收到的数据是连续的并且不断变化,即不断地推送到 Travis 和 GitHub。例如,考虑 GitHub 每秒大约记录的事实。350 个事件 - 包括推送事件、提交评论和类似事件。

但是,当流式传输 GitHub 或 Travis 时,我确实从前两批中获取数据,但随后,DStream 的 RDD 部分为空 - 尽管有数据要流式传输!

到目前为止,我已经检查了几件事,包括用于省略对 API 的请求的 HttpClient,但它们都没有真正解决这个问题。

因此,我的问题是 - 会发生什么?为什么在周期 x 过去后 Spark 不流式传输数据。您可以在下面找到设置的上下文和配置:

val configuration = new SparkConf().setAppName("StreamingSoftwareAnalytics").setMaster("local[2]") 

val ctx = new StreamingContext(configuration, Seconds(3)) 

val stream = GitHubUtils.createStream(ctx, StorageLevel.MEMORY_AND_DISK_SER) 

// RDD IS EMPTY - that is what is happenning! 
stream.window(Seconds(9)).foreachRDD(rdd => { 
  if (rdd.isEmpty()) {println("RDD IS EMPTY")} else {rdd.collect().foreach(event => println(event.getRepo.getName + " " + event.getId))} 
}) 

ctx.start() 
ctx.awaitTermination() 

提前致谢!

4

0 回答 0