I am trying to execute a Python UDF through SQL DDL in Flink 1.14.0.
Here is the Python file:
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.INT()], result_type=DataTypes.INT())
def add_one(a: int):
    return a + 1
Then I start the Flink cluster:
➜ flink-1.14.0 ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host magiclian-ubuntu.
Starting taskexecutor daemon on host magiclian-ubuntu.
Here is the Java code:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PyUDF {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Point Flink at the Python file and the Python interpreter
        tEnv.getConfig().getConfiguration().setString("python.files",
                "/home/magic/workspace/python/flinkTestUdf/udfTest.py");
        tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");

        // Register the Python function through SQL DDL
        tEnv.executeSql(
                "CREATE TEMPORARY SYSTEM FUNCTION add1 AS 'udfTest.add_one' LANGUAGE PYTHON"
        );

        TableResult ret1 = tEnv.executeSql("select add1(3)");
        ret1.print();

        env.execute();
    }
}
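For reference, this is how I understand the name 'udfTest.add_one' in the DDL: the module name (the file udfTest.py) followed by the function name. The sketch below only illustrates that naming convention on the Python side. The `resolve` helper is hypothetical, the temporary udfTest.py is a stand-in without the `@udf` decorator so that it runs without PyFlink, and it is not a claim about how Flink itself loads the function.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def resolve(qualified_name: str, search_dir: str):
    """Hypothetical helper: resolve 'module.attr' the way a plain import would."""
    module_name, _, attr_name = qualified_name.rpartition(".")
    sys.path.insert(0, search_dir)
    try:
        importlib.invalidate_caches()
        module = importlib.import_module(module_name)
        return getattr(module, attr_name)
    finally:
        sys.path.remove(search_dir)


# Assumption: a stand-in for udfTest.py with the same function body,
# minus the PyFlink @udf decorator, written to a temporary directory.
tmp_dir = tempfile.mkdtemp()
Path(tmp_dir, "udfTest.py").write_text("def add_one(a: int):\n    return a + 1\n")

add_one = resolve("udfTest.add_one", tmp_dir)
print(add_one(3))
```

If the directory containing udfTest.py is not on the import path, the same lookup fails with a ModuleNotFoundError, so the name only resolves when the file is actually made visible to Python.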
Then I run the job with the Flink client:
flink run /home/magic/workspace/flink-jobs/UDF/pythonUDF/target/pythonUDF-1.0.0.jar
The error is:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. Cannot instantiate user-defined function 'add1'.
However, when I run the same Python UDF through sql-client, it succeeds.
Start the SQL client:
PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3 ./sql-client.sh embedded -pyexec /usr/bin/python3 -pyfs home/magic/workspace/python/flinkTestUdf/udfTest.py
Then:
create temporary system function add1 as 'udfTest.add_one' language python;
Then:
select add1(3);
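I get the correct result, 4. Is there something wrong with my code?
I see that Python UDFs in SQL function DDL have been supported since version 1.11:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-106%3A+Support+Python+UDF+in+SQL+Function+DDL
but I am now using 1.14.0.
Can anyone help me?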