问题标签 [flink-sql]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-flink - 表 API Scala
我正在尝试使用 flink scala table api 加入两个表。我有一个带有两个(a,b)列的表和另一个带有一个(c)列的表我想将这两个表连接到一个具有三个(a,b,c)列的更大的表中。我只是想加入他们我不想使用任何条件(Where 子句)加入他们。但是Flink给我一个使用Where子句的错误,这是一种在where子句中没有任何条件的加入表的方法吗?如果我想使用 where 子句,我应该给出什么条件?
下面是我加入两个表的命令
对正确方向的任何帮助都将受到高度赞赏。谢谢你。
scala - Flink Table API & SQL 和地图类型 (Scala)
我在流环境中使用 Flink 的 Table API 和/或 Flink 的 SQL 支持(Flink 1.3.1、Scala 2.11)。我从 , 开始DataStream[Person]
,Person
是一个看起来像这样的案例类:
一切都按预期工作,直到我开始attributes
进入画面。
例如:
... 导致:
线程“主”org.apache.flink.table.api.TableException 中的异常:不支持类型:org.apache.flink.table.api.TableException$.apply(exceptions.scala:53) 处的任何 org.apache .flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:341) 在 org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:531) 在 org.apache。 flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:530) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$ $anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection .IterableLike$ 类。foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable .scala:104) at org.apache.flink.table.plan.logical.LogicalRelNode.(operators.scala:530) at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503) at com .nordstrom.mdt.Job$.main(Job.scala:112) 在 com.nordstrom.mdt.Job.main(Job.scala)flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503) 在 com.nordstrom.mdt.Job$.main(Job.scala:112) 在 com.nordstrom.mdt.Job.main(Job.scala)flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503) 在 com.nordstrom.mdt.Job$.main(Job.scala:112) 在 com.nordstrom.mdt.Job.main(Job.scala)
注意:无论是否存在特定的映射键,都会发生此错误。另请注意,如果我根本没有指定映射键,我会得到一个不同的错误,这是有道理的;这种情况在这里没有发挥作用。
这个 PR似乎说有一条前进的道路:https ://github.com/apache/flink/pull/3767 。特别看一下测试用例,它表明数据集可以提供类型信息。没有相关的方法fromDataStream
,并registerDataStream
提供了一种提供类型信息的方法。
这可能吗?换句话说,Flink SQL on Streams 能支持 map 吗?
澄清编辑...
省略地图键(而不是)时,我收到以下错误。这表明运行时确实知道这些是字符串。GROUP BY ... attributes
attributes['foo']
此类型(接口 scala.collection.immutable.Map[scala.Tuple2(_1: String, _2: String)])不能用作键。
apache-flink - Flink SQL 是否支持 Java Map 类型?
我正在尝试使用 Flink 的 SQL API 从地图中访问密钥。它失败并出现错误 Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY 请告诉我如何修复它。这是我的活动课
这是提交 flink 作业的主类
当我运行它时,我得到了异常
我正在使用 flink 1.3.1
scala - 例外“原子类型的表只能有一个字段。” 将 DataStream 转换为表时
1、BillCount 和 Record 是类对象。BillCount 对象的列是一些记录的。
2、Flink source 正在从 kafka topic 获取 'Record' 数据。
以下是例外:
left-join - Apache Flink:带有 TableFunction 的 LEFT JOIN 未返回预期结果
Flink 版本:1.3.1
我创建了两个表,一个来自内存,另一个来自 UDTF。当我测试加入和离开加入时,它们返回了相同的结果。我所期望的是左连接比连接有更多的行。
我的测试代码是这样的:
左连接和连接查询的输出是一样的。在这两种情况下,只返回一行。
WCUper Ciao 1 CIAO
但是,我认为左连接查询应该保留“Hello”行。
java - Apache Flink Table 1.4:可以在 Table 上执行外部 SQL 吗?
是否可以在不上传 .jar 的情况下从外部查询现有 StreamTable 获取执行环境并检索表环境?我一直在等待 Apache Flink Table 1.4 发布,因为它具有动态(连续)表功能。我期待别的东西,我认为可以在运行时更改表并修改其参数。为了进行一些实时查询,而不是在数据流之上定义(连续或仅附加)数据库视图。我知道我可以将表导出到某个数据库并使用 SQL 动态查询该数据库,但这是一种尴尬的行为。Flink 的美妙之处在于一切都是实时的,一切都是流,那么是否可以通过一些外部程序实时查询 Flink 表呢?
apache-flink - Apache Flink:如何使用 Table API 查询关系数据库?
以下代码片段摘自此博客文章:
我想从关系数据库中读取数据。Flink 有TableSource
JDBC 数据库吗?
java - 尝试使用 Table API 对数据进行分组时出现 Flink RuntimeException
我的项目打算使用 Flink 进行数据处理。我们有一个 Kafka 主题,使用该主题我们想使用 Flink 聚合数据。以下是我的示例代码
由于 group by 标准的动态特性,我打算使用 Flink 的 Table API。当上面的代码运行时,我得到以下错误。
嵌套异常是java.lang.RuntimeException
编译错误:
我不知道这里发生了什么。任何帮助表示赞赏。
这是我的依赖项
apache-flink - 在任何表中都找不到 Flink 1.4 列“rowtime”
我正在按照文档配置具有 rowtime 属性的 TableSource。
我注册timestamp
字段如下
并得到以下错误:
线程“main”org.apache.flink.table.api.ValidationException 中的异常:SQL 验证失败。从第 1 行第 64 列到第 1 行第 70 列:在 org.apache.flink 的 org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93) 的任何表中都找不到列 'rowtime'。 table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:561) at oracle.flink.demo.KafkaSQLStream.main(KafkaSQLStream.java:62) 原因:org.apache.calcite.runtime.CalciteContextException:从第 1 行第 64 列开始到第 1 行第 70 列:在 sun.reflect.DelegatingConstructorAccessorImpl 的 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) 的 sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 的任何表中找不到列“rowtime”。
apache-flink - Apache Flink:推断 CSV 文件的架构,如 Spark
在 Spark 中,我们可以使用推断模式从文件中动态读取模式,例如:
有没有办法在 Flink 中做同样的事情?