最近,作为科学研究的一部分,我一直在开发一个应用程序,它使用 Travis CI 和 GitHub 的 REST API 流式传输(或至少应该)数据。这样做的目的是深入了解提交-构建关系,以便进一步执行大量分析。
为此,我实现了以下 Travis 自定义接收器:
object TravisUtils {
def createStream(ctx : StreamingContext, storageLevel: StorageLevel) : ReceiverInputDStream[Build] = new TravisInputDStream(ctx, storageLevel)
}
private[streaming]
class TravisInputDStream(ctx : StreamingContext, storageLevel : StorageLevel) extends ReceiverInputDStream[Build](ctx) {
def getReceiver() : Receiver[Build] = new TravisReceiver(storageLevel)
}
private[streaming]
class TravisReceiver(storageLevel: StorageLevel) extends Receiver[Build](storageLevel) with Logging {
def onStart() : Unit = {
new BuildStream().addListener(new BuildListener {
override def onBuildsReceived(numberOfBuilds: Int): Unit = {
}
override def onBuildRepositoryReceived(build: Build): Unit = {
store(build)
}
override def onException(e: Exception): Unit = {
reportError("Exception while streaming travis", e)
}
})
}
def onStop() : Unit = {
}
}
而接收器使用我定制的 TRAVIS API 库(使用 Apache Async 客户端在 Java 中开发)。但是,问题如下:我应该收到的数据是连续的并且不断变化,即不断地推送到 Travis 和 GitHub。例如,考虑 GitHub 每秒大约记录的事实。350 个事件 - 包括推送事件、提交评论和类似事件。
但是,当流式传输 GitHub 或 Travis 时,我确实从前两批中获取数据,但随后,DStream 的 RDD 部分为空 - 尽管有数据要流式传输!
到目前为止,我已经检查了几件事,包括用于省略对 API 的请求的 HttpClient,但它们都没有真正解决这个问题。
因此,我的问题是 - 会发生什么?为什么在周期 x 过去后 Spark 不流式传输数据。您可以在下面找到设置的上下文和配置:
val configuration = new SparkConf().setAppName("StreamingSoftwareAnalytics").setMaster("local[2]")
val ctx = new StreamingContext(configuration, Seconds(3))
val stream = GitHubUtils.createStream(ctx, StorageLevel.MEMORY_AND_DISK_SER)
// RDD IS EMPTY - that is what is happenning!
stream.window(Seconds(9)).foreachRDD(rdd => {
if (rdd.isEmpty()) {println("RDD IS EMPTY")} else {rdd.collect().foreach(event => println(event.getRepo.getName + " " + event.getId))}
})
ctx.start()
ctx.awaitTermination()
提前致谢!