原始回答来自 Wei Zhong,我只是转述。谢谢 Wei!
目前(Flink 1.11)有两种可行的方法:
- 当前推荐:在 UDF 定义中使用 DataTypeHint 注解,并通过 SQL 语句注册 UDF
- 已过时:在 UDF 定义中重写 getResultType,并通过 t_env.register_java_function 注册 UDF
代码
Scala UDF
package com.dummy
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row
/**
 * Zero-argument scalar UDF that returns a two-field row holding the strings
 * "foo" and "bar".
 *
 * The result type is declared in two ways so that either registration path
 * can be used: the `@DataTypeHint` on `eval` and the `getResultType` override.
 */
class dummyMap extends ScalarFunction {

  // Needed when the UDF is registered through a SQL statement: the hint
  // declares the row's field names and types.
  @DataTypeHint("ROW<s STRING,t STRING>")
  def eval(): Row = {
    val first = java.lang.String.valueOf("foo")
    val second = java.lang.String.valueOf("bar")
    Row.of(first, second)
  }

  // Needed when the UDF is registered through 'register_java_function':
  // the result type has to be supplied as a TypeInformation.
  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
    val fieldNames = Array("s", "t")
    val fieldTypes = Array[TypeInformation[_]](Types.STRING(), Types.STRING())
    Types.ROW(fieldNames, fieldTypes)
  }
}
Python代码
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
# Create the stream execution environment and the table environment built on it.
s_env = StreamExecutionEnvironment.get_execution_environment()
st_env = StreamTableEnvironment.create(s_env)

# Point "pipeline.jars" at the jar holding the Scala UDF. Adjust the path to
# your own jar; the jar can also be loaded via other approaches.
udf_jar_url = "file:///Users/zhongwei/the-dummy-udf.jar"
pipeline_conf = st_env.get_config().get_configuration()
pipeline_conf.set_string("pipeline.jars", udf_jar_url)

# Register the UDF with a SQL statement ...
create_function_ddl = "CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE SCALA"
st_env.execute_sql(create_function_ddl)
# ... or, alternatively, register it through the method call below.
# st_env.register_java_function("dummyMap", "com.dummy.dummyMap")

# Source table: two rows with columns a, b, c.
rows = [(1, 'hi', 'hello'), (2, 'hi', 'hello')]
column_names = ['a', 'b', 'c']
t = st_env.from_elements(rows, column_names)

# Sink table using the 'print' connector, with a single ROW-typed column.
create_sink_ddl = """create table mySink (
output_of_my_scala_udf ROW<s STRING,t STRING>
) with (
'connector' = 'print'
)"""
st_env.execute_sql(create_sink_ddl)

# Run the query: call the UDF, insert into the sink, then fetch the job's
# execution result.
insert_result = t.select("dummyMap()").execute_insert("mySink")
job_client = insert_result.get_job_client()
job_client.get_job_execution_result().result()