
I'm getting up to speed with Pig, combining web log data and stock pricing history from two sources. The dates/times are normalized to timestamps, and a join is performed on the stock symbol. The timestamps do not match exactly.

jnd = JOIN web_time BY w_sym, stock_sort BY group;

The group contains a bag of stock data specific to that symbol. Here is the combined schema:

jnd: {web_time::ip: chararray, web_time::user: chararray, web_time::w_time: long, web_time::url: chararray, stock_sort::sort: {(sym: chararray, time: long, price: double)}}
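For reference, a bag-valued column such as stock_sort::sort typically comes from a GROUP performed before the JOIN. The following is a hypothetical reconstruction; the load path and alias names are guesses based on the schema above, not the asker's actual script:

```
-- Hypothetical: 'stock_history' path and field names are assumed from the schema.
stock = LOAD 'stock_history' USING PigStorage(',') AS (sym:chararray, time:long, price:double);

-- GROUP yields (group, bag); renaming the bag to 'sort' matches stock_sort::sort.
stock_sort = FOREACH (GROUP stock BY sym) GENERATE group, stock AS sort;

-- The join key on the grouped side is the group field itself.
jnd = JOIN web_time BY w_sym, stock_sort BY group;
```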

I need to filter the stock_sort bag using web_time::w_time and time, which are not exact matches. Sample jnd data looks like this:

(14.192.253.226,voraciouszing,1213201721000,"GET /VLCCF.html HTTP/1.0",{(VLCCF,1265361975000,13.84),(VLCCF,1265099390000,14.48),(VLCCF,1265028034000,14.5),(VLCCF,1262678148000,13.76),(VLCCF,1262607761000,13.82),(VLCCF,884004754000,23.99),(VLCCF,883720431000,23.57)})

Using the value in $2, I eventually need to filter out all but one entry, but for now I'm just trying to remove the tuples with smaller timestamps.

flake = FOREACH jnd {
    fits = FILTER jnd BY (w_time > time);
    GENERATE ip, user, w_time, url, fits;
    }

The above doesn't work. This is step 1: remove all tuples from the bag whose timestamp is less than the desired time (w_time). w_time is not part of the group. Does this really require a UDF, or am I missing something simple? I'm at a standstill.

Environment

Apache Pig version 0.15.0.2.4.0.0-169 (rexported), compiled Feb 10 2016, 07:50:04; Hadoop 2.7.1.2.4.0.0-169, Subversion git@github.com:hortonworks/hadoop.git -r 26104d8ac833884c8776473823007f17; 4-node Hortonworks cluster.

Any input is appreciated.


1 Answer


I think that in your foreach you need to filter stock_sort::sort, not jnd, and the filtering should be done with jnd.w_time > time. I managed to write the whole flow without a UDF; see below.
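Applied directly to the question's jnd relation, that advice would look something like the following. This is a sketch assuming the asker's original aliases, untested against their data:

```
flake = FOREACH jnd {
    -- Filter the inner bag, not the jnd relation: 'time' is the bag's field,
    -- and jnd.w_time projects the timestamp from the outer tuple.
    fits = FILTER stock_sort::sort BY time < jnd.w_time;
    GENERATE web_time::ip, web_time::user, web_time::w_time, web_time::url, fits;
};
```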

I took two files:

xact.txt:

VLCCF,1265361975000,13.84
VLCCF,1265262560000,14.16
VLCCF,1265192740000,14.44
VLCCF,1265099390000,14.48
VLCCF,1265028034000,14.5
VLCCF,1262678148000,13.76
VLCCF,1262607761000,13.82
VLCCF,1233832497000,16.9
VLCCF,1233740569000,16.96
VLCCF,884004754000,23.99
VLCCF,883720431000,23.5

stock.txt:

14.192.253.226,voraciouszing,1213201721000,"GET /VLCCF.html HTTP/1.0",VLCCF

stock = load 'stock.txt' using PigStorage(',') as (
ip:chararray,
user:chararray,
w_time:long,
url:chararray,
symbol:chararray
);

xact = load 'xact.txt' using PigStorage(',') as (
symbol:chararray,
time:long,
price:double
);

xact_grouped = foreach(group xact by symbol) generate
    group, xact;

joined = join stock by symbol, xact_grouped by group;

filtered = foreach joined {
    grp = filter xact by time < joined.w_time;
    generate ip, grp;
};

dump filtered;

which gives me:

(14.192.253.226,{(VLCCF,884004754000,23.99),(VLCCF,883720431000,23.5)})
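The asker's eventual goal of keeping only one entry per web hit can be reached from here with a nested ORDER/LIMIT. A sketch, assuming the entry wanted is the latest price before w_time:

```
nearest = FOREACH joined {
    -- Keep only transactions that precede the web hit, then take the newest one.
    earlier = FILTER xact BY time < joined.w_time;
    ordered = ORDER earlier BY time DESC;
    latest  = LIMIT ordered 1;
    GENERATE ip, user, w_time, FLATTEN(latest);
};
```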

Edit: or, alternatively:

stock = load 'stock.txt' using PigStorage(',') as (
ip:chararray,
user:chararray,
w_time:long,
url:chararray,
symbol:chararray
);

xact = load 'xact.txt' using PigStorage(',') as (
symbol:chararray,
time:long,
price:double
);

joined = join stock by symbol, xact by symbol;

joined_filtered = foreach (filter joined by time < w_time) generate
    ip as ip,
    user as user,
    w_time as w_time,
    stock::symbol as symbol,
    time as time,
    price as price;

grouped = foreach (group joined_filtered by (ip, user, w_time)) generate
    flatten(group),
    joined_filtered;
Answered 2016-04-27T11:53:34.300