我正在研究 PIG 脚本,该脚本对原始交易执行繁重的数据处理并提出各种交易模式。
假设其中一种模式是 - 查找一天内收到跨境交易的所有账户(包括总交易量和交易量)。
我的预期输出应该是两个数据文件 1) 汇总数据——比如账户 A1 从国家 AU 收到了 50 笔交易。2) 原始交易 - A1 的所有交易超过 50 笔。
我的 PIG 脚本当前正在创建以下格式的输出数据源
账户国家 TotalTxns RawTransactions
A1 AU 50 [(Txn1), (Txn2), (Txn3)....(Txn50)]
A2 JP 30 [(Txn1), (Txn2)....(Txn30)]
现在的问题是,当我从 Hadoop 系统(到某个数据库)中获取这些数据时,我想在我的汇总记录(A1、AU、50)与所有 50 个原始事务(如用作外国的汇总记录的 ID 1)之间建立链接所有 50 个相关 Txns 的密钥)。
我知道分布式的 Hadoop 不应该用于分配 ID,但是是否有任何选项可以分配非唯一 ID(不需要顺序)或其他方式来链接这些数据?
编辑(使用 DataFu 中的枚举后)这里是 PIG 脚本
register /UDF/datafu-0.0.8.jar
define Enumerate datafu.pig.bags.Enumerate('1');
data_txn = LOAD './txndata' USING PigStorage(',') AS (txnid:int, sndr_acct:int,sndr_cntry:chararray, rcvr_acct:int, rcvr_cntry:chararray);
data_txn1 = GROUP data_txn ALL;
data_txn2 = FOREACH data_txn1 GENERATE flatten(Enumerate(data_txn));
dump data_txn2;
运行这个之后,我得到
错误 org.apache.pig.tools.pigstats.SimplePigStats - 错误 2997:无法从支持的错误重新创建异常:java.lang.NullPointerException at datafu.pig.bags.Enumerate.enumerateBag(Enumerate.java:89) at datafu.pig .bags.Enumerate.accumulate(Enumerate.java:104) ....