1

我正在研究 PIG 脚本,该脚本对原始交易执行繁重的数据处理并提出各种交易模式。

假设其中一种模式是 - 查找一天内收到跨境交易的所有账户(包括总交易量和交易量)。

我的预期输出应该是两个数据文件 1) 汇总数据——比如账户 A1 从国家 AU 收到了 50 笔交易。2) 原始交易 - A1 的所有交易超过 50 笔。

我的 PIG 脚本当前正在创建以下格式的输出数据源

账户国家 TotalTxns RawTransactions

A1 AU 50 [(Txn1), (Txn2), (Txn3)....(Txn50)]

A2 JP 30 [(Txn1), (Txn2)....(Txn30)]

现在的问题是,当我从 Hadoop 系统(到某个数据库)中获取这些数据时,我想在我的汇总记录(A1、AU、50)与所有 50 个原始事务(如用作外国的汇总记录的 ID 1)之间建立链接所有 50 个相关 Txns 的密钥)。

我知道分布式的 Hadoop 不应该用于分配 ID,但是是否有任何选项可以分配非唯一 ID(不需要顺序)或其他方式来链接这些数据?

编辑(使用 DataFu 中的枚举后)这里是 PIG 脚本

register /UDF/datafu-0.0.8.jar
define Enumerate datafu.pig.bags.Enumerate('1');
data_txn = LOAD './txndata' USING PigStorage(',') AS (txnid:int, sndr_acct:int,sndr_cntry:chararray, rcvr_acct:int, rcvr_cntry:chararray);
data_txn1 = GROUP data_txn ALL;
data_txn2 = FOREACH data_txn1 GENERATE flatten(Enumerate(data_txn));
dump data_txn2;

运行这个之后,我得到

错误 org.apache.pig.tools.pigstats.SimplePigStats - 错误 2997:无法从支持的错误重新创建异常:java.lang.NullPointerException at datafu.pig.bags.Enumerate.enumerateBag(Enumerate.java:89) at datafu.pig .bags.Enumerate.accumulate(Enumerate.java:104) ....

4

4 回答 4

0

您的行中有什么独特之处?帐户 ID 和国家/地区代码似乎是您在 Pig 脚本中分组的内容,那么为什么不使用它们创建一个复合键呢?就像是

CONCAT(CONCAT(account, '-'), country)

当然,您可以编写一个 UDF 以使其更优雅。如果您需要一个数字 ID,请尝试编写一个 UDF,该 UDF 将创建上述字符串,然后调用其hashCode()方法。当然,这不能保证唯一性,但你说没关系。您始终可以构建自己的方法将字符串转换为唯一的整数。

但话虽如此,为什么您需要一个 ID 密钥?如果您想稍后连接两个表的字段,您可以一次连接多个字段。

于 2013-01-24T22:09:48.313 回答
0

DataFu 在 Enumerate 中有一个 bug,该 bug 在 0.0.9 中已修复,因此请使用 0.0.9 或更高版本。

于 2013-07-03T05:19:21.570 回答
0

我经常在 Hadoop 作业中分配随机 ID。您只需要确保生成包含足够数量的随机位的 id,以确保冲突的概率足够小(http://en.wikipedia.org/wiki/Birthday_problem)。

根据经验,我使用 3*log(n) 随机位,其中 n = 需要生成的 ID 数。

在许多情况下,Java 的 UUID.randomUUID() 就足够了。

http://en.wikipedia.org/wiki/Universally_unique_identifier#Random_UUID_probability_of_duplicates

于 2013-01-24T22:27:37.373 回答
0

如果您的 ID 是数字并且您不能使用 UUID 或其他基于字符串的 ID。LinkedIn ( DataFu ) 有一个UDF 的 DataFu 库,其中包含一个非常有用的 UDF Enumerate。所以你可以做的就是将所有记录分组到一个包中,然后将包传递给 Enumerate。这是我脑海中的代码:

register jar with UDF with Enumerate UDF
inpt = load '....' ....;
allGrp = group inpt all;
withIds = foreach allGrp generate flatten(Enumerate(inpt));
于 2013-01-25T00:12:19.040 回答