问题标签 [flink-sql]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
282 浏览

apache-flink - flink:在 flink sql api 中使用 allowedLateness

我正在使用 flink sql api,我有一个类似的 sql

我可以启用“allowedLatenness”并将延迟数据作为辅助输出吗

0 投票
2 回答
577 浏览

apache-flink - Flink 动态缩放 1.5

这是关于 Flink 1.5 中的动态缩放

我正在使用 Yarn 来运行 Flink 作业。我从静态资源开始这些工作。是否有任何选项可以在特定条件下自行扩展这些工作,例如是否存在内存问题。

在 Flink 1.5 发行说明中-

Applications can be rescaled without manually triggering a savepoint. Under the hood, Flink will still take a savepoint, stop the application, and rescale it to the new parallelism.

这意味着我必须监控我的作业内存并且必须手动触发重新调整。这些是否有任何解决方法来处理这个问题。

0 投票
1 回答
549 浏览

scala - leftOuterJoin 抛出 TableException:不支持的连接类型“LEFT”

我正在尝试在两个表上运行左外连接并将结果转换为 DataStream。

我在使用 flink 之前所做的所有连接都是内部连接,并且我总是在连接之后使用.toRetractStream[MyCaseClass](someQueryConfig). 但是,由于左连接引入了空值,我从flink 文档的理解是我不能再使用案例类,因为它们在将表转换为 DataStream 时不支持空值。

所以,我正在尝试使用 POJO 来实现这一点。这是我的代码:

这编译得很好,但是当我尝试运行它时,我得到org.apache.flink.table.api.TableException: Unsupported join type 'LEFT'. Currently only non-window inner joins with at least one equality predicate are supported. 但是,根据这些文档,似乎我应该能够运行左连接。似乎还值得注意的是,错误是从.toAppendStream[EnrichedTaskUpdateJoin](qConfig). 我想也许non-window错误的一部分暗示我的空闲状态保留时间有问题,所以我取出查询配置,但得到了同样的错误。

希望这有足够的上下文,但如果我需要添加任何其他内容,请告诉我。另外,我正在运行 flink 1.5-SNAPSHOT 和 Circe 进行 json 解析。我对 scala 也很陌生,所以这很可能只是一些愚蠢的语法错误。

0 投票
2 回答
2405 浏览

java - 无法使用 Flink Table API 打印 CSV 文件

我正在尝试读取一个包含 34 个字段的文件,以使用 Netbeans 在控制台上打印。但是,我能打印的只是模式。因为在与 csvreader 一起使用的这个特定版本的 Flink 中缺少打印选项。

请查看代码并帮助我了解我应该在哪里更正。我会使用CSVReader内置的 API,但事实证明它不支持超过 22 个字段,因此求助于使用 Table API。还尝试使用CsvTableSource1.5.1 Flink 版本,但语法不走运。由于.field("%CPU", Types.FLOAT())类型浮动无法识别符号的不断给出错误。我的主要目标是能够读取 CSV 文件,然后发送到 Kafka 主题,但在此之前我想检查文件是否被读取,还没有运气。

这是输出

这是另一个版本的代码,它也不起作用

新编辑如果我必须将它传递给 Kafka 主题,然后传递给函数调用?这是我尝试过的:

stream.map 行抛出错误:

0 投票
1 回答
1666 浏览

apache-flink - Apache Flink 中 DataStream 和 Table API 的区别

我是 Apache Flink 的新手,想了解 DataStream 和 Table API 之间的用例。请帮助我了解何时选择 Table API 而不是 DataStream API。

据我了解,可以使用 Table API 完成的事情也可以使用 DataStream API 完成。两种 API 有何不同?

0 投票
2 回答
1897 浏览

apache-flink - Flink 中的事件时间窗口不触发

当我使用 flink 事件时间窗口时,窗口不会触发。我该如何解决这个问题,有什么方法可以调试吗?

0 投票
1 回答
598 浏览

apache-flink - Flink - Table SQL API - 向表中添加列

我想知道是否有一种方法可以将具有常量值的列添加到 Flink(Java API)中的表中,例如 Spark DF/DS 中的 .withColumn 函数?

问候, 巴斯蒂安

0 投票
1 回答
170 浏览

apache-flink - Flink-Dataset-Flink 可以尊重多个流/输入的处理顺序吗?

在我的 Flink 批处理程序(DataSet / Table )中,我正在读取多个文件,这正在产生不同的流,进行一些处理,并以输出格式保存它
由于 flink 使用数据流模型,而我的流并不真正相关,它是并行处理

但是我希望 Flink 至少尊重我的输出操作的顺序,因为我希望 flow1 在 flow2 之前保存

例如我有类似的东西:

我希望 flink 等待 dataSet1 被保存以继续......
我怎样才能将它作为连续操作?
我已经看过执行模式,但这不是这样做的

问候, 巴斯蒂安

0 投票
1 回答
445 浏览

apache-flink - Apache Flink:Table API 状态是否可扩展?

根据 Flink Table API Streaming Concepts,Table API 和 SQL 查询可能会由于状态大小的增长而失败。

状态大小:连续查询在无限流上进行评估,通常应该运行数周或数月。因此,连续查询处理的数据总量可能非常大。必须更新先前发出的结果的查询需要维护所有发出的行才能更新它们。例如,第一个示例查询需要存储每个用户的 URL 计数,以便能够增加计数并在输入表接收到新行时发送新结果。如果只跟踪注册用户,那么要维护的计数可能不会太高。但是,如果为非注册用户分配了唯一的用户名,则要维护的计数数量会随着时间的推移而增加,并最终可能导致查询失败。

Table API 和 SQL 在后台使用 DataStream API。

Table API / SQL 查询的状态不应该像 DataStream API 作业的状态一样扩展吗?

0 投票
0 回答
614 浏览

java - 无法从 flink SQL 查询中获取结果

我遇到了一个问题,我在 Flink-SQL 中的查询没有得到结果。

我有一些信息存储在两个 Kafka 主题中,我想将它们存储在两个表中,并以流的方式在它们之间执行连接。

这些是我的 flink 指令:

这是我的 SQL 查询:

然后,我打印结果:

但我没有得到任何答案,我的程序卡住了......

对于用于序列化和反序列化的模式,这是我的测试类,它存储了我的查询结果(两个字段分别是字符串和长整数,分别代表 block_timestamp 和计数):

BlockSchemaTransactionsSchema类的原理相同。

你知道为什么我不能得到我的查询结果吗?我应该测试BatchExecutionEnvironment吗?