问题标签 [flink-sql]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
274 浏览

apache-flink - Flink SQL 中跳跃窗口上的指数衰减移动平均值:铸造时间

现在我们在 Flink 中有了带有花哨窗口的 SQL,我试图让衰减的移动平均线被“在未来的 Flink 版本中对于 Table API 和 SQL 都可能发生的事情”引用。来自他们的SQL 路线图/预览 2017-03 帖子

这是我的尝试(也受到方解石衰变示例的启发):

时间是处理时间,我们通过从 AppendStream 表创建 write_position 获得的 proctime 为:

我收到此错误:

我已经尝试将 proctime 转换为我所知道的所有其他类型(试图达到 NUMERIC 的应许之地),但我只是找不到如何使它工作。

我错过了什么吗?proctime 是一种您无法转换的非常特殊的“系统更改编号”时间吗?如果是这样,仍然必须有某种方法将其与 HOP_START(proctime,...) 值进行比较。

0 投票
1 回答
303 浏览

apache-flink - 在 Flink 的聚合原语中具有等效于 HOP_START

我试图在 Flink SQL 的跳跃窗口上做一个指数衰减的移动平均线。我需要有权访问窗口的边界之一,即以下 HOP_START:

我得到以下堆栈跟踪:

它确实说它在聚合 SUM 之外工作时是否未实现。所以这就是让我认为这是一个范围界定问题的原因。

现在,问题是:我可以转换这个表达式并在聚合之外进行最终处理,如 exp(x+y) = exp(x)*exp(y); 但我坚持使用 TIMESTAMPDIFF (这在我的上一期中创造了奇迹)。我还没有找到将 TIME ATTRIBUTE 转换为 NUMERIC 类型的方法;此外,即使我缩小它们,我也不喜欢对 UNIX 时间戳求幂。

无论如何,这种解决方法会有点笨拙,我可能还有另一种方法。我不知道如何在这个 SQL 片段中按摩作用域,使其仍然“处于”窗口作用域中,并且在不抛出的情况下获得开始时间。

0 投票
1 回答
159 浏览

apache-flink - 使用 Flink 处理过去 10 分钟内的 kafka 消息?

我们正在考虑使用 Flink SQL 对过去 5-10 分钟内的实时 kafka 数据进行临时分析。为了实现这一点,我们似乎需要扩展 Kafka 连接器,使其仅在给定时间段内读取消息,并使用它来生成有限输入源。

我想知道是否有替代方法。任何建议都将受到欢迎。

0 投票
3 回答
1043 浏览

apache-flink - 为什么我们在 flink 源代码中有 flink-streaming-java 和 flink-streaming-scala 模块

在 Fink 源码中,有flink-stream-javaflink-stream-scala模块。为什么我们需要两个模块来进行 flink 流式传输?

https://github.com/apache/flink/tree/master/flink-streaming-java

https://github.com/apache/flink/tree/master/flink-streaming-scala

0 投票
1 回答
1241 浏览

apache-flink - 如何使用 Flink SQL 按事件时间对流进行排序

我有一个DataStream<Event>要排序的乱序,以便事件按事件时间时间戳排序。我已将我的用例简化为我的 Event 类只有一个字段——该timestamp字段:

我收到此错误:

线程“主”org.apache.flink.table.api.SqlParserException 中的异常:SQL 解析失败。在第 1 行第 8 列遇到“timestamp FROM”。

似乎我没有以正确的方式指定事件时间属性,但目前尚不清楚出了什么问题。

0 投票
1 回答
922 浏览

java - 无法将具有单引号内的值的参数提交给 flink 作业

我正在尝试使用 args 提交我的 jar。我正在使用 flink Rest Api 以 json 格式发送我的 args。我在java中的输入样本是

当我准确给出这些参数时,我的工作通过 IDE 运行,但是当我通过 rest api 发送它时,我的查询 arg 没有单引号。因此我得到 Calcite sql 解析异常。

我该如何解决这个问题?

0 投票
1 回答
275 浏览

apache-flink - 为什么我的 Flink SQL 查询有非常不同的检查点大小?

在我的项目中使用 Flink Table SQL 时,我发现如果GROUP BY我的 SQL 中有任何子句,检查点的大小会大大增加。

例如,

检查点大小将小于 500KB。

但是这样使用的时候,

即使没有处理任何消息,检查点大小也会超​​过 70MB。像这样,

图片在这里。

但是当使用 DataStream API 而keyBy不是 Table SQLGROUP BY时,检查点的大小是正常的,小于 1MB。

为什么?

--------更新于2019-03-25--------

在做了一些测试和阅读源码后,我们发现这是 RocksDB 的原因。

当使用 RockDB 作为 state backend 时,checkpoint 的大小会超过每个 key 5MB 左右,而当使用 filesystem 作为 state backend 时,checkpoint 的大小会下降到每个 key 不到 100KB。

为什么 RocksDB 需要这么大的空间来保存状态?我们什么时候应该选择 RocksDB?

0 投票
1 回答
115 浏览

apache-flink - 如何在flink sql中检查字符串是否为数字

在flink sql中,如何判断一个字符串是否为数字,如

正则表达式似乎没有用,“类似于”运算符也不起作用。有什么想法吗?

0 投票
1 回答
721 浏览

apache-flink - 如何在 Flink Table API 中为算子添加 uid?

正如文档强烈推荐的那样,我想在 Flink 中为我的操作符添加 uid 以用于保存点。我的工作使用 Table API。我在文档中没有找到如何使用 SQL 查询将 uid 添加到运算符。

我的代码看起来像这样:

如果我的理解是正确的,TUMBLE Window 是一种内部操作状态。因此,我想为它分配一个特定的 uid,以防止自动生成的 id 可能引起的一些问题。这样做的正确方法是什么?

我正在运行 Flink v1.6.2

0 投票
1 回答
49 浏览

scala - 在 Java/Scala 程序中从 DataStream 创建 SQL 表并从 SQL 客户端 CLI 查询它 - Apache Flink

是否可以使用 Flink SQL 客户端 CLI 与表交互,其中哪个表是在集群中运行的 Scala/Java 程序中创建的?