问题标签 [flink-sql]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
405 浏览

apache-flink - 有没有办法定义一个由最近没有被事件触及的条目组成的动态表?

我是 Flink 的新手,我正在尝试使用它来获取我的应用程序的大量实时视图。我想构建的动态视图中至少有一个是显示未满足 SLA 或实质上已过期的条目,其条件是简单的时间戳比较。因此,如果最近没有被事件触及,我基本上希望在我的动态表中显示一个条目。在开发环境中使用 Flink 1.6(受 AWS Kinesis 限制)时,我没有看到 Flink 正在重新评估条件,除非事件触及该条目。

我已将我的开发环境插入到从 Web 服务器发送实时访问日志事件的 Kinesis 流中。这不是我真正的用例,但它很容易开始测试。我编写了一个简单的表查询,它获取请求路径、它的最后访问时间,并计算一个布尔标志来指示它是否在最后一分钟内未被访问。我正在通过连接到 PrintSinkFunction 的收回流对此进行调试,因此所有更新/删除都将打印到我的控制台。

我希望当我访问一个页面时,一个 Add 事件会发送到这个流。然后,如果我等待 1 分钟(什么都不做),我表中的 CASE 语句将评估为 1,所以我应该看到设置了该标志的 Delete 然后 Add 事件。

我实际看到的是,在我再次加载该页面之前什么都没有发生。Delete 事件实际上设置了标志,而紧随其后的 Add 事件再次清除了它(因为它不再“过期)。

编辑:我在搜索中遇到的最有用的提示是创建一个ProcessFunction。我认为这是我可以使用我的动态表进行的工作(在某些情况下,我最终会使用中间流来查看计算日期),但希望不必如此。

我已经使用 ProcessFunction 方法,但它需要比我最初认为的更多的修补:

  1. 我必须在我的 POJO 中添加一个字段,该字段会在 onTimer() 方法中发生变化(可能是您每次只需更改的日期或版本)
  2. 我必须将此字段注册为动态表的一部分
  3. 我必须在查询中使用此字段,以便重新评估查询并更改布尔标志(即使我实际上并未使用新字段)。我只是将它添加为我的 SELECT 子句的一部分。
0 投票
1 回答
837 浏览

json - 如何将带有 json 数组的 Datastream 分解为单个数组元素的 DataStream

我有一个 Datastream[ObjectNode],我从 kafka 主题中读取为反序列化的 json。这个 ObjectNode 的一个元素实际上是一个事件数组。这个数组有不同的长度。传入的 json 流如下所示:

我希望能够分解数组,以便内部的promotions每个元素都成为可以写入接收器 kafka 主题的单独消息。Flink 是否在 DataStream 和/或 Table API 中提供了爆炸功能?

我试图在这个流上做一个 RichFlatMap 以便能够收集单独的行,但这也只是返回一个 DataStream[Seq[GenericRecord]],如下所示:

请帮忙。

0 投票
1 回答
442 浏览

apache-flink - Flink 中的动态 SQL 查询

我有一个这样的 SQL 查询

根据用户输入,我想将 botcode='r1' 更改为给定输入。说 botcode='r10' 而不重新启动作业。有没有办法做到这一点。我在使用流 env 的 flink 1.7 上。我尝试配置流来读取输入。但停留在如何即时更改查询。谁能帮我这个?提前致谢

0 投票
1 回答
1603 浏览

apache-flink - Flink SQL:将 Bigint 转换为 Timesamp

这个问题很简单,但我在互联网上找不到任何答案。

我有一个如下所示的 Flink SQL 查询:

问题是我的字段 timestampMs 被视为 aBIGINT而不是 a TIMESTAMP,并且我在函数上有错误HOP。如何将其转换为TIMESTAMPtype ?

0 投票
1 回答
900 浏览

scala - 如何查询flink的可查询状态

我正在使用 flink 1.8.0 并且正在尝试查询我的工作状态。

我使用端口 9067这是默认端口,我的客户:

但我得到:

  1. 我究竟做错了什么 ?
  2. 我应该如何以及在哪里定义QueryableStateOptions
0 投票
0 回答
346 浏览

java - 如何使用 Apache Calcite 创建一个非常简单的规则并在 Apache Flink 上使用它?

我在 Flink 中有这个应用程序,它使用 Table API 从源打印数据。Flink 的官方文档说 Table API 在其核心使用 Calcite 来翻译和优化查询计划。他们没有很深入地描述它,所以我去了源代码并尝试从那里复制一些代码。但是,据我所知,他们也使用方解石规则。

如果我想实施自己的规则怎么办?可能吗?例如,如何在 Calcite 中实现一个简单的规则来更改过滤器的参数?

这是我的代码

0 投票
1 回答
483 浏览

scala - Flink Scala NotInferedR in scala Type mismatch MapFunction[Tuple2[Boolean,Row],InferedR]

我正在尝试在 MapFunction 的 flink 中将 Tuple2[Boolean,Row] 转换为 Row ,但失败并出现错误。

在此处输入图像描述

当我尝试运行时,我得到另一个错误。

在此处输入图像描述

代码我想要做什么

谢谢斯里

0 投票
0 回答
263 浏览

apache-flink - 使用 Flink SQL API 处理事件

我的用例 - 收集特定持续时间的事件,然后根据键对它们进行分组

Objective 处理后,用户可以根据key保存特定时长的数据

我打算怎么做 1)从 Kafka 接收事件

2)创建事件数据流

3)通过运行 SQL 查询将表与其关联并收集特定持续时间的数据

4)将新表与第二步输出相关联,并根据键对收集的数据进行分组

5)将数据保存在数据库中

我试过的解决方案-

我能够-

1)从卡夫卡接收事件,

2)设置数据流(比如说sensorDataStream)-

3)将表(比如说table1)与数据流相关联,并在运行SQL查询之后 -

这里 t1_Timestamp 和 t2_Timestamp 是预定义的纪元时间,会根据一些预定义的条件而改变

4)我可以通过在控制台上使用以下查询来打印此 sql 查询结果-

5)使用table1和以下类型的sql查询创建了一个新表(比如说table2)-

6)通过使用收集和打印数据 -

问题

1)我无法在控制台上看到第 6 步的输出。

我做了一些实验,发现如果我跳过 table1 设置步骤(这意味着一段时间内没有传感器数据俱乐部)并将我的 senserDataStream 与 table2 直接关联,那么我可以看到第 6 步的输出,但因为这是 RetractStream 所以我可以看到形式的数据,如果新事件即将到来,则此撤回流将使数据无效并打印新计算的数据。

我想要的建议

1)如何合并步骤 5 和步骤 6(表示表 1 和表 2)。我已经合并了这些表,但是由于控制台上看不到数据,所以我有疑问?难道我做错了什么?还是数据已合并但不可见?

2)我的计划是——

2.a) 在 2 遍中过滤数据,在第一遍中过滤特定时间间隔的数据,在第二遍中将数据分组

2.b)在数据库中保存 2.a 输出 这种方法是否有效(我有疑问,因为我正在使用数据流并且 table1 输出是附加流但 table2 输出是撤回流)?

0 投票
1 回答
2366 浏览

java - Flink:Rowtime 属性不能在常规连接的输入行中

使用 flink SQL API,我想将多个表连接在一起并在时间窗口内进行一些计算。我有 3 张来自 CSV 文件的表格,还有一张来自 Kafka。在 Kafka 表中,我有一个字段timestampMs,我想将其用于我的时间窗口操作。

为此,我做了以下代码:

但是当我运行它时,出现以下错误:

但我不明白解决方法提示部分。加入表格后,如何创建时间属性并进行一些窗口计算。

- - 编辑 - -

在上面的代码中,我替换了以下几行:

经过 :

但我收到 TemporalTableFunction 错误:

其中两个字段在“集合类型”和“表达式类型”之间不匹配。 TIMESTAMP(3) rowtime0TIMESTAMP(0) NOT NULL rowtime0

问题是我没有名为rowtime0. 看起来它是一个内部字段。我真的不明白这里发生了什么

0 投票
0 回答
464 浏览

java - 如何为每个 flink 作业单独的日志信息?

我有一些在纱线会话模式下在纱线集群上运行的 flink-streaming 作业,实际上我想分离每个作业日志以进行调试或其他目的。目前,我使用 Aspectj 和 Log4j MDC 来处理一些日志,例如下面的日志信息,它可以工作

但其他人似乎很难分开

我尝试了其他一些方法,比如更改 flink 的源代码,添加一些代码,如 MDC.put("",""),它在我运行时在 IDE 上工作,但在 yarn-cluster 上失败

那么,是否有其他方法可以解决此问题?