
>> cat data
ID | ColumnName1:Value1 | ColumnName2:Value2


grunt >> A = load '$data' using PigStorage('|');    
grunt >> dump A;    


(ID, ColumnName1, Value1)
(ID, ColumnName2, Value2)

我可以在 foreach ... generate 中使用 UDF 吗？类似下面这样：

grunt >> foreach A generate SOMEUDF(A)



// Incomplete sketch from the question: an EvalFunc whose exec is meant to return a list of tuples.
public class SPLITTUPPLE extends EvalFunc <List<Tuple>>
    public List<Tuple> exec(Tuple input) throws IOException {
        // A null or empty input tuple yields null.
        if (input == null || input.size() == 0)
            return null;
            // Not sure whether I can create tuples on my own; it looks like I should use TupleFactory.
            // Return the list of tuples.
        }catch(Exception e){
            throw WrappedIOException.wrap("Caught exception processing input row ", e);



3 回答 3


您可以编写 UDF 或使用带有内置函数的 PIG 脚本。


-- Load each line as a single chararray field. Do not load with PigStorage('|'):
-- it returns bytearray fields, which will not work for this example.
inpt = load '/pig_fun/input/single_tuple_to_multiple.txt' as (line:chararray);

-- Split the line on '|' and flatten the result so its fields can be dereferenced later.
splt = foreach inpt generate FLATTEN(STRSPLIT($0, '\\|')) ;

-- The first column is the id; all columns are converted into a bag, which is flattened into rows.
id_vals = foreach splt generate $0 as id, FLATTEN(TOBAG(*)) as value;
-- This also produces an (id, id) record, so record the position of ':' in each value
-- (the id is expected not to contain ':') and split the value into at most two parts.
id_vals = foreach id_vals generate id, INDEXOF(value, ':') as p, STRSPLIT(value, ':', 2) as vals;
-- Keep only the records whose value contains ':' and expand them into (id, col, val).
final = foreach (filter id_vals by p != -1) generate id, FLATTEN(vals) as (col, val);
dump final;







于 2012-07-02T18:04:07.723 回答

这是 UDF 版本。我更倾向于返回一个 BAG：

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;

/**
 * Converts an input chararray "ID|ColumnName1:Value1|ColumnName2:Value2|.." into a bag
 * {(ID, ColumnName1, Value1), (ID, ColumnName2, Value2), ...}.
 * The default row separator is '|' and the default column/value separator is ':'.
 * In this implementation, white space around separator characters is not removed.
 * The ID can be made of any characters (including a sequence of white spaces).
 */
public class TupleToBagColumnValuePairs extends EvalFunc<DataBag> {

    private static final TupleFactory tupleFactory = TupleFactory.getInstance();
    private static final BagFactory bagFactory = BagFactory.getInstance();

    //Row separator character. Default is '|'.
    private String rowsSeparator;
    //Column value separator character. Default i
    private String columnValueSeparator;

    public TupleToBagColumnValuePairs() {
        this.rowsSeparator = "\\|";
        this.columnValueSeparator = ":";

    public TupleToBagColumnValuePairs(String rowsSeparator, String keyValueSeparator) {
        this.rowsSeparator = rowsSeparator;
        this.columnValueSeparator = keyValueSeparator;

     * Creates a tuple with 3 fields (id:chararray, column:chararray, value:chararray)
     * @param outputBag Output tuples (id, column, value) are added to this bag
     * @param id
     * @param column
     * @param value
     * @throws ExecException
    protected void addTuple(DataBag outputBag, String id, String column, String value) throws ExecException {
        Tuple outputTuple = tupleFactory.newTuple();
        outputTuple.append( value);

     * Takes column{separator}value from splitInputLine, splits id into column value and adds them to the outputBag as (id, column, value)
     * @param outputBag Output tuples (id, column, value) should be added to this bag
     * @param id 
     * @param splitInputLine format column{separator}value, which start from index 1
     * @throws ExecException
    protected void parseColumnValues(DataBag outputBag, String id,
            String[] splitInputLine) throws ExecException {
        for (int i = 1; i < splitInputLine.length; i++) {
            if (splitInputLine[i] != null) {
                int columnValueSplitIndex = splitInputLine[i].indexOf(this.columnValueSeparator);
                if (columnValueSplitIndex != -1) {
                    String column = splitInputLine[i].substring(0, columnValueSplitIndex);
                    String value = null;
                    if (columnValueSplitIndex + 1 < splitInputLine[i].length()) {
                        value = splitInputLine[i].substring(columnValueSplitIndex + 1);
                    this.addTuple(outputBag, id, column, value);
                } else {
                    String column = splitInputLine[i];
                    this.addTuple(outputBag, id, column, null);

     * input - contains only one field of type chararray, which will be split by '|'
     * All inputs that are: null or of length 0 are ignored.
    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() != 1 || input.isNull(0)) {
            return null;

        String inputLine = (String)input.get(0);
        String[] splitInputLine = inputLine.split(this.rowsSeparator, -1);

        if (splitInputLine.length > 1 && splitInputLine[0].length() > 0) {
            String id = splitInputLine[0];
            DataBag outputBag = bagFactory.newDefaultBag();            
            if (splitInputLine.length == 1) { // there is just an id in the line
                this.addTuple(outputBag, id, null, null);
            } else {
                this.parseColumnValues(outputBag, id, splitInputLine);

           return outputBag; 
        return null;

    public Schema outputSchema(Schema input) {
        try {
            if (input.size() != 1) {
                throw new RuntimeException("Expected input to have only one field");

            Schema.FieldSchema inputFieldSchema = input.getField(0);
            if (inputFieldSchema.type != DataType.CHARARRAY) {
                throw new RuntimeException("Expected a CHARARRAY as input");

            Schema tupleSchema = new Schema();
            tupleSchema.add(new Schema.FieldSchema("id", DataType.CHARARRAY));
            tupleSchema.add(new Schema.FieldSchema("column", DataType.CHARARRAY));
            tupleSchema.add(new Schema.FieldSchema("value", DataType.CHARARRAY));

            return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), tupleSchema, DataType.BAG));
        } catch (FrontendException exx) {
            throw new RuntimeException(exx);


以下是它在 PIG 中的使用方式:

-- Register the jar that contains the UDF and define a short alias for it.
register 'path to the jar';
define IdColumnValue myPackage.TupleToBagColumnValuePairs();

-- Load each line as a single chararray, then flatten the bag returned by the UDF into rows.
inpt = load '/pig_fun/input/single_tuple_to_multiple.txt' as (line:chararray);
result = foreach inpt generate FLATTEN(IdColumnValue($0)) as (id1, c2, v2);
dump result;

关于如何编写处理 bag 的 UDF，可以参考 LinkedIn 的 DataFu 源代码，它是很好的范例。

于 2012-07-03T11:19:13.687 回答

您可以对 STRSPLIT 的输出使用 TransposeTupleToBag（DataFu 库中的 UDF）来得到一个 bag，然后对该 bag 执行 FLATTEN，为每个原始列生成单独的一行。

于 2014-08-08T21:03:20.633 回答