我现在在本地模式下使用 Pig 0.11.1,从 CSV 加载数据。
到目前为止,我已经能够加载我们的数据集并对其执行所需的计算。下一步是从数据中抽取一些样本并执行相同的计算。为了复制现有流程,我们希望每十五分钟获取一个数据点。
这就是问题所在。我可以在 Pig 中编写一个过滤器,如果数据点恰好在 15 分钟间隔内,我将匹配该过滤器,但我将如何获取接近 15 分钟边界的数据点?
我需要查看十五分钟的标记并抓住那里的记录。如果该标记上没有记录(很可能),那么我需要在标记之后抓取下一条记录。
我想我需要编写自己的过滤器 UDF,但似乎 UDF 需要是有状态的,以便它知道何时在时间间隔后找到第一个匹配项。我无法找到任何有状态 UDF 的示例,据我所知,这可能是一个坏主意,因为我们不知道最终在 Hadoop 上运行时如何映射/减少数据。
我可以通过存储键/时间戳值并编写一个解析这些值的 Python 脚本来分几步完成此操作。不过,我真的很想在 Pig 中尽可能多地保留这个过程。
编辑:最基本的数据是这样的:{id:long, timestamp:long}
. 以timestamp
毫秒为单位。每组数据按 排序timestamp
。如果记录 X 正好落在最小值timestamp
(开始时间)之后的 15 分钟边界上,则抓住它。否则,只要有可能,在 15 分钟边界之后抓取下一个记录。我没有一个很好的例子来说明预期的结果是什么,因为我没有时间手动整理数据。