0

为了降低代码的难度,我允许重启 Spark Streaming 系统以使用新的批大小,但需要保持之前的进度(允许丢失正在处理的批)。

如果我checkpoint在 Spark Streaming 中使用,它在应用程序重新启动时无法更改所有配置。

所以想通过修改源码来实现这个功能,但是不知道从何下手。希望给点指导,告诉我困难。

4

1 回答 1

0

由于您在谈论批量大小,我假设您在询问火花流而不是结构化流。

有一种方法可以以编程方式设置批处理间隔的值,请参阅链接以获取文档。

的构造函数StreamingContext接受duration定义批处理间隔的类的对象。

您可以通过在代码中硬编码来传递批处理间隔大小,这将要求您每次需要更改批处理间隔时都构建 jar 文件,相反,您可以从配置文件中获取它,这样您就不需要每次都构建代码。

注意:您必须在应用程序的配置文件中设置此属性,而不是在 spark 的配置文件中。

您可以更改批处理间隔的配置并重新启动应用程序,这不会对检查点造成任何问题。

val sparkConf: SparkConf = new SparkConf()
  .setAppName("app-name")
  .setMaster("app-master")

val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(config.getInt("batch-interval")))

干杯!!

于 2020-07-20T01:36:28.363 回答