我们有兴趣从新的 Stateful Functions连接到常规的 Flink Streaming 应用程序,最好使用 Table API。想法是从 Statefun 咨询在 Flink 中注册的表,这可能吗,正确的做法是什么?
到目前为止,我的想法是在一些主函数中初始化我的表流并注册一个有状态的函数提供程序以连接到表:
@AutoService(StatefulFunctionModule.class)
public class Module implements StatefulFunctionModule {
@Override
public void configure(Map<String, String> globalConfiguration, Binder binder) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// ingest a DataStream from an external source
DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
// SQL query with an inlined (unregistered) table
Table myTable = tableEnv.fromDataStream(ds, "user, product, amount");
tableEnv.createTemporaryView("my_table", myTable);
TableFunctionProvider tableProvider = new TableFunctionProvider();
binder.bindFunctionProvider(FnEnrichmentCallback.TYPE, tableProvider);
//continue registering my other messages
//...
}
}
有状态函数提供者将返回 a FnTableQuery
,它在收到消息时简单地查询表:
public class TableFunctionProvider implements StatefulFunctionProvider {
@Override
public StatefulFunction functionOfType(FunctionType type) {
return new FnTableQuery();
}
}
然后查询函数对象将作为每个已建立进程的参与者运行,并在调用时简单地查询表:
public class FnTableQuery extends StatefulMatchFunction {
static final FunctionType TYPE = new FunctionType(Identifiers.NAMESPACE, "my-table");
private Table myTable;
@Override
public void configure(MatchBinder binder) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
myTable = tableEnv.from("my_table");
binder
.otherwise(this::catchAll);
}
private void catchAll(Context context, Object message) {
context.send(FnEnrichmentCallback.TYPE, myTable.select("max(amount)").toString(), message);
}
}
如果这种方法没有意义,我提前道歉,因为我不知道:
Flink 和 Statefun 应用程序可以在源/接收器领域之外一起工作,特别是因为这个特定的功能是无状态的并且表是有状态的
我们可以像这样查询 Flink 表,我只是将它们作为中间对象进行查询以发送到接收器或数据流
在 Module.configure 中初始化事物是有意义的,并且如果每个并行工作者都调用有状态函数提供程序及其匹配函数一次