8

我有一个 Hive 表,其中包含客户呼叫的数据。为简单起见,考虑它有 2 列,第一列保存客户 ID,第二列保存调用的时间戳(unix 时间戳)。

我可以查询此表以查找每个客户的所有电话:

SELECT * FROM mytable SORT BY customer_id, call_time;

结果是:

Customer1    timestamp11
Customer1    timestamp12
Customer1    timestamp13
Customer2    timestamp21
Customer3    timestamp31
Customer3    timestamp32
...

是否可以创建一个 Hive 查询,从第二次调用开始,为每个客户返回两次连续调用之间的时间间隔?对于上面的示例,查询应该返回:

Customer1    timestamp12-timestamp11
Customer1    timestamp13-timestamp12
Customer3    timestamp32-timestamp31
...

我试图从sql 解决方案中调整解决方案,但我被 Hive 限制所困扰:它只接受 FROM 中的子查询,并且连接必须只包含 equalities

谢谢你。

编辑1:

我尝试使用 Hive UDF 函数:

public class DeltaComputerUDF extends UDF {
private String previousCustomerId;
private long previousCallTime;

public String evaluate(String customerId, LongWritable callTime) {
    long callTimeValue = callTime.get();
    String timeDifference = null;

    if (customerId.equals(previousCustomerId)) {
        timeDifference = new Long(callTimeValue - previousCallTime).toString();
    }

    previousCustomerId = customerId;
    previousCallTime = callTimeValue;

    return timeDifference;
}}

并将其与名称“delta”一起使用。

但似乎(从日志和结果)它是在 MAP 时间使用的。由此产生2个问题:

第一:使用此功能前,表数据必须按客户ID和时间戳排序。查询:

 SELECT customer_id, call_time, delta(customer_id, call_time) FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time;

不起作用,因为排序部分是在 REDUCE 时间执行的,在我的功能被使用很久之后。

我可以在使用该函数之前对表数据进行排序,但我对此并不满意,因为这是我希望避免的开销。

第二:在分布式 Hadoop 配置的情况下,数据在可用的作业跟踪器之间进行拆分。所以我相信这个函数会有多个实例,每个映射器一个,因此可以在 2 个映射器之间拆分相同的客户数据。在这种情况下,我会失去客户的电话,这是不可接受的。

我不知道如何解决这个问题。我知道 DISTRIBUTE BY 确保所有具有特定值的数据都发送到同一个减速器(从而确保 SORT 按预期工作),有人知道映射器是否有类似的东西吗?

接下来我计划按照 libjack 的建议使用 reduce 脚本。在其他一些 hive 查询之间需要这种“计算”,所以我想尝试 Hive 提供的所有东西,然后再转移到另一个工具,正如 Balaswamy vaddeman 所建议的那样。

编辑2:

我开始研究自定义脚本解决方案。但是,在 Programming Hive 书的第 14 章的第一页(本章介绍了自定义脚本),我发现了以下段落:

流式传输的效率通常低于对可比较的 UDF 或 InputFormat 对象进行编码。序列化和反序列化数据以将其传入和传出管道是相对低效的。以统一的方式调试整个程序也更加困难。但是,它对于快速原型设计和利用不是用 Java 编写的现有代码很有用。对于不想编写 Java 代码的 Hive 用户来说,这可能是一种非常有效的方法。

因此很明显,就效率而言,自定义脚本并不是最佳解决方案。

但是我应该如何保留我的 UDF 函数,但要确保它在分布式 Hadoop 配置中按预期工作?我在语言手册 UDF wiki 页面的 UDF 内部部分找到了这个问题的答案。如果我写我的查询:

 SELECT customer_id, call_time, delta(customer_id, call_time) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;

它在 REDUCE 时间执行,并且 DISTRIBUTE BY 和 SORT BY 构造保证来自同一个客户的所有记录都由同一个 reducer 按调用顺序处理。

所以上面的UDF和这个查询构造解决了我的问题。

(很抱歉没有添加链接,但我不允许这样做,因为我没有足够的声望点)

4

3 回答 3

13

这是一个老问题,但为了将来的参考,我在这里写下另一个命题:

Hive Windowing 函数允许在查询中使用上一个/下一个值。

类似的代码查询可能是:

SELECT customer_id, call_time - LAG(call_time, 1, 0) OVER (PARTITION BY customer_id ORDER BY call_time) FROM mytable;
于 2014-09-02T16:13:28.440 回答
1

您可以将显式MAP-REDUCE与 Java 或 Python 等其他编程语言一起使用。{cutomer_id,call_time}在 map和 reducer 中你会得到哪里发出,{customer_id,list{time_stamp}}在 reducer 中你可以对这些时间戳进行排序并可以处理数据。

于 2013-07-03T17:01:14.003 回答
0

也许有人遇到类似的要求,我找到的解决方案如下:

1)创建自定义函数:

package com.example;
// imports (they depend on the hive version)
@Description(name = "delta", value = "_FUNC_(customer id column, call time column) "
    + "- computes the time passed between two succesive records from the same customer. "
    + "It generates 3 columns: first contains the customer id, second contains call time "
    + "and third contains the time passed from the previous call. This function returns only "
    + "the records that have a previous call from the same customer (requirements are not applicable "
    + "to the first call)", extended = "Example:\n> SELECT _FUNC_(customer_id, call_time) AS"
    + "(customer_id, call_time, time_passed) FROM (SELECT customer_id, call_time FROM mytable "
    + "DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;")
public class DeltaComputerUDTF extends GenericUDTF {
private static final int NUM_COLS = 3;

private Text[] retCols; // array of returned column values
private ObjectInspector[] inputOIs; // input ObjectInspectors
private String prevCustomerId;
private Long prevCallTime;

@Override
public StructObjectInspector initialize(ObjectInspector[] ois) throws UDFArgumentException {
    if (ois.length != 2) {
        throw new UDFArgumentException(
                "There must be 2 arguments: customer Id column name and call time column name");
    }

    inputOIs = ois;

    // construct the output column data holders
    retCols = new Text[NUM_COLS];
    for (int i = 0; i < NUM_COLS; ++i) {
        retCols[i] = new Text();
    }

    // construct output object inspector
    List<String> fieldNames = new ArrayList<String>(NUM_COLS);
    List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(NUM_COLS);
    for (int i = 0; i < NUM_COLS; ++i) {
        // column name can be anything since it will be named by UDTF as clause
        fieldNames.add("c" + i);
        // all returned type will be Text
        fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    }

    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}

@Override
public void process(Object[] args) throws HiveException {
    String customerId = ((StringObjectInspector) inputOIs[0]).getPrimitiveJavaObject(args[0]);
    Long callTime = ((LongObjectInspector) inputOIs[1]).get(args[1]);

    if (customerId.equals(prevCustomerId)) {
        retCols[0].set(customerId);
        retCols[1].set(callTime.toString());
        retCols[2].set(new Long(callTime - prevCallTime).toString());
        forward(retCols);
    }

    // Store the current customer data, for the next line
    prevCustomerId = customerId;
    prevCallTime = callTime;
}

@Override
public void close() throws HiveException {
    // TODO Auto-generated method stub

}

}

2)创建一个包含这个函数的jar。假设 jarname 是 myjar.jar。

3) 使用 Hive 将 jar 复制到机器上。假设它放在 /tmp

4)在Hive内部定义自定义函数:

ADD JAR /tmp/myjar.jar;
CREATE TEMPORARY FUNCTION delta AS 'com.example.DeltaComputerUDTF';

5) 执行查询:

SELECT delta(customer_id, call_time) AS (customer_id, call_time, time_difference) FROM 
  (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;

评论:

一种。我假设 call_time 列将数据存储为 bigint。如果它是字符串,在 process 函数中我们将其检索为字符串(就像我们对 customerId 所做的那样),然后将其解析为 Long

湾。我决定使用 UDTF 而不是 UDF,因为这样它会生成它需要的所有数据。否则(使用 UDF)需要过滤生成的数据以跳过 NULL 值。因此,使用原始帖子第一次编辑中描述的 UDF 函数(DeltaComputerUDF),查询将是:

SELECT customer_id, call_time, time_difference 
FROM 
  (
    SELECT delta(customer_id, call_time) AS (customer_id, call_time, time_difference) 
    FROM 
      (
         SELECT customer_id, call_time FROM mytable
         DISTRIBUTE BY customer_id
         SORT BY customer_id, call_time
       ) t
   ) u 
WHERE time_difference IS NOT NULL;

C。无论表中行的顺序如何,这两个函数(UDF 和 UDTF)都可以按需要工作(因此在使用 delta 函数之前,不需要按客户 ID 和调用时间对表数据进行排序)

于 2013-02-13T12:43:41.473 回答