为了降低代码的难度,我允许重启 Spark Streaming 系统以使用新的批大小,但需要保持之前的进度(允许丢失正在处理的批)。
如果我checkpoint
在 Spark Streaming 中使用,它在应用程序重新启动时无法更改所有配置。
所以想通过修改源码来实现这个功能,但是不知道从何下手。希望给点指导,告诉我困难。
为了降低代码的难度,我允许重启 Spark Streaming 系统以使用新的批大小,但需要保持之前的进度(允许丢失正在处理的批)。
如果我checkpoint
在 Spark Streaming 中使用,它在应用程序重新启动时无法更改所有配置。
所以想通过修改源码来实现这个功能,但是不知道从何下手。希望给点指导,告诉我困难。
由于您在谈论批量大小,我假设您在询问火花流而不是结构化流。
有一种方法可以以编程方式设置批处理间隔的值,请参阅此链接以获取文档。
的构造函数StreamingContext
接受duration
定义批处理间隔的类的对象。
您可以通过在代码中硬编码来传递批处理间隔大小,这将要求您每次需要更改批处理间隔时都构建 jar 文件,相反,您可以从配置文件中获取它,这样您就不需要每次都构建代码。
注意:您必须在应用程序的配置文件中设置此属性,而不是在 spark 的配置文件中。
您可以更改批处理间隔的配置并重新启动应用程序,这不会对检查点造成任何问题。
val sparkConf: SparkConf = new SparkConf()
.setAppName("app-name")
.setMaster("app-master")
val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(config.getInt("batch-interval")))
干杯!!