问题标签 [flink-sql]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
323 浏览

java - Flink Autojoin 与行时间列

我有一个具有以下结构的 Flink 表:

行时间基于myTimestamp.

我有以下处理效果很好:

我想修改之前的代码,比如对于每个窗口,我只使用每个Id2. 因此,我认为按以下方式更改代码会起作用:

但是当我这样做时,我收到以下错误:

看起来 Flink 不“理解”我加入的两个表是同一个表。

我怎样才能做我想做的事?

0 投票
0 回答
210 浏览

apache-flink - Flink SQL Match_Recognize 给出不完整的结果

我将以下数据作为流提供给 Flink

我需要运行 SQL 匹配识别如下

我希望输出包括像这样的间隔

但我实际上得到的是前两个只记录

下面是我将流转换为表并将查询结果返回到流的完整代码

会不会有什么我做错了或者我忘了做的事情?

我正在使用 Flink 1.8.1。

0 投票
1 回答
553 浏览

apache-flink - 有没有办法确定运行 Flink 作业所需的总作业并行度或插槽数(在运行之前)

有没有办法确定从执行计划或以其他方式运行作业所需的任务槽总数,而不必先实际启动作业。

根据这个文档:https ://ci.apache.org/projects/flink/flink-docs-stable/concepts/runtime.html

“一个 Flink 集群需要与作业中使用的最高并行度一样多的任务槽。无需计算程序总共包含多少任务(具有不同的并行度)。”

如果我从 StreamExecutionEnvironment 获得执行计划(在设置之后但没有实际执行作业)并从执行计划 json 中的节点列表中获得任何节点的最大并行度,这是否足以确定所需的任务槽数运行作业。

是否有任何情况不再是这种情况?或者有什么注意事项要记住?

0 投票
1 回答
407 浏览

streaming - 我们如何在 Flink SQL 查询中使用 SQL 客户端进行窗口连接?

我们如何在 Flink SQL 查询中使用 SQL 客户端进行窗口连接。以与以下链接中提到的相同方式开窗 https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/joining.html

需要窗口化的示例查询 SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON sourceKafka。source.ip=badips.ip

sourceKafka 是源表,连续不断的 kafka badips 流是另一个源表

0 投票
1 回答
194 浏览

apache-flink - 在 Flink SQL 中使用 SQL 客户端时,我们如何使用查询配置?

在 Flink SQL 中使用 SQL 客户端时,我们如何使用查询配置?

与以下链接中提到的方式相同,适用于 https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/query_configuration.html

想用 Idle 状态保留时间。

0 投票
1 回答
373 浏览

apache-flink - Flink:将收回的 SQL 转换为附加的 SQL,仅使用 SQL 来提供临时表

我正在向用户提供 Flink SQL 接口,所以我不能真正使用 Table 或 Java/Scala 接口。一切都需要在 SQL 中指定。不过,我可以解析 SQL 文件中的注释,并添加指定的临时较低级别的 API 指令。

一个用户如何转换,比如:

- 将值撤回到将附加它们的表单。此附加表单可能具有以下架构:

Aka 具有等同于 AppendStreamTableSink 的RetractStreamTableSink,但它不是一个接收器。

所有这一切都是为了能够使用 average_source_data_appending 创建一个Temporal 表(过滤撤回消息),但是这种表只接受 append-only 源表。

我考虑过使用 Windows(如在此处讨论),但我希望对临时表的更新是即时的。

0 投票
1 回答
48 浏览

apache-flink - Flink SQL Job 堆空间不足

我正在运行一个查询来加入一个流和一个表,如下所示。它的堆空间不足。即使它在 flink 集群中有足够的堆空间(60GB * 3)

此查询是否需要驱逐策略?

0 投票
0 回答
100 浏览

scala - KafkaTableSink:如何使用它?

这是表格:

这是我想要的架构:

我想做Kafka010TableSink:

架构和序列化架构中发生了什么,我在使用 FlinkFixedPartitioner 时遇到错误。

0 投票
1 回答
1207 浏览

scala - 在 Scala 中使用 Flink 的 leftOuterJoinLateral 时出现 NullPointerException 异常

我正在尝试遵循文档并创建一个表函数来“展平”一些数据。使用 进行展平时,表函数似乎工作正常joinLateral。但是在使用时leftOuterJoinLateral,我收到以下错误。我正在使用 Scala 并尝试了 Table API 和 SQL,结果相同:

原因:java.lang.NullPointerException:空结果不能存储在案例类中。

这是我的工作:

当我更改为时.leftOuterJoinLateral.joinLateral我得到了预期的结果:

使用时,.leftOuterJoinLateral我会期望类似:

似乎这可能是 Scala API 的错误?我想在提出罚单之前先检查这里,以防我做一些愚蠢的事情!

0 投票
1 回答
81 浏览

apache-flink - 如何使用 flink cep 完成聚合任务

我需要计算一天中发生 A 的次数以及在 15 分钟内发生 B 的次数。流可能是 A1 ,A2,B1,B2,A3,B3,B4,B5,A4,A5,A6,A7,B6 。在我的情况下,事件结果是A2,B1 A3,B3 A7,B6。当匹配器发生时我需要接收实时结果。我已经厌倦了一些东西。我认为这只能通过使用flink cep才能实现。但是flink- sql-cep 不支持聚合。它只计算事件发生。在这种情况下,如何用一条 SQL 完成这个任务。

我累了两步。我先用flink sql cep to matcher,然后sink to kafka。在一步中,我使用 pre kafka 并使用窗口聚合。

第一步: select pins as pin,'first-step' as result_id, cast(order_amount as varchar) as result_value,event_time as result_time from stra_dtpipeline MATCH_RECOGNIZE ( PARTITION BY pin
ORDER BY event_time MEASURES
t1.pin as pins, '1' as order_amount , LOCALTIMESTAMP as event_time ONE ROW PER MATCH AFTER MATCH SKIP to next row PATTERN (t1 t2) WITHIN INTERVAL '30' SECOND
DEFINE
t1 as t1.act_type='100001' , t2 as t2.act_type='100002' ) 第二步:select pin,'job5' as result_id,cast(sum(1) over (PARTITION BY pin,cast(DATE_FORMAT(event_time, '%Y%m%d') as VARCHAR) order by event_time ROWS BETWEEN INTERVAL '1' DAY PRECEDING AND CURRENT ROW ) as VARCHAR) as result_value, CURRENT_TIMESTAMP as result_time from stra_dtpipeline_mid where result_id='first-step' and DAYOFMONTH(CURRENT_DATE )=DAYOFMONTH(event_time)

我希望用一个 SQL 完成这项任务。