问题标签 [flink-sql]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-flink - Flink Stream SQL order by
我有一个流式输入,比如股票价格数据(包括多只股票),我想每 1 分钟按价格进行一次排名。排名基于所有股票的最新价格,无论前1分钟是否更新,都需要对所有股票进行排序。我尝试在 flink 流 SQL 中使用 ORDER BY。
我未能实现我的逻辑,我对两个部分感到困惑:
为什么
ORDER BY
只能使用时间属性作为主要且仅支持ASC
?如何按价格等其他类型执行订单?下面的 SQL(来自 Flink 文档)是什么意思?没有窗口也没有窗口,所以我假设每个订单都会立即执行 SQL,在这种情况下,对一个元素进行排序看起来毫无意义。
[更新]:当我阅读 ProcimeSortProcessFunction.scala 的代码时,似乎 Flink 对接下来一毫秒内收到的元素进行了排序。
最后,有没有办法在 SQL 中实现我的逻辑?
apache-flink - 在 Apache Flink 中注册聚合 UDF
我正在尝试按照此处的步骤创建基本的 Flink Aggregate UDF。我已经添加了依赖项()并实现了
我已经实现了强制方法以及其他一些方法:accumulate, merge, etc
. 所有这些构建都没有错误。现在根据文档,我应该可以将其注册为
但是,registerFucntion
似乎ScalarFunction
只需要一个作为输入。我收到不兼容的类型错误:The method registerFunction(String, ScalarFunction) in the type TableEnvironment is not applicable for the arguments (String, MyAggregate)
任何帮助都会很棒。
json - 如何使用 flink 流式传输 json?
我实际上正在处理一个流,接收一堆字符串,需要计算所有字符串。总和是加总的,这意味着对于第二条记录,总和被添加到输出前一天必须是一些看起来像的 json 文件
我创建了一个看起来像这样的流:
提前感谢您的帮助:)
apache-flink - Flink SQL 中的控制流
使用流 API,我可以编写一个 RichCoFlatMapFunction 接受控制流和数据流,控制流包含用于启动或停止或更改计算参数的元素,我知道我可以将当前控制设置存储在状态中,并且处理数据流时检查值。
但是用 Flink SQL 做类似事情的方法是什么?我不能将连接用作数据流,并且控制流无法连接在一起。
我们提出的解决方案是通过应用程序本身存储控制设置。这个想法是:
将控制流广播到 map 运算符,并将控制设置存储到其 map() 方法中的 java 单例对象中,因为 map 运算符将以默认并行度运行,我们假设它将在该作业的所有 JVM 上运行,这样我们就可以确保每个 JVM 都会初始化并不断更新单例对象中的控制设置。
使用 SQL,对于每个 UDAF 或 UDF,我们可以通过访问 java 单例对象来访问控制设置。
但我不确定我的假设是否正确,这是一个可行的解决方案。
apache-flink - 如何编写基于批处理表增量窗口的计算SQL
我的要求是根据批处理表的增量大小窗口进行计算。
例如,第一个窗口有1行,第二个窗口有2行(包括第一个窗口的1行和一个新行),然后第三个窗口有3行(包括第二个窗口的2行和一个新行) , 等等。
例如:
源表:
日期时间 | 产品编号 | 价格 |
3-1 | p1 | 10 |
3-2 | p1 | 20 |
3-3 | p1 | 30 |
3-4 | p1 | 40 |
结果表:
日期时间 | 产品编号 | 平均|
3-1 | p1 | 10/1 |
3-2 | p1 | (10+20)/2 |
3-3 | p1 | (10+20+30)/3 |
3-4 | p1 | (10+20+30+40)/4 |
我正在尝试找到一种使用 Sql 实现此要求的方法,在我看来,OVER 操作可以做到这一点,但尚未在 flink 中实现,所以我需要另一种方法。
顺便提一句:
我尝试使用 1 天的 TUMBLE 窗口并将先前的值存储在用户定义的聚合对象中,但失败了,因为聚合对象将被所有产品重用,而不是每个产品的单个对象
apache-flink - ArithmeticException when divide const integer in Flink SQL
Flink throwed ArithmeticException(on-terminating decimal expansion no exact representable decimal result) when I executed below query:
I can't find a way from the document or google to get rid of this issue. Thanks.
apache-flink - Apache Flink:运行许多作业时的性能问题
对于大量 Flink SQL 查询(以下 100 条),Flink 命令行客户端在 Yarn 集群上失败并显示“JobManager 在 600000 毫秒内没有响应”,即集群上永远不会启动作业。
- 在最后一个 TaskManager 启动后,JobManager 日志除了“在 JobManager 中找不到 ID 为 5cd95f89ed7a66ec44f2d19eca0592f7 的作业”的 DEBUG 日志之外什么都没有,表明它可能卡住了(创建 ExecutionGraph?)。
- 与本地的独立 java 程序相同(最初是高 CPU)
- 注意: structStream 中的每一行包含 515 列(许多最终为空),其中包括一个包含原始消息的列。
- 在 YARN 集群中,我们为 TaskManager 指定 18GB,为 JobManager 指定 18GB,每个插槽 5 个,并行度为 725(我们的 Kafka 源中的分区)。
Flink SQL 查询:
代码
scala - 使用 Flink 从 Redis 读取数据
我对 Flink 完全陌生。可能会重复这个问题,但只找到一个链接,这对我来说是无法理解的。
https://stackoverflow.com/a/44294980/6904987
我以 Key Value 格式将数据存储在 Redis 中,例如 Key 是 UserId,UserInfo 是 value。写在下面的代码。
样本数据:
12“用户信息12”
13“用户信息13”
14“用户信息14”
15“用户信息15”
我想使用基于 key 的 Flink 从 redis 获取数据。示例 14 应返回“UserInfo14”。输出应该打印在 Flink 日志文件或终端中,无论它是什么。
提前致谢。
apache-flink - Flink SQL - 如何使用自定义模式解析 TIMESTAMP?
从文档看来,Flink 的 SQL 只能解析某种格式的时间戳,即:
TIMESTAMP 字符串:将格式为“yy-mm-dd hh:mm:ss.fff”的时间戳字符串解析为 SQL 时间戳。
有没有办法传入自定义 DateTimeFormatter 来解析不同类型的时间戳格式?
apache-flink - Flink 跨多个主机读取 CSV
我有一个像https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/cluster_setup.html这样的集群,在其中我有多个 CSV 对应于每个主机的分片。我想使用表 API 计算跨多个主机的 CSV 列的总和。每个工作人员都应该能够计算他拥有的 CSV 的总和并将结果返回到主服务器。是否有可能,如果这是我应该实施的。