据我所知,Pig 没有提供任何内置方法来获取 map 中的所有键(key)。对于 map 字段,您只能使用 SIZE 函数获取其大小,使用 IsEmpty 函数检查它是否为空,或使用 map_field#'key' 查找给定键对应的值。
我自己写了一些 UDF 来帮助我更好地处理 map 数据类型。其中一个函数可能对您有用——MapToBag——它可以将 map:map[value_type]
转换为 bag:bag{:tuple(key:chararray, value:value_type)}
。有了 bag,您既可以取得所有的键,也可以对其应用 FLATTEN 操作。
package com.XXX.YYY.ZZZ;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.data.*;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Pig UDF that converts a map into a bag of {@code (key, value)} tuples.
 *
 * <p>Input: {@code map[value_type]}.
 * Output: {@code bag{tuple(key:chararray, value:value_type)}}.
 * Once the map is a bag, its keys can be projected and the bag can be
 * FLATTENed like any other bag.
 */
public class MapToBag extends EvalFunc<DataBag> {

    private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
    private static final BagFactory BAG_FACTORY = BagFactory.getInstance();

    /**
     * Turns the map in the first field of {@code input} into a bag holding one
     * two-field tuple per map entry: field 0 is the key, field 1 is the value.
     *
     * @param input tuple whose first field is the map to convert
     * @return the bag of (key, value) tuples, or {@code null} when the input
     *         tuple is null/empty or the map is null/empty
     * @throws IOException if a tuple field cannot be read or written
     */
    @Override
    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0) {
            return null;
        }

        // The UDF is registered for MAP arguments only (see getArgToFuncMapping),
        // so the first field is expected to be a map.
        @SuppressWarnings("unchecked")
        Map<String, Object> map = (Map<String, Object>) input.get(0);
        if (map == null || map.isEmpty()) {
            return null;
        }

        DataBag bag = BAG_FACTORY.newDefaultBag();
        // Iterate over entries to avoid a second lookup per key.
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            Tuple pair = TUPLE_FACTORY.newTuple(2);
            pair.set(0, entry.getKey());
            pair.set(1, entry.getValue());
            bag.add(pair);
        }
        // The map is non-empty here, so the bag always holds at least one tuple.
        return bag;
    }

    /**
     * Describes the output as {@code bag{tuple(key:chararray, value:<type>)}}.
     *
     * <p>The value type is taken from the input map's declared value schema.
     * When that is unavailable (untyped map, or missing input schema) the value
     * is declared as {@code bytearray} rather than leaving a null field inside
     * the tuple schema.
     *
     * @param input schema of the UDF arguments
     * @return the bag schema; a bare bag schema without inner fields if the
     *         detailed schema cannot be built
     */
    @Override
    public Schema outputSchema(Schema input) {
        try {
            Schema.FieldSchema valueField = getMapValueSchema(input);
            if (valueField == null) {
                // Unknown value type: declare it explicitly instead of adding null.
                valueField = new Schema.FieldSchema("value", DataType.BYTEARRAY);
            }

            Schema innerSchema = new Schema();
            innerSchema.add(new Schema.FieldSchema("key", DataType.CHARARRAY));
            innerSchema.add(valueField);

            Schema tupleSchema = new Schema(new Schema.FieldSchema(null, innerSchema, DataType.TUPLE));
            return new Schema(new Schema.FieldSchema(null, tupleSchema, DataType.BAG));
        } catch (FrontendException e) {
            // Deliberate best effort: fall back to an untyped bag so that schema
            // construction problems do not fail the whole script at compile time.
            return new Schema(new Schema.FieldSchema(null, DataType.BAG));
        }
    }

    /**
     * Extracts the field schema of the map's values from the UDF input schema.
     *
     * <p>The returned field is a copy aliased {@code "value"}; the caller's
     * input schema is left unmodified.
     *
     * @param input schema of the UDF arguments
     * @return a copy of the map value field aliased {@code "value"}, or
     *         {@code null} if the input is missing, its first field is not a
     *         map, or the map carries no value schema
     * @throws FrontendException if the input schema cannot be accessed
     */
    protected Schema.FieldSchema getMapValueSchema(Schema input) throws FrontendException {
        if (input == null || input.size() == 0) {
            return null;
        }

        Schema.FieldSchema mapField = input.getField(0);
        if (mapField.type != DataType.MAP) {
            return null;
        }

        Schema valueSchema = mapField.schema;
        if (valueSchema == null || valueSchema.size() == 0) {
            return null;
        }

        // Rename a copy: the original field belongs to the caller's schema and
        // must not have its alias overwritten as a side effect.
        Schema.FieldSchema valueField = new Schema.FieldSchema(valueSchema.getField(0));
        valueField.alias = "value";
        return valueField;
    }

    /**
     * Registers this UDF for a single argument of type map.
     *
     * @return a one-element list mapping a MAP argument to this class
     * @throws FrontendException if the argument schema cannot be built
     */
    @Override
    public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
        funcList.add(new FuncSpec(this.getClass().getName(),
                new Schema(new Schema.FieldSchema(null, DataType.MAP))));
        return funcList;
    }
}