2

我想首先使用数据集 API 操作静态数据,然后使用 DataStream API 运行流式作业。如果我在 IDE 上编写代码,它可以完美运行。但是当我尝试在本地 flink jobmanager 上运行(所有并行度为 1)时,流代码永远不会执行!

例如,以下代码不起作用:

val parallelism = 1

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(parallelism)

val envStatic = ExecutionEnvironment.getExecutionEnvironment
envStatic.setParallelism(parallelism)

val myStaticData = envStatic.fromCollection(1 to 10)
val myVal: Int = myStaticData.reduce(_ + _).collect().head

val theStream = env.fromElements(1).iterate( iteretion => {
  val result = iteretion.map(x => x + myVal)
  (result, result)
})
theStream.print()
env.execute("static and streaming together")

我应该怎么做才能让这件事发挥作用?

日志:上述程序的执行日志

执行计划:plan 似乎是非循环的。

4

1 回答 1

4

如果您的 Flink 作业包含多个子作业,例如由或触发count,则您无法通过 Web 界面提交作业。Web 界面仅支持单个 Flink 作业。collectprint

于 2016-04-12T14:44:04.540 回答