我在流环境中使用 Flink 的 Table API 和/或 Flink 的 SQL 支持(Flink 1.3.1、Scala 2.11)。我从 , 开始DataStream[Person]
,Person
是一个看起来像这样的案例类:
Person(name: String, age: Int, attributes: Map[String, String])
一切都按预期工作,直到我开始attributes
进入画面。
例如:
val result = streamTableEnvironment.sql(
"""
|SELECT
|name,
|attributes['foo'],
|TUMBLE_START(rowtime, INTERVAL '1' MINUTE)
|FROM myTable
|GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE), name, attributes['foo']
|""".stripMargin)
... 导致:
线程“主”org.apache.flink.table.api.TableException 中的异常:不支持类型:org.apache.flink.table.api.TableException$.apply(exceptions.scala:53) 处的任何 org.apache .flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:341) 在 org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:531) 在 org.apache。 flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:530) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$ $anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection .IterableLike$ 类。foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable .scala:104) at org.apache.flink.table.plan.logical.LogicalRelNode.(operators.scala:530) at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503) at com .nordstrom.mdt.Job$.main(Job.scala:112) 在 com.nordstrom.mdt.Job.main(Job.scala)flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503) 在 com.nordstrom.mdt.Job$.main(Job.scala:112) 在 com.nordstrom.mdt.Job.main(Job.scala)flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503) 在 com.nordstrom.mdt.Job$.main(Job.scala:112) 在 com.nordstrom.mdt.Job.main(Job.scala)
注意:无论是否存在特定的映射键,都会发生此错误。另请注意,如果我根本没有指定映射键,我会得到一个不同的错误,这是有道理的;这种情况在这里没有发挥作用。
这个 PR似乎说有一条前进的道路:https ://github.com/apache/flink/pull/3767 。特别看一下测试用例,它表明数据集可以提供类型信息。没有相关的方法fromDataStream
,并registerDataStream
提供了一种提供类型信息的方法。
这可能吗?换句话说,Flink SQL on Streams 能支持 map 吗?
澄清编辑...
省略地图键(而不是)时,我收到以下错误。这表明运行时确实知道这些是字符串。GROUP BY ... attributes
attributes['foo']
此类型(接口 scala.collection.immutable.Map[scala.Tuple2(_1: String, _2: String)])不能用作键。