根据这个合流页面:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-106%3A+Support+Python+UDF+in+SQL+Function+DDL
python udf 在 Flink 1.11 中可用于 SQL 函数。
我在这里访问了 flink 文档:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html
并在终端上尝试此操作并使用以下参数启动sql-client.sh :
$ sql-client.sh embedded --pyExecutable /Users/jonathanfigueroa/opt/anaconda3/bin/python --pyFiles /Users/jonathanfigueroa/Desktop/pyflink/inference/test1.py
接着:
> Create Temporary System Function func1 as 'test1.func1' Language PYTHON;
[INFO] Function has been created.
当我尝试时:
> Select func1(str) From (VALUES ("Name1", "Name2", "Name3"));
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Instantiating python function 'test1.func1' failed.
我尝试过使用:-pyarch,--pyArchives, -pyexec,--pyExecutable, -pyfs,--pyFiles
在每一个组合中.zip, .py
,结果都是一样的。
顺便说一句,我的 python 文件如下所示:
def func1(s):
return s;
有什么我想念的吗?
亲切的问候,
乔纳森