1

这两个查询有什么区别:

SELECT my_fun(col_name) FROM my_table;

CREATE TABLE new_table AS SELECT my_fun(col_name) FROM my_table;

其中 my_fun 是一个 java UDF。

我在问,因为当我创建新表(第二个查询)时,我收到一个 java 错误。

Failure while running task:java.lang.RuntimeException: java.lang.RuntimeException: Map operator initialization failed
...
Caused by: org.apache.hadoop.hive.ql.exec.UDFArgumentException: Unable to instantiate UDF implementation class com.company_name.examples.ExampleUDF: java.lang.NullPointerException

我发现错误的来源是我的java文件中的行:

encoded = Files.readAllBytes(Paths.get(configPath));

但问题是为什么它在未创建表时有效而在创建表时失败?

4

2 回答 2

1

问题可能与您读取文件的方式有关。尝试将文件路径作为UDF中的第二个参数传递,然后读取如下

private BufferedReader getReaderFor(String filePath) throws HiveException {
    try {
        Path fullFilePath = FileSystems.getDefault().getPath(filePath);
        Path fileName = fullFilePath.getFileName();
        if (Files.exists(fileName)) {
            return Files.newBufferedReader(fileName, Charset.defaultCharset());
        }
        else
        if (Files.exists(fullFilePath)) {
            return Files.newBufferedReader(fullFilePath, Charset.defaultCharset());
        }
        else {
            throw new HiveException("Could not find \"" + fileName + "\" or \"" + fullFilePath + "\" in inersect_file() UDF.");
        }
    }
    catch(IOException exception) {
        throw new HiveException(exception);
    }
}

private void loadFromFile(String filePath) throws HiveException {
    set = new HashSet<String>();

    try (BufferedReader reader = getReaderFor(filePath)) {
        String line;
        while((line = reader.readLine()) != null) {
            set.add(line);
        }
    } catch (IOException e) {
        throw new HiveException(e);
    }
}

可以在此处找到使用文件阅读器的不同通用 UDF 的完整代码

于 2020-08-04T13:37:59.160 回答
1

我认为有几点不清楚,所以这个答案是基于假设的。

首先,重要的是要了解 hive 当前优化了几个简单的查询,并且根据数据的大小,为您工作的查询SELECT my_fun(col_name) FROM my_table;很可能从您正在执行作业的客户端本地运行,这就是为什么您的 UDF 可以在本地访问您的配置文件,这种“执行模式”是因为您的数据大小。CTAS 触发独立于输入数据的作业,该作业分布在集群中运行,每个工作人员无法访问您的配置文件。

看起来您正在尝试从本地文件系统读取配置文件,而不是从 HDSFS Files.readAllBytes(Paths.get(configPath)),这意味着您的配置必须在所有工作节点中复制或预先添加到分布式缓存中(您可以使用 add file from this, doc here你可以在这里找到另一个关于从 UDF 访问分布式缓存中的文件的问题。

另一个问题是您通过环境变量传递配置文件的位置,该环境变量不会作为配置单元作业的一部分传播到工作节点。您应该将此配置作为 hive 配置传递,假设您正在扩展 GenericUDF ,这里有一个从 UDF 访问 Hive Config 的答案。

于 2020-08-04T14:58:29.407 回答