0

我想通过拆分两个元组(或在 Pig 中调用的任何内容)来运行 Pig 脚本,基于 中的标准col2,并在操作之后col2,进入另一列,比较两个操作的元组并进行额外的排除。

REGISTER /home/user1/piggybank.jar;

log = LOAD '../user2/hadoop_file.txt' AS (col1, col2);

--log = LIMIT log 1000000;
isnt_filtered = FILTER log BY (NOT col2 == 'Some value');
isnt_generated = FOREACH isnt_filtered GENERATE col2, col1, RANDOM() * 1000000 AS random, com.some.valueManipulation(col1) AS isnt_manipulated;

is_filtered = FILTER log BY (col2 == 'Some value');
is_generated = FOREACH is_filtered GENERATE com.some.calculation(col1) AS is_manipulated;
is_distinct = DISTINCT is_generated;

拆分和操作是容易的部分。这就是复杂的地方。. .

merge_filtered = FOREACH is_generated {FILTER isnt_generated BY (NOT isnt_manipulated == is_generated.is_manipulated)};

如果我能弄清楚这条线,其余的就会到位。

merge_ordered = ORDER merge_filtered BY random, col2, col1;
merge_limited = LIMIT merge_ordered 400000;

STORE merge_limited into 'file';

以下是 I/O 的示例:

col1                col2            manipulated
This                qWerty          W
Is                  qweRty          R
An                  qwertY          Y
Example             qwErty          E
Of                  qwerTy          T
Example             Qwerty          Q
Data                qWerty          W


isnt
E
Y


col1                col2
This                qWerty
Is                  qweRty
Of                  qwerTy
Example             Qwerty
Data                qWerty
4

1 回答 1

2

我仍然不太确定您需要什么,但我相信您可以使用以下内容(未经测试)重现您的输入和输出:

data = LOAD 'input' AS (col1:chararray, col2:chararray);
exclude = LOAD 'exclude' AS (excl:chararray);

m = FOREACH data GENERATE col1, col2, YourUDF(col2) AS manipulated;
test = COGROUP m BY manipulated, exclude BY excl;

-- Here you can choose IsEmpty or NOT IsEmpty according to whether you want to exclude or include
final = FOREACH (FILTER test BY IsEmpty(exclude)) GENERATE FLATTEN(m);

使用COGROUP,您可以通过分组键对每个关系中的所有元组进行分组。如果元组包exclude为空,则意味着分组键不存在于排除列表中,因此您可以m使用该键保留元组。相反,如果分组键存在于 中exclude,则该包不会为空,并且m带有该键的元组将被过滤掉。

于 2012-11-19T18:36:51.743 回答