0

我是 pyflink 的新手,并尝试使用 python API 注册自定义 UDF 函数。目前,我在服务器环境和本地 IDE 环境中都遇到了问题。

当我尝试执行下面的示例时,我收到一条错误消息:配置的任务堆外内存 0 字节小于最少需要的 Python 工作者内存 79 mb。任务堆外内存可以使用配置键'taskmanager.memory.task.off-heap.size

当然,我已经在我的flink-conf.yaml中添加了 required 属性,并检查了pyflink-shell.sh 是否使用指定的配置初始化了 env ,但这没有任何意义,我仍然有一个错误。

这是我正在尝试运行的代码示例:

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def test_udf(i):
    return i


if __name__ == "__main__":
    env = ExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    bt_env = BatchTableEnvironment.create(env)
    bt_env.register_function("test_udf", test_udf)

    my_table = bt_env.from_elements(
        [
            ("user-1", "http://url/1"),
            ("user-2", "http://url/2"),
            ("user-1", "http://url/3"),
            ("user-3", "http://url/4"),
            ("user-1", "http://url/3")
        ],
        [
            "uid", "url"
        ]
    )

    my_table_grouped_by_uid = my_table.group_by("uid").select("uid, collect(url) as urls")
    bt_env.create_temporary_view("my_temp_table", my_table_grouped_by_uid)

    bt_env.execute_sql("select test_udf(uid) as uid, urls from my_temp_table").print()

谢谢你的帮助!

4

1 回答 1

1

答案很简单,非常感谢邮件列表中的人。

  • 此属性应通过 table env API 手动指定;
  • 运行 pyflink-shell.sh 时,flink-conf.yaml 不适用于 flink-env;

任何属性都可以按以下方式配置:

bt_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
于 2020-10-14T08:54:42.520 回答