问题标签 [flink-sql]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
219 浏览

apache-flink - 表 API Scala

我正在尝试使用 flink scala table api 加入两个表。我有一个带有两个(a,b)列的表和另一个带有一个(c)列的表我想将这两个表连接到一个具有三个(a,b,c)列的更大的表中。我只是想加入他们我不想使用任何条件(Where 子句)加入他们。但是Flink给我一个使用Where子句的错误,这是一种在where子句中没有任何条件的加入表的方法吗?如果我想使用 where 子句,我应该给出什么条件?

下面是我加入两个表的命令

对正确方向的任何帮助都将受到高度赞赏。谢谢你。

0 投票
1 回答
1401 浏览

scala - Flink Table API & SQL 和地图类型 (Scala)

我在环境中使用 Flink 的 Table API 和/或 Flink 的 SQL 支持(Flink 1.3.1、Scala 2.11)。我从 , 开始DataStream[Person]Person是一个看起来像这样的案例类:

一切都按预期工作,直到我开始attributes进入画面。

例如:

... 导致:

线程“主”org.apache.flink.table.api.TableException 中的异常:不支持类型:org.apache.flink.table.api.TableException$.apply(exceptions.scala:53) 处的任何 org.apache .flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:341) 在 org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:531) 在 org.apache。 flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:530) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$ $anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Ite​​rator.scala:893) at scala.collection.AbstractIterator.foreach(Ite​​rator.scala:1336) at scala.collection .IterableLike$ 类。foreach(Ite​​rableLike.scala:72) at scala.collection.AbstractIterable.foreach(Ite​​rable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable .scala:104) at org.apache.flink.table.plan.logical.LogicalRelNode.(operators.scala:530) at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503) at com .nordstrom.mdt.Job$.main(Job.scala:112) 在 com.nordstrom.mdt.Job.main(Job.scala)flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503) 在 com.nordstrom.mdt.Job$.main(Job.scala:112) 在 com.nordstrom.mdt.Job.main(Job.scala)flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503) 在 com.nordstrom.mdt.Job$.main(Job.scala:112) 在 com.nordstrom.mdt.Job.main(Job.scala)

注意:无论是否存在特定的映射键,都会发生此错误。另请注意,如果我根本没有指定映射键,我会得到一个不同的错误,这是有道理的;这种情况在这里没有发挥作用。

这个 PR似乎说有一条前进的道路:https ://github.com/apache/flink/pull/3767 。特别看一下测试用例,它表明数据集可以提供类型信息。没有相关的方法fromDataStream,并registerDataStream提供了一种提供类型信息的方法。

可能吗?换句话说,Flink SQL on Streams 能支持 map 吗?

澄清编辑... 省略地图键(而不是),我收到以下错误。这表明运行时确实知道这些是字符串。GROUP BY ... attributesattributes['foo']

此类型(接口 scala.collection.immutable.Map[scala.Tuple2(_1: String, _2: String)])不能用作键。

0 投票
2 回答
1941 浏览

apache-flink - Flink SQL 是否支持 Java Map 类型?

我正在尝试使用 Flink 的 SQL API 从地图中访问密钥。它失败并出现错误 Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY 请告诉我如何修复它。这是我的活动课

这是提交 flink 作业的主类

当我运行它时,我得到了异常

我正在使用 flink 1.3.1

0 投票
1 回答
1033 浏览

scala - 例外“原子类型的表只能有一个字段。” 将 DataStream 转换为表时

1、BillCount 和 Record 是类对象。BillCount 对象的列是一些记录的。
2、Flink source 正在从 kafka topic 获取 'Record' 数据。

以下是例外:

0 投票
1 回答
650 浏览

left-join - Apache Flink:带有 TableFunction 的 LEFT JOIN 未返回预期结果

Flink 版本:1.3.1

我创建了两个表,一个来自内存,另一个来自 UDTF。当我测试加入和离开加入时,它们返回了相同的结果。我所期望的是左连接比连接有更多的行。

我的测试代码是这样的:

左连接和连接查询的输出是一样的。在这两种情况下,只返回一行。

WCUper Ciao 1 CIAO

但是,我认为左连接查询应该保留“Hello”行。

0 投票
1 回答
375 浏览

java - Apache Flink Table 1.4:可以在 Table 上执行外部 SQL 吗?

是否可以在不上传 .jar 的情况下从外部查询现有 StreamTable 获取执行环境并检索表环境?我一直在等待 Apache Flink Table 1.4 发布,因为它具有动态(连续)表功能。我期待别的东西,我认为可以在运行时更改表并修改其参数。为了进行一些实时查询,而不是在数据流之上定义(连续或仅附加)数据库视图。我知道我可以将表导出到某个数据库并使用 SQL 动态查询该数据库,但这是一种尴尬的行为。Flink 的美妙之处在于一切都是实时的,一切都是流,那么是否可以通过一些外部程序实时查询 Flink 表呢?

0 投票
1 回答
1666 浏览

apache-flink - Apache Flink:如何使用 Table API 查询关系数据库?

以下代码片段摘自此博客文章

我想从关系数据库中读取数据。Flink 有TableSourceJDBC 数据库吗?

0 投票
0 回答
618 浏览

java - 尝试使用 Table API 对数据进行分组时出现 Flink RuntimeException

我的项目打算使用 Flink 进行数据处理。我们有一个 Kafka 主题,使用该主题我们想使用 Flink 聚合数据。以下是我的示例代码

由于 group by 标准的动态特性,我打算使用 Flink 的 Table API。当上面的代码运行时,我得到以下错误。

嵌套异常是java.lang.RuntimeException

编译错误:

我不知道这里发生了什么。任何帮助表示赞赏。

这是我的依赖项

0 投票
1 回答
1287 浏览

apache-flink - 在任何表中都找不到 Flink 1.4 列“rowtime”

我正在按照文档配置具有 rowtime 属性的 TableSource

我注册timestamp字段如下

并得到以下错误:

线程“main”org.apache.flink.table.api.ValidationException 中的异常:SQL 验证失败。从第 1 行第 64 列到第 1 行第 70 列:在 org.apache.flink 的 org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93) 的任何表中都找不到列 'rowtime'。 table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:561) at oracle.flink.demo.KafkaSQLStream.main(KafkaSQLStream.java:62) 原因:org.apache.calcite.runtime.CalciteContextException:从第 1 行第 64 列开始到第 1 行第 70 列:在 sun.reflect.DelegatingConstructorAccessorImpl 的 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) 的 sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 的任何表中找不到列“rowtime”。

0 投票
1 回答
513 浏览

apache-flink - Apache Flink:推断 CSV 文件的架构,如 Spark

在 Spark 中,我们可以使用推断模式从文件中动态读取模式,例如:

有没有办法在 Flink 中做同样的事情?