I'm struggling with this problem. I have scripts and UDFs that work perfectly with Pig 0.8.1, but when I try to run them with Pig 0.10.0, I get:

ERROR org.apache.pig.tools.grunt.Grunt - ERROR 2218: Invalid resource schema: bag schema  must have tuple as its field

The Pig script code that invokes the UDF looks like this:

    parsed = LOAD '$INPUT' 
    USING pignlproc.storage.ParsingWikipediaLoader('$LANG')
    AS (title, id, pageUrl, text, redirect, links, headers, paragraphs);

The ParsingWikipediaLoader class implements LoadMetadata, and its getSchema() method looks like this:

    public ResourceSchema getSchema(String location, Job job)
            throws IOException {
        Schema schema = new Schema();
        schema.add(new FieldSchema("title", DataType.CHARARRAY));
        schema.add(new FieldSchema("id", DataType.CHARARRAY));
        schema.add(new FieldSchema("uri", DataType.CHARARRAY));
        schema.add(new FieldSchema("text", DataType.CHARARRAY));
        schema.add(new FieldSchema("redirect", DataType.CHARARRAY));
        Schema linkInfoSchema = new Schema();
        linkInfoSchema.add(new FieldSchema("target", DataType.CHARARRAY));
        linkInfoSchema.add(new FieldSchema("begin", DataType.INTEGER));
        linkInfoSchema.add(new FieldSchema("end", DataType.INTEGER));
        schema.add(new FieldSchema("links", linkInfoSchema, DataType.BAG));
        Schema headerInfoSchema = new Schema();
        headerInfoSchema.add(new FieldSchema("tagname", DataType.CHARARRAY));
        headerInfoSchema.add(new FieldSchema("begin", DataType.INTEGER));
        headerInfoSchema.add(new FieldSchema("end", DataType.INTEGER));
        schema.add(new FieldSchema("headers", headerInfoSchema, DataType.BAG));
        Schema paragraphInfoSchema = new Schema();
        paragraphInfoSchema.add(new FieldSchema("tagname", DataType.CHARARRAY));
        paragraphInfoSchema.add(new FieldSchema("begin", DataType.INTEGER));
        paragraphInfoSchema.add(new FieldSchema("end", DataType.INTEGER));
        schema.add(new FieldSchema("paragraphs", paragraphInfoSchema,
                DataType.BAG));

        return new ResourceSchema(schema);
    }

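Again, the script and UDF work as expected on Pig 0.8.1, so this must be some difference between the versions. I have searched thoroughly but could not find anything about this in the documentation or on Stack Overflow.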

1 Answer

It looks like the difference is in the ResourceFieldSchema constructor.

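0.8.1 detects a bag and wraps its inner schema in a tuple, whereas this logic was removed in 0.10.0. I think you need to amend your schema definition to wrap the bag schema in a tuple: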

    schema.add(new FieldSchema("links", new Schema(
            new FieldSchema("t", linkInfoSchema)), DataType.BAG));

However, when used on 0.8.1, this produces a schema with a tuple nested inside a tuple:

  • 0.10.0:{title: chararray,id: chararray,uri: chararray,text: chararray,redirect: chararray,links: {t: (target: chararray,begin: int,end: int)},headers: {t: (tagname: chararray,begin: int,end: int)},paragraphs: {t: (tagname: chararray,begin: int,end: int)}}
  • 0.8.1:{title: chararray,id: chararray,uri: chararray,text: chararray,redirect: chararray,links: {t: (t: (target: chararray,begin: int,end: int))},headers: {t: (t: (tagname: chararray,begin: int,end: int))},paragraphs: {t: (t: (tagname: chararray,begin: int,end: int))}}

You can fix this by setting the two-level access required flag to true:

    Schema linkInfoSchema = new Schema();
    linkInfoSchema.add(new FieldSchema("target", DataType.CHARARRAY));
    linkInfoSchema.add(new FieldSchema("begin", DataType.INTEGER));
    linkInfoSchema.add(new FieldSchema("end", DataType.INTEGER));
    Schema linkInfoSchemaTupleWrapper = new Schema(new FieldSchema("t",
            linkInfoSchema));
    linkInfoSchemaTupleWrapper.setTwoLevelAccessRequired(true);
    schema.add(new FieldSchema("links", linkInfoSchemaTupleWrapper, DataType.BAG));

This then yields the same schema on both 0.10.0 and 0.8.1:

{title: chararray,id: chararray,uri: chararray,text: chararray,redirect: chararray,links: {t: (target: chararray,begin: int,end: int)},headers: {t: (tagname: chararray,begin: int,end: int)},paragraphs: {t: (tagname: chararray,begin: int,end: int)}}

{title: chararray,id: chararray,uri: chararray,text: chararray,redirect: chararray,links: {t: (target: chararray,begin: int,end: int)},headers: {t: (tagname: chararray,begin: int,end: int)},paragraphs: {t: (tagname: chararray,begin: int,end: int)}}
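
The snippet above only covers the links field, while headers and paragraphs need the same treatment. One way to avoid repeating the wrapper three times is a small helper, sketched here. The class name `BagSchemas` and the method `bagOfTuples` are hypothetical; the Pig calls are the same ones already used in this answer, and the sketch has not been compiled against either Pig version.

```java
import java.io.IOException;

import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;

/** Hypothetical helper, not part of Pig or pignlproc. */
public final class BagSchemas {

    private BagSchemas() {
    }

    /**
     * Builds a bag field whose single inner field is a tuple named "t"
     * holding the given schema, with the two-level access flag set so that
     * 0.8.1 does not add a second tuple wrapper.
     */
    public static FieldSchema bagOfTuples(String alias, Schema tupleSchema)
            throws IOException {
        Schema wrapper = new Schema(new FieldSchema("t", tupleSchema));
        wrapper.setTwoLevelAccessRequired(true);
        return new FieldSchema(alias, wrapper, DataType.BAG);
    }
}
```

Inside getSchema(), the three bag fields would then be added as `schema.add(BagSchemas.bagOfTuples("links", linkInfoSchema));`, and likewise for headerInfoSchema and paragraphInfoSchema.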

For reference, the ResourceFieldSchema constructor in 0.10.0:

    /**
     * Construct using a {@link org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema} as the template.
     * @param fieldSchema fieldSchema to copy from
     */
    public ResourceFieldSchema(FieldSchema fieldSchema) {
        type = fieldSchema.type;
        name = fieldSchema.alias;
        description = "autogenerated from Pig Field Schema";
        Schema inner = fieldSchema.schema;

        // allow partial schema 
        if ((type == DataType.BAG || type == DataType.TUPLE || type == DataType.MAP)
                && inner != null) {
            schema = new ResourceSchema(inner);
        } else {
            schema = null;
        }
    }

The same constructor in 0.8.1:

    /**
     * Construct using a {@link org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema} as the template.
     * @param fieldSchema fieldSchema to copy from
     */
    public ResourceFieldSchema(FieldSchema fieldSchema) {
        type = fieldSchema.type;
        name = fieldSchema.alias;
        description = "autogenerated from Pig Field Schema";
        Schema inner = fieldSchema.schema;
        if (type == DataType.BAG && fieldSchema.schema != null
                && !fieldSchema.schema.isTwoLevelAccessRequired()) { 
            log.info("Insert two-level access to Resource Schema");
            FieldSchema fs = new FieldSchema("t", fieldSchema.schema);
            inner = new Schema(fs);                
        }

        // allow partial schema 
        if ((type == DataType.BAG || type == DataType.TUPLE)
                && inner != null) {
            schema = new ResourceSchema(inner);
        } else {
            schema = null;
        }
    }
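
To make the difference between the two constructors easier to follow, here is a small Pig-free model of the wrapping logic. Everything in it (`BagWrapModel`, `ToyField`, `ToySchema`, `resolve081`, `resolve0100`) is a hypothetical stand-in rather than Pig code, and the validation in `resolve0100` is an assumed simplification of the check that reports ERROR 2218.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model (not Pig code) of how each version treats a bag's inner schema. */
public class BagWrapModel {

    /** Stand-in for Pig's FieldSchema: a name, a type tag, an optional nested schema. */
    static final class ToyField {
        final String name;
        final String type;
        final ToySchema schema;

        ToyField(String name, String type, ToySchema schema) {
            this.name = name;
            this.type = type;
            this.schema = schema;
        }
    }

    /** Stand-in for Pig's Schema, including the two-level access flag. */
    static final class ToySchema {
        final List<ToyField> fields;
        boolean twoLevelAccessRequired = false;

        ToySchema(ToyField... fields) {
            this.fields = Arrays.asList(fields);
        }
    }

    static ToyField leaf(String name) {
        return new ToyField(name, "scalar", null);
    }

    static ToyField tuple(String name, ToySchema schema) {
        return new ToyField(name, "tuple", schema);
    }

    /** Mimics 0.8.1: wrap the bag's schema in a tuple "t" unless the flag is set. */
    static ToySchema resolve081(ToySchema bagSchema) {
        if (!bagSchema.twoLevelAccessRequired) {
            return new ToySchema(tuple("t", bagSchema));
        }
        return bagSchema;
    }

    /** Mimics 0.10.0: no wrapping; the check is an assumed simplification. */
    static ToySchema resolve0100(ToySchema bagSchema) {
        boolean singleTuple = bagSchema.fields.size() == 1
                && "tuple".equals(bagSchema.fields.get(0).type);
        if (!singleTuple) {
            throw new IllegalStateException("bag schema must have tuple as its field");
        }
        return bagSchema;
    }

    /** Renders the contents of a bag schema, e.g. "t: (target,begin,end)". */
    static String describe(ToySchema schema) {
        StringBuilder sb = new StringBuilder();
        for (ToyField f : schema.fields) {
            if (sb.length() > 0) {
                sb.append(",");
            }
            sb.append(f.name);
            if (f.schema != null) {
                sb.append(": (").append(describe(f.schema)).append(")");
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        ToySchema flat = new ToySchema(leaf("target"), leaf("begin"), leaf("end"));
        ToySchema wrapped = new ToySchema(tuple("t", flat));

        System.out.println("0.8.1, flat:     " + describe(resolve081(flat)));
        System.out.println("0.8.1, wrapped:  " + describe(resolve081(wrapped)));
        System.out.println("0.10.0, wrapped: " + describe(resolve0100(wrapped)));

        // With the flag set, 0.8.1 skips its own wrapper and matches 0.10.0.
        wrapped.twoLevelAccessRequired = true;
        System.out.println("0.8.1, flagged:  " + describe(resolve081(wrapped)));
    }
}
```

In this model the flat schema is accepted only by the 0.8.1 path, the manually wrapped schema is double-wrapped by 0.8.1, and setting the flag brings both paths to the same single-tuple result, which mirrors the schemas shown earlier in this answer.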
answered 2012-06-06T11:01:17.070