
I have a many-to-many mapping table between two sets. Each row in the table represents one possible mapping, together with a weight score.

mapping(id1, id2, weight)

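Query: produce a one-to-one mapping between id1 and id2. Remove duplicate mappings by keeping the one with the lowest weight. If there is a tie, output either one.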

Example input:

(1, X, 1)
(1, Y, 2)
(2, X, 3)
(2, Y, 1)
(3, Z, 2)

Output:

(1, X)
(2, Y)
(3, Z)

Both 1 and 2 map to both X and Y. We choose the mappings (1, X) and (2, Y) because they have the lowest weights.
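As a point of reference, here is a minimal plain-Java sketch of one possible reading of the query: a greedy pass that visits rows in ascending weight order and keeps a row only if neither its id1 nor its id2 has already been taken. This greedy approach is an assumption, not something either answer below uses, and the names `GreedyOneToOne`, `Mapping`, and `match` are hypothetical. It needs Java 16+ for records.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class GreedyOneToOne {
    // One row of mapping(id1, id2, weight); hypothetical helper type.
    record Mapping(String id1, String id2, double weight) {}

    // Greedy reading of the query (an assumption): visit rows by ascending weight
    // and keep a row only if neither its id1 nor its id2 is already used.
    // List.sort is stable, so ties are broken by input order.
    static List<Mapping> match(List<Mapping> rows) {
        List<Mapping> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.comparingDouble(Mapping::weight));
        Set<String> usedId1 = new HashSet<>();
        Set<String> usedId2 = new HashSet<>();
        List<Mapping> result = new ArrayList<>();
        for (Mapping m : sorted) {
            if (!usedId1.contains(m.id1()) && !usedId2.contains(m.id2())) {
                usedId1.add(m.id1());
                usedId2.add(m.id2());
                result.add(m);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // The example input from the question.
        List<Mapping> rows = List.of(
            new Mapping("1", "X", 1), new Mapping("1", "Y", 2),
            new Mapping("2", "X", 3), new Mapping("2", "Y", 1),
            new Mapping("3", "Z", 2));
        for (Mapping m : match(rows)) {
            System.out.println("(" + m.id1() + ", " + m.id2() + ")");
        }
    }
}
```

On the example input this yields (1, X), (2, Y), (3, Z), matching the expected output. Greedy selection by weight does not guarantee the smallest total weight or the largest number of pairs; that would call for an assignment algorithm such as the Hungarian method.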


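I assume you are only interested in mappings whose weight is the lowest of all mappings involving that id1 and also the lowest of all mappings involving that id2. For example, if you additionally had the mapping (2, Y, 4), it would not conflict with (1, X, 1). I would exclude such a mapping, because its weight is higher than those of (1, Y, 2) and (2, X, 3), which were disqualified.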

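My solution is as follows: find the minimum mapping weight for each id1, then join it back onto the mapping relation for later reference. Then use a nested FOREACH to go through each id2: pick the record with the lowest weight for that id2 using ORDER and LIMIT, and keep it only if its weight is also the minimum for its id1.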

Here is the complete script, tested on your input:

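-- Load the mapping relation (PigStorage, tab-separated by default).
mapping = LOAD 'input' AS (id1:chararray, id2:chararray, weight:double);

-- Minimum weight of any mapping that involves each id1.
id1_weights =
    FOREACH (GROUP mapping BY id1)
    GENERATE group AS id1, MIN(mapping.weight) AS id1_min_weight;
-- Attach that per-id1 minimum to every mapping row.
mapping_with_id1_mins =
    FOREACH (JOIN mapping BY id1, id1_weights BY id1)
    GENERATE mapping::id1 AS id1, id2, weight, id1_min_weight;

-- For each id2, take its lowest-weight row and keep it only if
-- that weight is also the minimum for the row's id1.
accepted_mappings =
    FOREACH (GROUP mapping_with_id1_mins BY id2)
    {
        ordered = ORDER mapping_with_id1_mins BY weight;
        selected = LIMIT ordered 1;
        acceptable = FILTER selected BY weight == id1_min_weight;
        GENERATE FLATTEN(acceptable);
    };

DUMP accepted_mappings;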
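To check the logic outside Pig, the following plain-Java sketch mirrors the three steps of the script: the per-id1 minimum, the lowest-weight row per id2, and the final filter. It is an illustration under assumptions, not the answerer's code; `LocalMinMapping`, `Mapping`, `accept`, and `print` are hypothetical names, and records need Java 16+.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LocalMinMapping {
    // One row of mapping(id1, id2, weight); hypothetical helper type.
    record Mapping(String id1, String id2, double weight) {}

    // Keeps, for each id2, its lowest-weight row, but only if that weight is
    // also the minimum weight of any row with the same id1.
    static List<Mapping> accept(List<Mapping> rows) {
        // Step 1: minimum weight per id1 (GROUP mapping BY id1 + MIN).
        Map<String, Double> id1Min = new HashMap<>();
        for (Mapping m : rows) {
            id1Min.merge(m.id1(), m.weight(), Math::min);
        }
        // Step 2: lowest-weight row per id2 (nested ORDER + LIMIT 1).
        // On a tie the first row seen wins; LIMIT 1 in Pig may pick either.
        Map<String, Mapping> bestForId2 = new LinkedHashMap<>();
        for (Mapping m : rows) {
            bestForId2.merge(m.id2(), m, (a, b) -> b.weight() < a.weight() ? b : a);
        }
        // Step 3: keep the row only if it is also its id1's minimum (FILTER).
        List<Mapping> result = new ArrayList<>();
        for (Mapping m : bestForId2.values()) {
            if (m.weight() == id1Min.get(m.id1())) {
                result.add(m);
            }
        }
        return result;
    }

    static void print(List<Mapping> pairs) {
        for (Mapping m : pairs) {
            System.out.println("(" + m.id1() + ", " + m.id2() + ")");
        }
    }

    public static void main(String[] args) {
        // The example input from the question.
        print(accept(List.of(
            new Mapping("1", "X", 1), new Mapping("1", "Y", 2),
            new Mapping("2", "X", 3), new Mapping("2", "Y", 1),
            new Mapping("3", "Z", 2))));
        // A hypothetical input where id1 = 2 ends up with no mapping at all.
        print(accept(List.of(
            new Mapping("1", "X", 1), new Mapping("2", "X", 2),
            new Mapping("2", "Y", 3))));
    }
}
```

The second call shows how strict the rule is: with rows (1, X, 1), (2, X, 2), (2, Y, 3), only (1, X) survives. (2, Y, 3) is dropped because 3 is not the minimum weight for id1 = 2, even though Y is otherwise unused.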
answered 2012-11-08T22:53:01.167

I solved it with a Java UDF. It is not perfect, in the sense that it does not maximize the number of one-to-one mappings, but it is good enough.

Pig script:

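-- The jar containing com.propeld.pig.DEDUP must be REGISTERed (or on Pig's classpath).
d = load 'test' as (fid, iid, priority:double);

-- Pass 1: keep the lowest-priority row for each fid.
g = group d by fid;
o = foreach g generate FLATTEN(com.propeld.pig.DEDUP(d)) as (fid, iid, priority);
store o into 'output';

-- Pass 2: among those rows, keep the lowest-priority row for each iid.
g2 = group o by iid;
o2 = foreach g2 generate FLATTEN(com.propeld.pig.DEDUP(o)) as (fid, iid, priority);
store o2 into 'output2';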

Java UDF:

package com.propeld.pig;

import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class DEDUP extends EvalFunc<Tuple> implements Algebraic{
    public String getInitial() {return Initial.class.getName();}
    public String getIntermed() {return Intermed.class.getName();}
    public String getFinal() {return Final.class.getName();}
    static public class Initial extends EvalFunc<Tuple> {
        private static TupleFactory tfact = TupleFactory.getInstance();
        public Tuple exec(Tuple input) throws IOException {
            // Initial is called in the map.
            // we just send the tuple down
            try {
                // input is a bag with one tuple containing
                // the column we are trying to operate on
                DataBag bg = (DataBag) input.get(0);
                if (bg.iterator().hasNext()) {
                    Tuple dba = (Tuple) bg.iterator().next();
                    return dba;
                } else {
                    // make sure that we call the object constructor, not the list constructor
                    return tfact.newTuple((Object) null);
                }
            } catch (ExecException e) {
                throw e;
            } catch (Exception e) {
                int errCode = 2106;
                throw new ExecException("Error executing an algebraic function", errCode, PigException.BUG, e);
            }
        }
    }
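    // Intermed runs in the combiner and Final in the reducer. Both receive a bag
    // of partial results in field 0 and keep the lowest-weight tuple.
    static public class Intermed extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {
            return dedup(input);
        }
    }
    static public class Final extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {return dedup(input);}
    }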

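    // Returns the tuple with the smallest weight (field 2) in the bag, skipping
    // the one-field null placeholder that Initial emits for an empty bag and
    // rows whose weight is null (which would otherwise throw on unboxing).
    static protected Tuple dedup(Tuple input) throws ExecException {
        DataBag values = (DataBag) input.get(0);
        double min = Double.POSITIVE_INFINITY;
        Tuple result = null;
        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
            Tuple t = it.next();
            if (t == null || t.size() < 3 || t.get(2) == null) {
                continue;
            }
            double w = (Double) t.get(2);
            if (result == null || w < min) {
                min = w;
                result = t;
            }
        }
        return result;
    }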

    @Override
    public Tuple exec(Tuple input) throws IOException {
        return dedup(input);
    }
}
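The two passes in the Pig script can be imitated in plain Java to see why the answer says the result does not maximize the number of one-to-one mappings. This is a sketch under assumptions: `TwoPassDedup`, `Mapping`, `dedupBy`, and `twoPass` are hypothetical names, records need Java 16+, and ties go to the first row seen, whereas the UDF's tie-breaking depends on bag order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class TwoPassDedup {
    // One row of the input relation (fid, iid, priority); hypothetical helper type.
    record Mapping(String fid, String iid, double priority) {}

    // Keeps the lowest-priority row for each key, like GROUP ... BY key followed
    // by DEDUP. On a tie the first row seen wins.
    static List<Mapping> dedupBy(List<Mapping> rows, Function<Mapping, String> key) {
        Map<String, Mapping> best = new LinkedHashMap<>();
        for (Mapping m : rows) {
            best.merge(key.apply(m), m, (a, b) -> b.priority() < a.priority() ? b : a);
        }
        return new ArrayList<>(best.values());
    }

    // Pass 1 groups by fid, pass 2 groups the survivors by iid.
    static List<Mapping> twoPass(List<Mapping> rows) {
        return dedupBy(dedupBy(rows, Mapping::fid), Mapping::iid);
    }

    public static void main(String[] args) {
        // Hypothetical input: fid 2 could take Y, but pass 1 already chose X for it.
        List<Mapping> rows = List.of(
            new Mapping("1", "X", 1), new Mapping("2", "X", 2),
            new Mapping("2", "Y", 3));
        for (Mapping m : twoPass(rows)) {
            System.out.println("(" + m.fid() + ", " + m.iid() + ")");
        }
    }
}
```

With these rows, pass 1 picks X for both fid 1 and fid 2, pass 2 keeps only (1, X), and fid 2 ends up unmapped even though (2, Y) was available. On the question's example input, the two passes still return (1, X), (2, Y), (3, Z).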
answered 2012-11-08T22:49:18.627