我正在处理数据库突变流,即更改日志流。我希望能够使用 SQL 查询来转换值。我很难将以下三个概念
RowTypeInfo
,Row
和DataStream
.
注意:我事先不知道架构。我使用对象中的数据即时构建它Mutation
(Mutation
是自定义类型)
更具体地说,我有看起来像这样的代码。
val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(execEnv)
// Mutation is a custom type
val mutationStream: DataStream[Mutation] = ...
// toRows returns an object of type org.apache.flink.types.Row
val rowStream:DataStream[Row] = mutationStream.flatMap({mutation => toRows(mutation)})
tableEnv.registerDataStream("spinal_tap_table", rowStream)
tableEnv.sql("select col1 + 2")
注意:Row
对象是位置的,并且没有用于列名的占位符。我找不到将架构附加到DataStream
对象的位置。
我想传递某种类似于Row
包含{columnName: String, columnValue: Object, columnType: TypeInformation[_]}
查询完整信息的结构。