1

我对 Flink 的行为有疑问。下面是我的代码片段。如您所见,某些服务正在提供 Flink 将要执行的 sql 条件列表(例如大约 10k sql)。我的问题是,每当更新 sql 时,我如何指示 flink 使用新的 sql?我看到的一种方法是停止和启动我想避免的 flink 服务,因为其他 sql 条件需要一直运行,并且只有正在更新的条件需要动态停止/启动/或更新。另外,我不想将 10k sqls 作为 10k 个不同的作业提交。那么我正在寻找的行为是否可以使用 Flink 1.11 版?

env is StreamExecutionEnvironment... 

Psudo-code:

List<String> allConditionsSqls = get_SQL_FROM_some_Service();
for(String sql : allConditionsSqls)
{
    Table table = env.sqlQuery(sql);
    env.toRetractStream(table, Row.class)
     .process(new ProcessFunction <Tuple2<Boolean, Row>, Object>() {
         @Override
         public void processElement(Tuple2<Boolean, Row> value, Context ctx,Collector<Object> out) throws Exception {
             Row ev = value.f1;
             log.info(ev);
             // more code here
         }    
     });
}
4

1 回答 1

1

不,这样做的唯一方法是将每个查询作为单独的作业运行。(对于它的价值,有些人每天动态生成 10000 个 Flink 作业——这是可以做到的。)

于 2020-11-12T11:53:58.773 回答