1

我是猪的新手。这是我要完成的一些伪代码:

FOREACH split_records {
  UPDATE updated_volume SET 
  open=updated_volume.open*split_records.multiply_by/split_records.divide_by,
  close=updated_volume.close*split_records.multiply_by/split_records.divide_by

  WHERE split_records.symbol=updated_volume.symbol AND 
  updated_volume.date < split_records.split_date
}

这是迄今为止我在猪代码方面的内容:

FOREACH split_records {
  SPLIT updated_volume INTO split_yes IF updated_volume.symbol==split_records.symbol AND 
  updated_volume.date < split_records.splitDate, split_no IF 
  updated_volume.symbol!=split_records.symbol OR 
  updated_volume.date > split_Records.splitDate;

  updated_splits = FOREACH split_yes GENERATE
  symbol,
  date,
  (split_yes.open*split_records.multiply_by/split_records.divide_by) AS open,
  (split_yes.close*split_records.multiply_by/split_records.divide_by) AS close;

  updated_volume = UNION updated_splits, split_no;
};

上面的代码给了我错误:不匹配的输入'SPLIT'期望生成,所以它肯定不起作用。但基本上我正在尝试模拟“UPDATE..WHERE”操作,其中 WHERE 条件取决于一个变量,该变量是迭代另一组记录的结果,其长度/计数未知。

我有一种模糊的印象,Pig 不是那种用于迭代的语言,所以我对任何实现这一目标的方法持开放态度。

4

2 回答 2

1

我认为这段代码做了一些你想要做的事情。对于 updated_volume 中的每条记录,它会应用所有在它之后的相应 split_records。

cgrp = COGROUP updated_volume BY symbol, split_records BY symbol;
SPLIT cgrp INTO
    did_split IF SIZE(split_records) > 0,
    did_not_split OTHERWISE;

-- reflatten data for symbols that did not split
not_updated     =   FOREACH did_not_split GENERATE
                        FLATTEN(updated_volume);

-- update data for symbols that did split
to_be_updated   =   FOREACH did_split GENERATE
                        FLATTEN(updated_volume) AS (symbol, volume_date, open, close),
                        split_records;
updated         =   FOREACH to_be_updated {
                        applicable_splits = FILTER split_records BY date >= volume_date;
                        GENERATE
                            symbol, volume_date AS date,
                            -- NOTE: you would have to write a quick udf
                            --       in jython (or java) to calculate the product
                            --       of a bag of numbers
                            open * my_udfs.product(split_records.multiply_by) / PRODUCT(split_records.divide_by)
                            AS open,
                            close * my_udfs.product(split_records.multiply_by) / PRODUCT(split_records.divide_by)
                            AS close;
                    }

updated_volume  = UNION updated, not_updated;
于 2013-10-02T22:58:25.787 回答
0

你可以使用(条件)?true : false表示打开和关闭列。

于 2013-09-29T20:14:00.413 回答