
I am trying to execute a Python UDF function in SQL DDL (Flink 1.14.0).

The Python file is here:

from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.INT()], result_type=DataTypes.INT())
def add_one(a: int):
    return a + 1
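
A quick way to confirm that the fully qualified name udfTest.add_one (the name the DDL in the Java code below refers to) resolves in a given interpreter is a check along these lines (a sketch; it assumes the directory containing udfTest.py is on that interpreter's PYTHONPATH):

# Run with the same interpreter that Flink is configured to use.
import importlib

module = importlib.import_module("udfTest")
fn = getattr(module, "add_one")
print(type(fn))  # a pyflink UDF wrapper object created by @udf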

And start the Flink cluster:

➜  flink-1.14.0 ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host magiclian-ubuntu.
Starting taskexecutor daemon on host magiclian-ubuntu.

The Java code is here:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PyUDF {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // set Python config: ship the UDF file and pin the client-side interpreter
        tEnv.getConfig().getConfiguration().setString("python.files",
                "/home/magic/workspace/python/flinkTestUdf/udfTest.py");
        tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");
        tEnv.executeSql(
                "CREATE TEMPORARY SYSTEM FUNCTION add1 AS 'udfTest.add_one' LANGUAGE PYTHON"
        );

        TableResult ret1 = tEnv.executeSql("select add1(3)");
        ret1.print();

        env.execute();
    }
}
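
For comparison, the same configuration keys and the same DDL can be exercised end to end from the Python Table API, which helps separate problems in the Java job from problems in the Python environment (a sketch; the path and interpreter are the ones used above, and apache-flink 1.14.0 is assumed to be installed):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Same configuration keys as in the Java job above.
config = t_env.get_config().get_configuration()
config.set_string("python.files", "/home/magic/workspace/python/flinkTestUdf/udfTest.py")
config.set_string("python.client.executable", "python3")

# Same DDL registration and query as in the Java job.
t_env.execute_sql("CREATE TEMPORARY SYSTEM FUNCTION add1 AS 'udfTest.add_one' LANGUAGE PYTHON")
t_env.execute_sql("SELECT add1(3)").print()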

Then run the job through the Flink client:

flink run /home/magic/workspace/flink-jobs/UDF/pythonUDF/target/pythonUDF-1.0.0.jar

The error is:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. Cannot instantiate user-defined function 'add1'.

But when I execute my Python UDF through the sql-client, it runs successfully.

Start the SQL client:

PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3  ./sql-client.sh embedded -pyexec /usr/bin/python3 -pyfs home/magic/workspace/python/flinkTestUdf/udfTest.py

Then

create temporary system function add1 as 'udfTest.add_one' language python;

Then

select add1(3);

I got the correct result 4, so is there something wrong with my code?

I see that Python UDFs in SQL Function DDL have been supported since version 1.11 (https://cwiki.apache.org/confluence/display/FLINK/FLIP-106%3A+Support+Python+UDF+in+SQL+Function+DDL), and I am now using 1.14.0.

Can anyone help me?


1 Answer


Make sure all the dependencies are installed.

Java:

  • Java 8 or 11

  • Maven 3.5+

  • Flink JARs:

      <dependencies>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-java</artifactId>
              <version>${flink.version}</version>
          </dependency>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-streaming-java_2.11</artifactId>
              <version>${flink.version}</version>
          </dependency>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-clients_2.11</artifactId>
              <version>${flink.version}</version>
          </dependency>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-table-api-java-bridge_2.11</artifactId>
              <version>${flink.version}</version>
          </dependency>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-table-common</artifactId>
              <version>${flink.version}</version>
          </dependency>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-table-planner_2.11</artifactId>
              <version>${flink.version}</version>
          </dependency>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-python_2.11</artifactId>
              <version>${flink.version}</version>
          </dependency>
      </dependencies>
    

Python:

  • Python 3.6+
  • Apache Beam (== 2.19.0)
  • pip (>= 7.1.0)
  • setuptools (>= 37.0.0)
  • apache-flink (1.14.0)
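
A quick way to confirm what is actually installed is to print the versions from the interpreter the job points at (a sketch; run it with that python3, e.g. /usr/bin/python3):

# Print the versions of the packages listed above.
import sys

import pkg_resources

print("python", sys.version)
for pkg in ("apache-flink", "apache-beam", "pip", "setuptools"):
    print(pkg, pkg_resources.get_distribution(pkg).version)
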
Answered 2021-12-07T08:51:22.757