我有一个 Hive 表,其中包含客户呼叫的数据。为简单起见,考虑它有 2 列,第一列保存客户 ID,第二列保存调用的时间戳(unix 时间戳)。
我可以查询此表以查找每个客户的所有电话:
SELECT * FROM mytable SORT BY customer_id, call_time;
结果是:
Customer1 timestamp11
Customer1 timestamp12
Customer1 timestamp13
Customer2 timestamp21
Customer3 timestamp31
Customer3 timestamp32
...
是否可以创建一个 Hive 查询,从第二次调用开始,为每个客户返回两次连续调用之间的时间间隔?对于上面的示例,查询应该返回:
Customer1 timestamp12-timestamp11
Customer1 timestamp13-timestamp12
Customer3 timestamp32-timestamp31
...
我试图从sql 解决方案中调整解决方案,但我被 Hive 限制所困扰:它只接受 FROM 中的子查询,并且连接必须只包含 equalities。
谢谢你。
编辑1:
我尝试使用 Hive UDF 函数:
public class DeltaComputerUDF extends UDF {
private String previousCustomerId;
private long previousCallTime;
public String evaluate(String customerId, LongWritable callTime) {
long callTimeValue = callTime.get();
String timeDifference = null;
if (customerId.equals(previousCustomerId)) {
timeDifference = new Long(callTimeValue - previousCallTime).toString();
}
previousCustomerId = customerId;
previousCallTime = callTimeValue;
return timeDifference;
}}
并将其与名称“delta”一起使用。
但似乎(从日志和结果)它是在 MAP 时间使用的。由此产生2个问题:
第一:使用此功能前,表数据必须按客户ID和时间戳排序。查询:
SELECT customer_id, call_time, delta(customer_id, call_time) FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time;
不起作用,因为排序部分是在 REDUCE 时间执行的,在我的功能被使用很久之后。
我可以在使用该函数之前对表数据进行排序,但我对此并不满意,因为这是我希望避免的开销。
第二:在分布式 Hadoop 配置的情况下,数据在可用的作业跟踪器之间进行拆分。所以我相信这个函数会有多个实例,每个映射器一个,因此可以在 2 个映射器之间拆分相同的客户数据。在这种情况下,我会失去客户的电话,这是不可接受的。
我不知道如何解决这个问题。我知道 DISTRIBUTE BY 确保所有具有特定值的数据都发送到同一个减速器(从而确保 SORT 按预期工作),有人知道映射器是否有类似的东西吗?
接下来我计划按照 libjack 的建议使用 reduce 脚本。在其他一些 hive 查询之间需要这种“计算”,所以我想尝试 Hive 提供的所有东西,然后再转移到另一个工具,正如 Balaswamy vaddeman 所建议的那样。
编辑2:
我开始研究自定义脚本解决方案。但是,在 Programming Hive 书的第 14 章的第一页(本章介绍了自定义脚本),我发现了以下段落:
流式传输的效率通常低于对可比较的 UDF 或 InputFormat 对象进行编码。序列化和反序列化数据以将其传入和传出管道是相对低效的。以统一的方式调试整个程序也更加困难。但是,它对于快速原型设计和利用不是用 Java 编写的现有代码很有用。对于不想编写 Java 代码的 Hive 用户来说,这可能是一种非常有效的方法。
因此很明显,就效率而言,自定义脚本并不是最佳解决方案。
但是我应该如何保留我的 UDF 函数,但要确保它在分布式 Hadoop 配置中按预期工作?我在语言手册 UDF wiki 页面的 UDF 内部部分找到了这个问题的答案。如果我写我的查询:
SELECT customer_id, call_time, delta(customer_id, call_time) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;
它在 REDUCE 时间执行,并且 DISTRIBUTE BY 和 SORT BY 构造保证来自同一个客户的所有记录都由同一个 reducer 按调用顺序处理。
所以上面的UDF和这个查询构造解决了我的问题。
(很抱歉没有添加链接,但我不允许这样做,因为我没有足够的声望点)