1

我在环境中使用 Flink 的 Table API 和/或 Flink 的 SQL 支持(Flink 1.3.1、Scala 2.11)。我从 , 开始DataStream[Person]Person是一个看起来像这样的案例类:

Person(name: String, age: Int, attributes: Map[String, String])

一切都按预期工作,直到我开始attributes进入画面。

例如:

val result = streamTableEnvironment.sql(
"""
|SELECT
|name,
|attributes['foo'],
|TUMBLE_START(rowtime, INTERVAL '1' MINUTE)
|FROM myTable
|GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE), name, attributes['foo']
|""".stripMargin)

... 导致:

线程“主”org.apache.flink.table.api.TableException 中的异常:不支持类型:org.apache.flink.table.api.TableException$.apply(exceptions.scala:53) 处的任何 org.apache .flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:341) 在 org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:531) 在 org.apache。 flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:530) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$ $anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Ite​​rator.scala:893) at scala.collection.AbstractIterator.foreach(Ite​​rator.scala:1336) at scala.collection .IterableLike$ 类。foreach(Ite​​rableLike.scala:72) at scala.collection.AbstractIterable.foreach(Ite​​rable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable .scala:104) at org.apache.flink.table.plan.logical.LogicalRelNode.(operators.scala:530) at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503) at com .nordstrom.mdt.Job$.main(Job.scala:112) 在 com.nordstrom.mdt.Job.main(Job.scala)flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503) 在 com.nordstrom.mdt.Job$.main(Job.scala:112) 在 com.nordstrom.mdt.Job.main(Job.scala)flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503) 在 com.nordstrom.mdt.Job$.main(Job.scala:112) 在 com.nordstrom.mdt.Job.main(Job.scala)

注意:无论是否存在特定的映射键,都会发生此错误。另请注意,如果我根本没有指定映射键,我会得到一个不同的错误,这是有道理的;这种情况在这里没有发挥作用。

这个 PR似乎说有一条前进的道路:https ://github.com/apache/flink/pull/3767 。特别看一下测试用例,它表明数据集可以提供类型信息。没有相关的方法fromDataStream,并registerDataStream提供了一种提供类型信息的方法。

可能吗?换句话说,Flink SQL on Streams 能支持 map 吗?

澄清编辑... 省略地图键(而不是),我收到以下错误。这表明运行时确实知道这些是字符串。GROUP BY ... attributesattributes['foo']

此类型(接口 scala.collection.immutable.Map[scala.Tuple2(_1: String, _2: String)])不能用作键。

4

1 回答 1

1

目前,Flink SQL 仅支持 Java java.util.Map。Scala 映射被视为具有 Flink GenericTypeInfo/SQLANY数据类型的黑盒。因此,您可以转发这些黑盒并在标量函数中使用它们,但['key']不支持使用运算符访问。

因此,要么使用 Java 映射,要么自己在 UDF 中实现访问操作。

我为您的问题创建了一个问题:https ://issues.apache.org/jira/browse/FLINK-7360

于 2017-08-03T08:07:50.850 回答