1

我想实现一个自定义的 Hadoop Writable 类,其中一个字段是时间戳。我在 Hadoop 库中似乎找不到可以简化这项工作的类(例如针对 Date 或 Calendar 的 Writable)。我正考虑利用 Calendar 的 getTimeInMillis/setTimeInMillis 来编写自定义 Writable,但想知道是否有更好的或内置的解决方案。

4

2 回答 2

4

Hadoop 中没有针对 Calendar/Date 的 Writable。考虑到可以从 Calendar 对象中以 long 形式获取 timeInMillis,您可以使用 LongWritable 来序列化 Calendar 对象——当且仅当您的应用程序始终使用默认的 UTC 时区(即它对时区“不可知”,始终假定 timeInMillis 表示 UTC 时间)。

如果您使用另一个时区,或者如果您的应用程序需要能够根据不同的时区解释 timeInMillis,则必须从头开始编写默认的 Writable 实现。

于 2012-08-14T18:11:25.157 回答
1

这是我为您生成的一个自定义 Writable,用于演示包含三个属性(其中一个是日期)的 Writable。可以看到,日期值以 long 形式保存,并且 long 与 Date 之间的相互转换很容易。如果三个属性太多,我可以为您生成一个只包含日期的 Writable。

package com.lmx.writable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import com.eaio.uuid.UUID;
import org.apache.hadoop.io.*;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultDataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

/**
 * Hadoop {@code Writable} holding three properties: a nullable date, an {@code int} count and a
 * nullable name.
 *
 * <p>Serialized layout, in order:
 * <ol>
 *   <li>date: a boolean presence flag, followed by the {@code long} from {@link Date#getTime()}
 *       when the flag is {@code true};</li>
 *   <li>count: an {@code int};</li>
 *   <li>name: a boolean presence flag, followed by the string written with
 *       {@code Text.writeString} when the flag is {@code true}.</li>
 * </ol>
 *
 * <p>Each property has a change flag, indexed by the {@code PROPERTY_*} constants. Setters raise
 * the flag; deserializing a property raises it unless a {@code null} value was read; and
 * {@link #resetChangeFlags()} clears all of them.
 *
 * <p>The {@link Date} is stored and returned by reference (no defensive copy), so mutating the
 * instance passed to {@link #setDate(Date)} or returned by {@link #getDate()} changes this object
 * without touching the change flag.
 */
public class MyCustomWritable implements Writable {

  /** Change-flag index of the date property. */
  public static final int PROPERTY_DATE = 0;
  /** Change-flag index of the count property. */
  public static final int PROPERTY_COUNT = 1;
  /** Change-flag index of the name property. */
  public static final int PROPERTY_NAME = 2;

  /** One flag per property, indexed by the {@code PROPERTY_*} constants. */
  private final boolean[] changeFlag = new boolean[3];

  private Date _date;
  private int _count;
  private String _name;

  /** Creates an instance with a null date, a zero count, a null name and all change flags cleared. */
  public MyCustomWritable() {
    resetChangeFlags();
  }

  /**
   * Creates an instance from explicit values. Because the values are applied through the setters,
   * all three change flags are raised afterwards.
   *
   * @param _date  the date, may be {@code null}
   * @param _count the count
   * @param _name  the name, may be {@code null}
   */
  public MyCustomWritable(Date _date, int _count, String _name) {
    resetChangeFlags();
    setDate(_date);
    setCount(_count);
    setName(_name);
  }

  /**
   * Creates an instance by deserializing {@code bytes} with {@link #readFields(DataInput)}; all
   * change flags are cleared afterwards.
   *
   * @param bytes a serialized instance, in the layout produced by {@link #getBytes()}
   * @throws IllegalArgumentException if {@code bytes} cannot be read completely (for example when
   *         it is truncated); the underlying {@link IOException} is attached as the cause
   */
  public MyCustomWritable(byte[] bytes) {
    DataInput in = new DataInputStream(new ByteArrayInputStream(bytes));
    try {
      readFields(in);
    } catch (IOException e) {
      // Failing loudly here: swallowing the error would hand back a silently half-populated object.
      throw new IllegalArgumentException(
          "Cannot deserialize MyCustomWritable from " + bytes.length + " byte(s)", e);
    }
    resetChangeFlags();
  }

  /** @return the date, or {@code null} if none is set (the stored instance itself, not a copy) */
  public Date getDate() {
    return _date;
  }

  /**
   * Sets the date and raises its change flag.
   *
   * @param value the new date, may be {@code null}
   */
  public void setDate(Date value) {
    _date = value;
    changeFlag[PROPERTY_DATE] = true;
  }

  /** @return the count */
  public int getCount() {
    return _count;
  }

  /**
   * Sets the count and raises its change flag.
   *
   * @param value the new count
   */
  public void setCount(int value) {
    _count = value;
    changeFlag[PROPERTY_COUNT] = true;
  }

  /** @return the name, or {@code null} if none is set */
  public String getName() {
    return _name;
  }

  /**
   * Sets the name and raises its change flag.
   *
   * @param value the new name, may be {@code null}
   */
  public void setName(String value) {
    _name = value;
    changeFlag[PROPERTY_NAME] = true;
  }

  /**
   * Reads date, count and name from {@code in}, in that order, updating the change flags as each
   * property is read.
   *
   * @param in the input to read from
   * @throws IOException if reading fails; properties read before the failure stay updated
   */
  public void readFields(DataInput in) throws IOException {
    readDate(in);
    readCount(in);
    readName(in);
  }

  /**
   * Writes date, count and name to {@code out}, in that order.
   *
   * @param out the output to write to
   * @throws IOException if writing fails
   */
  public void write(DataOutput out) throws IOException {
    writeDate(out);
    writeCount(out);
    writeName(out);
  }

  /**
   * Serializes this object with {@link #write(DataOutput)} into a new byte array.
   *
   * @return the serialized form
   * @throws IOException if writing fails
   */
  public byte[] getBytes() throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buffer);
    write(out);
    out.flush();
    out.close();
    return buffer.toByteArray();
  }

  /** Clears the change flags of all three properties. */
  public void resetChangeFlags() {
    for (int i = 0; i < changeFlag.length; i++) {
      changeFlag[i] = false;
    }
  }

  /**
   * Returns the change flag of one property.
   *
   * @param i one of {@link #PROPERTY_DATE}, {@link #PROPERTY_COUNT} or {@link #PROPERTY_NAME}
   * @return the current value of that flag
   * @throws ArrayIndexOutOfBoundsException if {@code i} is not a valid flag index
   */
  public boolean getChangeFlag(int i) {
    return changeFlag[i];
  }

  /**
   * Serializes only the date property (presence flag plus optional {@code long}).
   *
   * @return the serialized date
   * @throws IOException if writing fails
   */
  public byte[] getDateAsBytes() throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buffer);
    writeDate(out);
    out.flush();
    out.close();
    return buffer.toByteArray();
  }

  /**
   * Serializes only the count property (a single {@code int}).
   *
   * @return the serialized count
   * @throws IOException if writing fails
   */
  public byte[] getCountAsBytes() throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buffer);
    writeCount(out);
    out.flush();
    out.close();
    return buffer.toByteArray();
  }

  /**
   * Serializes only the name property (presence flag plus optional string).
   *
   * @return the serialized name
   * @throws IOException if writing fails
   */
  public byte[] getNameAsBytes() throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buffer);
    writeName(out);
    out.flush();
    out.close();
    return buffer.toByteArray();
  }

  /**
   * Replaces the date with the value serialized in {@code b} and updates its change flag
   * (raised for a non-null date, cleared for a null one).
   *
   * @param b bytes in the layout produced by {@link #getDateAsBytes()}
   * @throws IOException if {@code b} cannot be read completely
   */
  public void setDateFromBytes(byte[] b) throws IOException {
    readDate(new DataInputStream(new ByteArrayInputStream(b)));
  }

  /**
   * Replaces the count with the value serialized in {@code b} and raises its change flag.
   *
   * @param b bytes in the layout produced by {@link #getCountAsBytes()}
   * @throws IOException if {@code b} cannot be read completely
   */
  public void setCountFromBytes(byte[] b) throws IOException {
    readCount(new DataInputStream(new ByteArrayInputStream(b)));
  }

  /**
   * Replaces the name with the value serialized in {@code b} and updates its change flag
   * (raised for a non-null name, cleared for a null one).
   *
   * @param b bytes in the layout produced by {@link #getNameAsBytes()}
   * @throws IOException if {@code b} cannot be read completely
   */
  public void setNameFromBytes(byte[] b) throws IOException {
    readName(new DataInputStream(new ByteArrayInputStream(b)));
  }

  /**
   * Converts this object to a three-field Pig tuple: the date as a {@code Long} of epoch
   * milliseconds (or {@code null}), the count as an {@code Integer}, and the name (or
   * {@code null}).
   *
   * @return the tuple
   * @throws ExecException if a field cannot be set on the tuple
   */
  public Tuple asTuple() throws ExecException {
    Tuple tuple = TupleFactory.getInstance().newTuple(3);

    Date date = getDate();
    tuple.set(0, date == null ? null : Long.valueOf(date.getTime()));
    tuple.set(1, Integer.valueOf(getCount()));
    tuple.set(2, getName());

    return tuple;
  }

  /**
   * Builds the Pig schema matching {@link #asTuple()}: {@code date} (LONG), {@code count}
   * (INTEGER) and {@code name} (CHARARRAY).
   *
   * @return the schema
   * @throws IOException declared for callers; propagated if schema construction fails
   */
  public static ResourceSchema getPigSchema() throws IOException {
    ResourceFieldSchema[] fieldSchema = new ResourceFieldSchema[3];

    fieldSchema[0] = new ResourceFieldSchema();
    fieldSchema[0].setName("date");
    fieldSchema[0].setType(DataType.LONG);

    fieldSchema[1] = new ResourceFieldSchema();
    fieldSchema[1].setName("count");
    fieldSchema[1].setType(DataType.INTEGER);

    fieldSchema[2] = new ResourceFieldSchema();
    fieldSchema[2].setName("name");
    fieldSchema[2].setType(DataType.CHARARRAY);

    ResourceSchema schema = new ResourceSchema();
    schema.setFields(fieldSchema);
    return schema;
  }

  /**
   * Parses a JSON document and converts it with {@link #fromJson(JSONObject)}.
   *
   * @param source the JSON text
   * @return the parsed object, or {@code null} if {@code source} is {@code null} or cannot be
   *         parsed or converted (the error is printed to standard output)
   */
  public static MyCustomWritable fromJson(String source) {
    if (source == null) {
      return null;
    }

    MyCustomWritable obj = null;
    try {
      obj = fromJson(new JSONObject(source));
    } catch (JSONException e) {
      System.out.println(e.toString());
    }
    return obj;
  }

  /**
   * Builds an instance from the optional keys {@code "date"} (epoch milliseconds),
   * {@code "count"} and {@code "name"}. Only properties whose key is present are set, so only
   * their change flags are raised.
   *
   * @param jsonObj the JSON object to read
   * @return the new instance, or {@code null} if {@code jsonObj} is {@code null} or a present key
   *         cannot be converted (the error is printed to standard output)
   */
  public static MyCustomWritable fromJson(JSONObject jsonObj) {
    if (jsonObj == null) {
      return null;
    }

    MyCustomWritable obj = new MyCustomWritable();
    try {
      if (jsonObj.has("date")) {
        obj.setDate(new Date(jsonObj.getLong("date")));
      }
      if (jsonObj.has("count")) {
        obj.setCount(jsonObj.getInt("count"));
      }
      if (jsonObj.has("name")) {
        obj.setName(jsonObj.getString("name"));
      }
    } catch (JSONException e) {
      System.out.println(e.toString());
      obj = null;
    }
    return obj;
  }

  /**
   * Converts this object to JSON. {@code "count"} is always written; {@code "date"} (epoch
   * milliseconds) and {@code "name"} are written only when non-null.
   *
   * @return the JSON object, never {@code null}
   * @throws IllegalStateException if the JSON object cannot be built; not expected with these
   *         constant keys, but reported with its cause rather than hidden behind a {@code null}
   */
  public JSONObject toJson() {
    try {
      JSONObject jsonObj = new JSONObject();

      Date date = getDate();
      if (date != null) {
        jsonObj.put("date", date.getTime());
      }

      jsonObj.put("count", getCount());

      String name = getName();
      if (name != null) {
        jsonObj.put("name", name);
      }
      return jsonObj;
    } catch (JSONException e) {
      throw new IllegalStateException("Cannot convert MyCustomWritable to JSON", e);
    }
  }

  /**
   * @return the text form of {@link #toJson()}
   * @throws IllegalStateException if {@link #toJson()} fails
   */
  public String toJsonString() {
    return toJson().toString();
  }

  /** Writes the date as a presence flag followed, when present, by its epoch milliseconds. */
  private void writeDate(DataOutput out) throws IOException {
    if (_date == null) {
      out.writeBoolean(false);
    } else {
      out.writeBoolean(true);
      out.writeLong(_date.getTime());
    }
  }

  /** Writes the count as a single int. */
  private void writeCount(DataOutput out) throws IOException {
    out.writeInt(_count);
  }

  /** Writes the name as a presence flag followed, when present, by the string itself. */
  private void writeName(DataOutput out) throws IOException {
    if (_name == null) {
      out.writeBoolean(false);
    } else {
      out.writeBoolean(true);
      Text.writeString(out, _name);
    }
  }

  /** Reads the date; the change flag ends up raised for a value and cleared for null. */
  private void readDate(DataInput in) throws IOException {
    if (in.readBoolean()) {
      _date = new Date(in.readLong());
      changeFlag[PROPERTY_DATE] = true;
    } else {
      _date = null;
      changeFlag[PROPERTY_DATE] = false;
    }
  }

  /** Reads the count and raises its change flag. */
  private void readCount(DataInput in) throws IOException {
    _count = in.readInt();
    changeFlag[PROPERTY_COUNT] = true;
  }

  /** Reads the name; the change flag ends up raised for a value and cleared for null. */
  private void readName(DataInput in) throws IOException {
    if (in.readBoolean()) {
      _name = Text.readString(in);
      changeFlag[PROPERTY_NAME] = true;
    } else {
      _name = null;
      changeFlag[PROPERTY_NAME] = false;
    }
  }
}
于 2012-08-14T18:15:14.727 回答