0

我有一些交易数据,比如

txn_id、receiver_userid、sender_userid、金额

1,1,2,50

2,1,2,100

3,1,2,500
4,5,3,100
5,5,3,200
6,5,3,300
7,6,2,200

8,6,1,200

现在我想找到从另一个用户收到超过 2 笔交易的所有接收者用户,我已经开始将我的 PIG 工作写为

txnrecord = LOAD './txndata' USING PigStorage(',') AS (txn_id:int, receiver_userid:int, sender_userid:int, amount:int);
grptxn1 = GROUP txnrecord BY (receiver_userid, sender_userid);
txncount = FOREACH grptxn1 GENERATE FLATTEN(group) as (receiver_userid, sender_userid), COUNT(txnrecord) as num_txns, SUM(txnrecord.amount) as total_sum;
txncount1 = FILTER txncount by num_txns > 2;
dump txncount1;

以上是给我正确的组聚合,但我的额外要求是

1) 找到聚合组记录及其关联的元组集(单个 txns),例如 - 如果我的组聚合说用户 ID 1 收到来自用户 ID 2 的 3 个事务,我需要将所有三个元组存储在另一个数据文件中。

2)组聚合不匹配> 2个事务条件应该被忽略(这里最后两条记录应该被忽略)

3)我想为我的组聚合分配序列,并且应该将相同的序列用作它们关联的事务元组中的链接键(以识别这三个事务记录与特定的组聚合相关联)。

我正在尝试使用各种功能,但到目前为止还没有运气。

感谢您提供任何帮助指针,谢谢。

4

1 回答 1

0

您可以携带由 GROUP BY 创建的 BAG,它们包含所有原始列,以进行检查

DESCRIBE grptxn1;

要回答要求 1 和 2:

txnrecord = LOAD './txndata' USING PigStorage(',') AS (txn_id:int, receiver_userid:int, sender_userid:int, amount:int);
grptxn1 = GROUP txnrecord BY (receiver_userid, sender_userid);
txncount = FOREACH grptxn1 GENERATE FLATTEN(group) as (receiver_userid, sender_userid), 
txnrecord, -- carry bags through the filter
COUNT(txnrecord) as num_txns, SUM(txnrecord.amount) as total_sum ;
txncount1 = FILTER txncount by num_txns > 2;
tran_dump = FOREACH  txncount1 GENERATE FLATTEN(txnrecord);
STORE tran_dump INTO 'another data file';

txncount2 = FOREACH txncount1 GENERATE (receiver_userid, sender_userid, num_txns, total_sum);
dump txncount2;

要求 3 在 MapReduce 中是不容易做到的,如果不让它变得非常慢或使用一些网络 ID 代理。可能您不需要它,因为 FLATTEN(txnrecord) 将转储输入文件中存在的所有列。

于 2012-11-21T11:06:08.077 回答