0

我正在处理数据库突变流,即更改日志流。我希望能够使用 SQL 查询来转换值。我很难将以下三个概念 RowTypeInfo,RowDataStream.

注意:我事先不知道架构。我使用对象中的数据即时构建它MutationMutation是自定义类型)

更具体地说,我有看起来像这样的代码。

val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(execEnv)

// Mutation is a custom type
val mutationStream: DataStream[Mutation] = ...
// toRows returns an object of type org.apache.flink.types.Row
val rowStream:DataStream[Row] = mutationStream.flatMap({mutation => toRows(mutation)})
tableEnv.registerDataStream("spinal_tap_table", rowStream)
tableEnv.sql("select col1 + 2")

注意:Row对象是位置的,并且没有用于列名的占位符。我找不到将架构附加到DataStream对象的位置。

我想传递某种类似于Row包含{columnName: String, columnValue: Object, columnType: TypeInformation[_]}查询完整信息的结构。

4

1 回答 1

2

在 Flink SQL 中,表模式在定义时是强制性的Table。无法对动态类型的记录运行查询。

关于 和的概念RowTypeInfoRowDataStream

  • Row是保存数据的实际记录
  • RowTypeInfoRows 的模式描述。它包含名称和TypeInformationa 的每个字段Row
  • DataStream是记录的逻辑流。ADataStream[Row]是行流。请注意,这不是实际的流,而只是在 API 中表示流的 API 概念。
于 2018-02-09T08:33:35.003 回答