问题标签 [flink-sql]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
1443 浏览

redis - 如何高效地将数据从 flink 管道写入 redis

我正在 Apache flink sql api 中构建管道。管道执行简单的投影查询。但是,我需要在查询之前和查询之后编写一次元组(确切地说是每个元组中的一些元素)。事实证明,我用来写入 redis 的代码会严重降低性能。即 flink 以非常小的数据速率产生背压。我的代码有什么问题以及如何改进。请有任何建议。

当我停止写redis之前和之后的表现非常好。这是我的管道代码:

附加部分:我尝试了第二个实现,使用 Jedis 批量编写 toredis 的过程功能。但是我收到以下错误。org.apache.flink.runtime.client.JobExecutionException: redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketException: 套接字未连接。我试图使批处理消息的数量更小,但一段时间后我仍然收到错误。

下面是流程函数的实现:

/** * 使用进程函数写入redis */

0 投票
1 回答
1015 浏览

apache-flink - 用于状态检查点的 Flink sql

当我使用 flink sql api 处理数据时。

重启app,sum结果没有保存在checkpoint中,还是从1开始。

登录为:

1 2 3 ... ... 100

重新启动应用程序它仍然启动:1 2 3 ...

我认为总和值将存储在 checkpint 文件中,并且重新启动应用程序可以从 checkpint 读取最后一个结果,例如:

101 102 103 ... 120

0 投票
2 回答
515 浏览

apache-kafka - Flink 的嵌套输出

我正在使用 Flink SQL 处理 Kafka 流,其中每条消息都从 Kafka 中提取,使用 flink sql 处理并推回 kafka。我想要一个嵌套输出,其中输入是平坦的,输出是嵌套的。例如说我的输入是

并希望输出为

我尝试在这里搜索和一些类似的链接,但找不到。是否可以使用 Apache Flink SQL API 来做到这一点?如有必要,可以使用用户定义的函数,但希望避免这样做。

0 投票
1 回答
270 浏览

java - 为什么 Flink SQL 对所有表都使用 100 行的基数估计?

我不确定为什么在这个例子中没有正确评估逻辑计划。

我更深入地查看了 Flink 基本代码,并检查了当 calcite 评估/估计对象中查询的行数时。出于某种原因,对于任何表源,它总是返回100

实际上在 Flink 中,在创建程序计划的过程中,对于每一个转换后的规则,它都被TableEnvironment .runVolcanoPlanner 称为VolcanoPlanner类 。计划者尝试通过调用RelMetadataQuery .getRowCount来优化和计算一些估计

我通过创建一个失败的测试来重现该错误,该测试应该将 0 断言为关系表“S”的行数,但它始终返回 100。

为什么会这样?有人对这个问题有答案吗?

0 投票
1 回答
1218 浏览

apache-flink - 将 ROW() 用于嵌套数据结构

我已经成功地使用来自 flink-json 工件的 JsonRowSerializationSchema 来TableSink<Row>使用 ROW 从 SQL 创建和输出 json。它非常适合发出平面数据:

现在,我正在尝试一个嵌套模式,它以一种奇怪的方式分解:



这是一个解析问题,但我对它为什么会发生感到困惑。col1 和 'ttt' 是 String 类型的表达式,应该是可替换的;但不知何故,解析器受到以下行的干扰,正如堆栈跟踪所说:

我错过了一些关于语法的东西吗?解析器试图做什么?我应该以另一种方式使用 ROW() 吗?

这是一个错误吗?

0 投票
1 回答
482 浏览

apache-flink - 如何在 Apache Flink 中创建外部目录表

我尝试创建和 ExternalCatalog 以在 Apache Flink 表中使用。我创建并添加到 Flink 表环境(这里是官方文档)。由于某种原因,“目录”中存在的唯一外部表在扫描期间未找到。我在上面的代码中遗漏了什么?

上面的例子是git中这个测试文件的一部分。

0 投票
1 回答
837 浏览

apache-flink - Flink 如何使用从 Avro 输入数据推断出的模式创建表

我在 Flink 数据集中加载了一个 Avro 文件:

以下是打印 DS 的结果:

现在我想从 DS Dataset 创建一个与 Avro 文件模式完全相同的表,我的意思是列应该是 N_NATIONKEY、N_NAME、N_REGIONKEY 和 N_COMMENT。

我知道使用这条线:

我可以创建一个表并设置列,但我希望从数据中自动推断列。可能吗?另外,我试过

但它使用架构创建了一个表:

0 投票
1 回答
301 浏览

apache-flink - 基于 Date 类型的字段过滤 Flink Table

我创建了一个表,其中有一个 Date 类型的字段,即f_date。我想要的查询的一部分基于f_date字段过滤表行。所以我做了以下事情:

然后我在所有两种情况下都得到了以下相同的错误:

我什至删除了单个 quatation 并尝试了:

我得到了错误:

最后我创建了一个 Date 对象并尝试:

我得到了错误:

在上述情况下,Flink 将日期识别为 String 或 Integer。我怎样才能以正确的格式给出日期!?

0 投票
1 回答
168 浏览

apache-flink - 使用 Flink Sql 选择 Top N Per Group

我正在使用 Flink SQL 来处理批处理案例。如何使用 FLINK SQL 获得每组的前 n 条记录?

0 投票
1 回答
65 浏览

java - 将 TemporalTableFunction 注册为函数时出现编译器错误

我正在关注 Flink 的Defining Temporal Table Function示例,编译器拒绝采用该代码:

我的编译器告诉我“TableEnvironment 类型中的方法 registerFunction(String, ScalarFunction) 不适用于参数 (String, TemporalTableFunction)”

我已经在这里这里搜索了源代码,确实没有 registerFunction 具有 TemporalTableFunction 签名。只有 ScalarFunction 类型。我对它为什么会编译感到困惑;但他们确实对此进行了测试

我只是无法导航 TemporalJoinITCase 从何处获取其 registerFunction 。

带有 scala 2.11 的 Flink 1.7.1,以及以下工件:flink-core、flink-java、flink-clients_、flink-streaming-java_、flink-table_、flink-streaming-scala_、flink-json、flink-runtime-web_ .

我不太清楚 Scala 是如何工作的。会不会是我从另一个神器中遗漏的一些特征?

最良好的问候!