问题标签 [flink-sql]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1384 浏览

apache-flink - Flink Stream SQL order by

我有一个流式输入,比如股票价格数据(包括多只股票),我想每 1 分钟按价格进行一次排名。排名基于所有股票的最新价格,无论前1分钟是否更新,都需要对所有股票进行排序。我尝试在 flink 流 SQL 中使用 ORDER BY。

我未能实现我的逻辑,我对两个部分感到困惑:

  1. 为什么ORDER BY只能使用时间属性作为主要且仅支持ASC?如何按价格等其他类型执行订单?

  2. 下面的 SQL(来自 Flink 文档)是什么意思?没有窗口也没有窗口,所以我假设每个订单都会立即执行 SQL,在这种情况下,对一个元素进行排序看起来毫无意义。

[更新]:当我阅读 ProcimeSortProcessFunction.scala 的代码时,似乎 Flink 对接下来一毫秒内收到的元素进行了排序。

最后,有没有办法在 SQL 中实现我的逻辑?

0 投票
1 回答
453 浏览

apache-flink - 在 Apache Flink 中注册聚合 UDF

我正在尝试按照此处的步骤创建基本的 Flink Aggregate UDF。我已经添加了依赖项()并实现了

我已经实现了强制方法以及其他一些方法:accumulate, merge, etc. 所有这些构建都没有错误。现在根据文档,我应该可以将其注册为

但是,registerFucntion似乎ScalarFunction只需要一个作为输入。我收到不兼容的类型错误:The method registerFunction(String, ScalarFunction) in the type TableEnvironment is not applicable for the arguments (String, MyAggregate)

任何帮助都会很棒。

0 投票
1 回答
3645 浏览

json - 如何使用 flink 流式传输 json?

我实际上正在处理一个流,接收一堆字符串,需要计算所有字符串。总和是加总的,这意味着对于第二条记录,总和被添加到输出前一天必须是一些看起来像的 json 文件

我创建了一个看起来像这样的流:

提前感谢您的帮助:)

0 投票
1 回答
310 浏览

apache-flink - Flink SQL 中的控制流

使用流 API,我可以编写一个 RichCoFlatMapFunction 接受控制流和数据流,控制流包含用于启动或停止或更改计算参数的元素,我知道我可以将当​​前控制设置存储在状态中,并且处理数据流时检查值。

但是用 Flink SQL 做类似事情的方法是什么?我不能将连接用作数据流,并且控制流无法连接在一起。

我们提出的解决方案是通过应用程序本身存储控制设置。这个想法是:

  1. 将控制流广播到 map 运算符,并将控制设置存储到其 map() 方法中的 java 单例对象中,因为 map 运算符将以默认并行度运行,我们假设它将在该作业的所有 JVM 上运行,这样我们就可以确保每个 JVM 都会初始化并不断更新单例对象中的控制设置。

  2. 使用 SQL,对于每个 UDAF 或 UDF,我们可以通过访问 java 单例对象来访问控制设置。

但我不确定我的假设是否正确,这是一个可行的解决方案。

0 投票
1 回答
77 浏览

apache-flink - 如何编写基于批处理表增量窗口的计算SQL

我的要求是根据批处理表的增量大小窗口进行计算。

例如,第一个窗口有1行,第二个窗口有2行(包括第一个窗口的1行和一个新行),然后第三个窗口有3行(包括第二个窗口的2行和一个新行) , 等等。

例如:

源表:

日期时间 | 产品编号 | 价格 |

3-1 | p1 | 10 |

3-2 | p1 | 20 |

3-3 | p1 | 30 |

3-4 | p1 | 40 |

结果表:

日期时间 | 产品编号 | 平均|

3-1 | p1 | 10/1 |

3-2 | p1 | (10+20)/2 |

3-3 | p1 | (10+20+30)/3 |

3-4 | p1 | (10+20+30+40)/4 |

我正在尝试找到一种使用 Sql 实现此要求的方法,在我看来,OVER 操作可以做到这一点,但尚未在 flink 中实现,所以我需要另一种方法。

顺便提一句:

我尝试使用 1 天的 TUMBLE 窗口并将先前的值存储在用户定义的聚合对象中,但失败了,因为聚合对象将被所有产品重用,而不是每个产品的单个对象

0 投票
1 回答
112 浏览

apache-flink - ArithmeticException when divide const integer in Flink SQL

Flink throwed ArithmeticException(on-terminating decimal expansion no exact representable decimal result) when I executed below query:

I can't find a way from the document or google to get rid of this issue. Thanks.

0 投票
1 回答
566 浏览

apache-flink - Apache Flink:运行许多作业时的性能问题

对于大量 Flink SQL 查询(以下 100 条),Flink 命令行客户端在 Yarn 集群上失败并显示“JobManager 在 600000 毫秒内没有响应”,即集群上永远不会启动作业。

  • 在最后一个 TaskManager 启动后,JobManager 日志除了“在 JobManager 中找不到 ID 为 5cd95f89ed7a66ec44f2d19eca0592f7 的作业”的 DEBUG 日志之外什么都没有,表明它可能卡住了(创建 ExecutionGraph?)。
  • 与本地的独立 java 程序相同(最初是高 CPU)
  • 注意: structStream 中的每一行包含 515 列(许多最终为空),其中包括一个包含原始消息的列。
  • 在 YARN 集群中,我们为 TaskManager 指定 18GB,为 JobManager 指定 18GB,每个插槽 5 个,并行度为 725(我们的 Kafka 源中的分区)。

Flink SQL 查询:

代码

0 投票
2 回答
1553 浏览

scala - 使用 Flink 从 Redis 读取数据

我对 Flink 完全陌生。可能会重复这个问题,但只找到一个链接,这对我来说是无法理解的。

https://stackoverflow.com/a/44294980/6904987

我以 Key Value 格式将数据存储在 Redis 中,例如 Key 是 UserId,UserInfo 是 value。写在下面的代码。

样本数据:

12“用户信息12”

13“用户信息13”

14“用户信息14”

15“用户信息15”

我想使用基于 key 的 Flink 从 redis 获取数据。示例 14 应返回“UserInfo14”。输出应该打印在 Flink 日志文件或终端中,无论它是什么。

提前致谢。

0 投票
1 回答
1468 浏览

apache-flink - Flink SQL - 如何使用自定义模式解析 TIMESTAMP?

从文档看来,Flink 的 SQL 只能解析某种格式的时间戳,即:

TIMESTAMP 字符串:将格式为“yy-mm-dd hh:mm:ss.fff”的时间戳字符串解析为 SQL 时间戳。

有没有办法传入自定义 DateTimeFormatter 来解析不同类型的时间戳格式?

0 投票
1 回答
106 浏览

apache-flink - Flink 跨多个主机读取 CSV

我有一个像https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/cluster_setup.html这样的集群,在其中我有多个 CSV 对应于每个主机的分片。我想使用表 API 计算跨多个主机的 CSV 列的总和。每个工作人员都应该能够计算他拥有的 CSV 的总和并将结果返回到主服务器。是否有可能,如果这是我应该实施的。