问题标签 [flink-sql]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1270 浏览

java - Flink SQL:在 GROUP BY 查询的结果中重复分组键

我想在一个包含 group by 语句的表中使用 Flink SQL 进行简单查询。但在结果中,group by 语句中指定的列有重复的行。那是因为我使用流媒体环境并且它不记得状态吗?

我对 block_hash 列使用 group by 语句,但我有好几次相同的 block_hash。这是 print() 的结果:

Test{field2='0x2c4a021d514e4f8f0beb8f0ce711652304928528487dc7811d06fa77c375b5e1', count=1} Test{field2='0x2c4a021d514e4f8f0beb8f0ce711652304928528487dc7811d06fa77c375b5e1', count=1} Test{field2='0x2c4a021d514e4f8f0beb8f0ce711652304928528487dc7811d06fa77c375b5e1', count=2} Test{field2='0x780aadc08c294da46e174fa287172038bba7afacf2dff41fdf0f6def03906e60', count=1} Test{ field2 ='0x182D31BD491527E93C4E44686057207EEE9EE90C6A84283308A2BD7BD2E10E53',count = 1} test = 1} test2 ='0x182D31BD491BD4911527E11527E1152ARINGER

如何在不使用 BatchEnvironment 的情况下解决此问题?

0 投票
1 回答
491 浏览

apache-flink - Flink 将任务分配给一个 taskmanager,直到 slot 满

我有一个有 5 个节点的 flink 集群。每个节点有 8 个插槽。我正在使用 Flink 1.5.2。

如果有 N 个任务,问题是:

如果 N <= 8,所有任务将分配给 node1。新任务将分配给 node2 直到它已满。等等。其他节点将为空。

我希望将所有任务分配到平衡的所有节点。有人可以帮忙吗?

谢谢。

0 投票
2 回答
1795 浏览

apache-flink - 相当于 Flink-SQL 中的“from_unixtime(bigint unixtime)”

我正在寻找与Flink-SQL 中的 Spark-SQL 中存在的函数from_unixtime(bigint unixtime)的等效项。

我的目标是转换这种格式:1439799094

变成这种格式:2015-05-18 05:43:37

0 投票
1 回答
708 浏览

hadoop-yarn - Flink on yarn 是否使用 yarn-session?

在 yarn 上部署 flink 应用有两种方法。第一个是使用 yarn-session,所有的 flink 应用都部署在 session 中。第二种方法是将每个 flink 应用程序部署在 yarn 上作为一个yarn 应用程序。

我的问题是这两种方法有什么区别?产品环境选择哪一种?

我找不到任何关于这个的材料。

我认为第一种方法会节省资源,因为只需要一个 jobmanager(yarn application master)。虽然这也是一个缺点,因为唯一的 jobmanager 可能是瓶颈,而 flink 应用程序越来越多。

0 投票
3 回答
7174 浏览

flink-streaming - Flink:在类路径中找不到适合“org.apache.flink.table.factories.DeserializationSchemaFactory”的表工厂

我用的是flink的table api,我从kafka接收数据,然后注册成表,然后用sql语句处理,最后将结果转回流,写入目录,代码如下:

本地测试成功,但是在集群中提交如下命令时,出现如下错误:

==================================================== =====

====================================

有人能告诉我这是什么原因造成的吗?我对此感到非常困惑…………

0 投票
1 回答
468 浏览

apache-flink - Flink SQL:连接表的内存不足

我有一个经常更新的 MySql 表。我想为过去 20 秒内更新的每个 id 拍摄快照并将值写入 redis。我使用 binlog 作为流输入,并将数据流转换为 Flink 表。我运行以下 sql。

我知道表连接会使状态大小过大,我将 StreamQueryConfig 设置如下

我运行了一天的任务并得到了内存不足的错误。我怎么解决这个问题?

0 投票
1 回答
750 浏览

scala - Flink Table/SQL API:会话窗口聚合后修改 rowtime 属性

我想使用Session窗口聚合,然后Tumble在生成的结果之上运行窗口聚合Table API/Flink SQL

是否可以在第一次聚合后修改rowtime属性session以使其等于.rowtime会话中最后观察到的事件?

我正在尝试做这样的事情:

关键部分是:

所以我想重新分配.rowtime会话中最新事件的记录,没有会话间隙(2.minutes在本例中)。

这在 BatchTable 中工作正常,但在 StreamTable 中不起作用:

是的,我知道,感觉就像我不想发明时间机器并改变时间的顺序。但实际上有可能以某种方式实现所描述的行为吗?

0 投票
1 回答
179 浏览

apache-flink - RichParallelSourceFunction 中的水印

我正在实现一个 SourceFunction,它从数据库中读取数据。如果停止或崩溃(即保存点和检查点),并且数据只处理一次,该作业应该能够恢复。

到目前为止我所拥有的:

如何确保只获取尚未处理的数据库行?我假设该ctx变量将包含有关当前水印的一些信息,以便我可以将查询更改为:

但它对我没有任何相关的方法。任何想法如何解决这个问题将不胜感激

0 投票
1 回答
289 浏览

apache-flink - Apache Flink - 启用连接排序

我注意到 Apache Flink 没有优化表的连接顺序。目前,它保持用户指定的连接顺序(基本上,它按字面意思接受查询)。我想 Apache Calcite 可以优化连接的顺序,但由于某种原因,这些规则在 Apache Flink 中没有使用。

例如,如果我们有两个表“ R ”和“ S

我们假设' S '是空的,我们想以两种方式连接这些表:

如果我们要计算tableOnetableTwo中的行数,两种情况下的结果都为零。问题是评估tableOne将比评估tableTwo花费更长的时间。

有没有什么方法可以自动优化join的执行顺序,甚至可以通过添加一些统计信息来启用可能的plan cost操作?如何添加这些统计信息?

在此链接的文档中,可能需要更改 Table 环境 CalciteConfig 但我不清楚如何去做。

请帮忙。

0 投票
1 回答
217 浏览

apache-flink - 在 Yarn 中运行 Flink

我在 Yarn 上运行 Flink(1.4.2)。我正在使用 Flink Yarn Client 将作业提交到 Yarn Cluster。

假设我有一个带有 4 个插槽的 TM,并且我部署了一个并行度 = 4 的 flink 作业,带有 2 个容器 - 1 个 JM 和 1 个 TM。每个并行实例将部署在 TM 中的每个任务槽中(每个槽运行的整个作业管道)。

我的工作做了一个连接(非键控流上的 SQL 时间窗口连接),它们缓冲了最后 3 小时的数据。根据 Flink 文档the separate threads running in different task slot share data sets and data structures, thus reducing the per-task overhead.

我的问题是这些在不同任务槽中运行的线程是否会共享这些缓冲的数据以供加入。这些线程之间共享的所有数据。

编辑

示例查询 -

SELECT R.order_id, S.order.restaurant_id FROM awz_s3_stream1 R INNER JOIN awz_s3_stream2 S ON CAST(R.order_id AS VARCHAR) = S.order_id AND R.proctime BETWEEN S.proctime - INTERVAL '2' HOUR AND S.proctime + INTERVAL '2' HOUR GROUP BY HOP(S.proctime, INTERVAL '2' MINUTE, INTERVAL '1' HOUR), S.命令.restaurant_id