0

我想将 pureconfig 与 apache Flink 一起使用。

开始工作时如何传递其他 java 属性?

我尝试通过:-yD env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'"参数传递它,但它不被接受:

https://github.com/geoHeil/streaming-reference/blob/5-basic-flink-setup/Makefile#L21

flink run --class com.github.geoheil.streamingreference.tweets.TweetsAnalysis \
      "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"


-yD env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'"

并且主类在尝试从配置文件中实例化配置时失败。

请注意,完整的参考资料可在https://github.com/geoHeil/streaming-reference获得。您可以通过以下方式重现上述错误:

git clone git@github.com:geoHeil/streaming-reference.git
cd streaming-reference
git checkout 5-basic-flink-setup
make run-local-Tweets

并且应该看到以下异常:

ConfigurationException: Failed to start. There is a problem with the configuration: ConfigReaderFailures(ConvertFailure(KeyNotFound(foo,Set()),None,),List())

在 Spark 中,此属性称为:extraJavaOptions

编辑

即,我尝试使用Flink 的方法:How to pass extra JVM options to TaskManager and JobManager,但到目前为止它不适用于当前版本的 Flink (1.10.1)

此属性相当于spark.driver.extraJavaOptionsApache Spark 中的属性。而且我相信,它需要传递给工作经理。

如果我阅读文档-yD,则仅适用于 YARN。但我也需要一些在本地也可以使用的东西。

更多相关文章:

4

1 回答 1

1

从邮件列表中复制答案。

如果您将集群重用于多个作业,则它们需要共享,JVM_ARGS因为它是相同的过程。[1] 在 Spark 上,每个阶段都会产生新进程 afaik。

但是,目前的建议是每个作业/应用程序只使用一个 ad-hoc 集群(这更接近 Spark 的工作方式)。因此,如果您使用 YARN,每个作业/应用程序都会生成一个大小合适的新集群。然后你可以为新的 YARN 提交提供新的参数

flink run -m yarn-cluster -yD env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'" -class com.github.geoheil.streamingreference.tweets.TweetsAnalysis 
"usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"

但是,请确保可以从 YARN 集群内访问该路径,因为驱动程序可能在集群上执行(不是 100% 肯定)。

要将文件添加到纱线部署,请使用

 -yt,--yarnship <arg>                 Ship files in the specified directory
                                      (t for transfer)

如果您想在共享集群上进行每个作业级别的配置,我建议使用普通参数并手动初始化 PureConfig(没有使用过,所以不确定如何使用)。然后,您可能会按如下方式调用您的程序。

flink run -class com.github.geoheil.streamingreference.tweets.TweetsAnalysis "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar" config.file='config/jobs/twitter-analysis.conf'

对于本地执行,我在配置它时也遇到了一些麻烦(用您的代码尝试过)。问题是我们之前尝试过的所有参数都只传递给新生成的进程,而您的代码直接在 CLI 中执行。

FLINK_ENV_JAVA_OPTS=-Dconfig.file="`pwd`/config/jobs/twitter-analysis.conf" flink run -class com.github.geoheil.streamingreference.tweets.TweetsAnalysis     "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"

FLINK_ENV_JAVA_OPTS通常使用 env.java.opts 从 flink-conf.yaml 解析,但不尊重-Denv.java.opts. 我不确定这是否是故意的。

如果你可以把它放在env.java.optsflink-conf.yaml 中,它很可能对 YARN 和本地都有效。FLINK_CONF_DIR您可以为每个作业设置不同的 conf 目录。或者,您也可以同时指定FLINK_ENV_JAVA_OPTS-yD注入属性。

于 2020-06-25T19:29:32.220 回答