问题标签 [flink-table-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
1052 浏览

scala - 简单的 TableAPI SQL 查询不适用于 Flink 1.10 和 Blink

我想使用 TableAPI 定义 Kafka 连接器并在这样描述的表上运行 SQL(由 Kafka 支持)。不幸的是,该Rowtime定义似乎没有按预期工作。

这是一个可重现的示例:

产生

我在 Flink 上1.10.0

0 投票
1 回答
605 浏览

python - Apache-Flink 1.11 无法在 SQL 函数 DDL 中使用 Python UDF

根据这个合流页面:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-106%3A+Support+Python+UDF+in+SQL+Function+DDL

python udf 在 Flink 1.11 中可用于 SQL 函数。

我在这里访问了 flink 文档:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html

并在终端上尝试此操作并使用以下参数启动sql-client.sh :

$ sql-client.sh embedded --pyExecutable /Users/jonathanfigueroa/opt/anaconda3/bin/python --pyFiles /Users/jonathanfigueroa/Desktop/pyflink/inference/test1.py

接着:

当我尝试时:

我尝试过使用:-pyarch,--pyArchives, -pyexec,--pyExecutable, -pyfs,--pyFiles在每一个组合中.zip, .py,结果都是一样的。

顺便说一句,我的 python 文件如下所示:

有什么我想念的吗?

亲切的问候,

乔纳森

0 投票
1 回答
83 浏览

apache-flink - Flink Table API 使用什么后端?它需要任何关系数据库吗?

我对 Flink 相当陌生,并试图了解可以使用 Stream API/Table API 的适当用例。作为它的一部分,试图理解

  1. 像 Stream API 一样,Table API 是否可以灵活地选择它可以使用的状态后端类型?
  2. Table API 有哪些可用的后端,它是否需要任何外部数据存储,例如 My SQL?或任何其他数据存储?

简而言之,试图了解 Table API 使用的后端工作。

0 投票
1 回答
304 浏览

python - Apache-Flink 1.11 无法在 Java Flink Streamming Job 中通过 SQL Function DDL 使用 Python UDF

Flip-106有一个如何通过 SQL 函数 DDL 在批处理作业 java 应用程序中调用用户定义的 python 函数的示例......

我一直在尝试在流式作业 java 应用程序中重现这个相同的示例,这是我的代码:

对于批处理作业中的这一特定行:

我还没有找到流媒体作业的等价物

1. 你能帮我把这个翻转 106 的例子从批处理映射到流吗?

我最终想要的是用 flink 1.11 在流式作业 java flink 应用程序中调用一个 python 函数,如下所示:

并使用该udf的结果进行进一步处理(不一定在控制台中打印)

我已经编辑了该test.py文件,以查看是否至少不管未命名的表在 python 中是否正在执行某些操作。

并且没有打印任何内容,未创建 test.txt 文件,并且该值未返回到流式作业。所以基本上这个python函数没有被调用。

2.我在这里缺少什么?

感谢 David、Wei 和 Xingbo 迄今为止的支持,因为建议的每一个细节都对我有用。

此致,

乔纳森

0 投票
1 回答
163 浏览

apache-flink - 集成 DataStreamAPI 和 TableAPI

除了这个问题之外,我还创建了这个例子来集成DataStreamAPITableAPI,这次我没有错误,我有两个工作而不是一个,一个是DataStreamAPI为运行完美的工作创建的,另一个工作是为这TableAPI也运行完美,但唯一的问题是永远不会从 中接收任何值DataStreamAPI,例如:

通过这样做,我可以在记录器中看到这一行:

但没有收到或发出记录。

查看DataStream工作 的图像在此处输入图像描述

并查看该TableAPI工作 的图像在此处输入图像描述

任何的想法?提前致谢。亲切的问候!

0 投票
1 回答
540 浏览

apache-flink - Flink SQL 单元测试:如何分配水印?

我正在为使用 match_recognize 的 Flink SQL 语句编写单元测试。我正在设置这样的测试数据

我有两个问题,

  • 如何将 event_time 指定为水印字段?(表示行时间)
  • 不太重要,给创建的表起一个有意义的名字?

FLINK 版本:1.11

0 投票
2 回答
209 浏览

postgresql - pyflink JDBC Postgresql Catalog 对数据类型 UUID 抛出错误,如何处理 Flink Table API 中的 uuid 数据类型?

Apache Flink 1.11.0 Python 表 API 目录:postgresql

通过 Table API 从包含 UUID 数据类型列的 postgresql 目录表中读取和写入数据,抛出 UUID 数据类型 unsupportedOperatorException。

如何处理 pyFlink 中的 UUID 数据类型?

0 投票
1 回答
37 浏览

apache-flink - List all Sources and Sinks in a Flink SQL job

I'm building a sort of wrapper around Flink SQL. I construct a job with a bunch of user-specified SQL statements with StreamTableEnvironment.sqlUpdate. Some are INSERTs, some are CREATEs. I also do some sqlQuerys.

Before I'm calling StreamExecutionEnvironment.execute, I'd like to list all Sources and Sinks that the user created.

Is there a simple / official way to do it, or do I need to go exploring with StreamExecutionEnvironment.getStreamGraph().getJobGraph().getVerticesSortedTopologicallyFromSources()?

Best regards

EDIT: I need the table names, and associated schemas.

0 投票
1 回答
1151 浏览

apache-kafka - Flink Table API:SQL 执行中的 GROUP BY 抛出 org.apache.flink.table.api.TableException

我有一个非常简化的用例:我想使用 Apache Flink (1.11) 从 Kafka 主题(我们称之为 source_topic)读取数据,计算其中的一个属性(称为 b)并将结果写入另一个 Kafka 主题(result_topic )。

到目前为止,我有以下代码:

但是当我执行它时,我收到以下错误:

SELECT我可以通过一个简单的语句将数据写入 Kafka 主题。但是一旦我添加了该GROUP BY子句,就会抛出上面的异常。我遵循了 Flink 关于使用 Table API with SQL for Python 的文档:https ://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#sql

非常感谢任何帮助,我对流处理和 Flink 非常陌生。谢谢!

0 投票
0 回答
132 浏览

java - 远程 Flink 作业执行,查询 Flink 集群上的 Hive


我使用 Flink 1.11.2、Hive 2.1.1、Java 8。尝试对 Hive 执行远程查询,将其打包在 jar 中并通过 Flink 的 RestClient 运行:

工作在哪里:

在 flink-conf.yaml 中只有一个附加参数:

当我运行它时,每隔一段时间就会出现这些错误:

或者:

你能解释一下吗?