我想实现一个自定义的 Hadoop Writable 类,其中一个字段是时间戳。我在 Hadoop 库中似乎找不到能够简化这项工作的类(例如可写的 Date 或 Calendar)。我正在考虑借助 Calendar 的 get/setTimeInMillis 来创建自定义 Writable,但想知道是否有更好的或内置的解决方案。
问问题
3323 次
2 回答
4
Hadoop 中没有针对 Calendar/Date 的 Writable。考虑到可以从 Calendar 对象中以 long 形式获取 timeInMillis,您可以使用 LongWritable 来序列化 Calendar 对象,但前提是(当且仅当)您的应用程序始终使用默认的 UTC 时区(即它与时区无关,始终假定 timeInMillis 表示 UTC 时间)。
如果您使用另一个时区,或者如果您的应用程序需要能够根据不同的时区解释 timeInMillis,则必须从头开始编写默认的 Writable 实现。
于 2012-08-14T18:11:25.157 回答
1
这是我为您生成的一个自定义 Writable,用于演示包含三个属性(其中一个是日期)的 Writable。可以看到,日期值以 long 形式保存,并且 long 与 Date 之间的相互转换很容易。如果三个属性太多,我也可以为您生成一个只含日期的 Writable。
package com.lmx.writable;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import com.eaio.uuid.UUID;
import org.apache.hadoop.io.*;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultDataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
public class MyCustomWritable implements Writable {
public static int PROPERTY_DATE = 0;
public static int PROPERTY_COUNT = 1;
public static int PROPERTY_NAME = 2;
private boolean[] changeFlag = new boolean[3];
private Date _date;
private int _count;
private String _name;
public MyCustomWritable() {
resetChangeFlags();
}
public MyCustomWritable(Date _date, int _count, String _name) {
resetChangeFlags();
setDate(_date);
setCount(_count);
setName(_name);
}
public MyCustomWritable(byte[] bytes) {
ByteArrayInputStream is = new ByteArrayInputStream(bytes);
DataInput in = new DataInputStream(is);
try { readFields(in); } catch (IOException e) { }
resetChangeFlags();
}
public Date getDate() {
return _date;
}
public void setDate(Date value) {
_date = value;
changeFlag[PROPERTY_DATE] = true;
}
public int getCount() {
return _count;
}
public void setCount(int value) {
_count = value;
changeFlag[PROPERTY_COUNT] = true;
}
public String getName() {
return _name;
}
public void setName(String value) {
_name = value;
changeFlag[PROPERTY_NAME] = true;
}
public void readFields(DataInput in) throws IOException {
// Read Date _date
if (in.readBoolean()) {
_date = new Date(in.readLong());
changeFlag[PROPERTY_DATE] = true;
} else {
_date = null;
changeFlag[PROPERTY_DATE] = false;
}
// Read int _count
_count = in.readInt();
changeFlag[PROPERTY_COUNT] = true;
// Read String _name
if (in.readBoolean()) {
_name = Text.readString(in);
changeFlag[PROPERTY_NAME] = true;
} else {
_name = null;
changeFlag[PROPERTY_NAME] = false;
}
}
public void write(DataOutput out) throws IOException {
// Write Date _date
if (_date == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeLong(_date.getTime());
}
// Write int _count
out.writeInt(_count);
// Write String _name
if (_name == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
Text.writeString(out,_name);
}
}
public byte[] getBytes() throws IOException {
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(os);
write(out);
out.flush();
out.close();
return os.toByteArray();
}
public void resetChangeFlags() {
changeFlag[PROPERTY_DATE] = false;
changeFlag[PROPERTY_COUNT] = false;
changeFlag[PROPERTY_NAME] = false;
}
public boolean getChangeFlag(int i) {
return changeFlag[i];
}
public byte[] getDateAsBytes() throws IOException {
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(os);
// Write Date _date
if (_date == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeLong(_date.getTime());
}
out.flush();
out.close();
return os.toByteArray();
}
public byte[] getCountAsBytes() throws IOException {
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(os);
// Write int _count
out.writeInt(_count);
out.flush();
out.close();
return os.toByteArray();
}
public byte[] getNameAsBytes() throws IOException {
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(os);
// Write String _name
if (_name == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
Text.writeString(out,_name);
}
out.flush();
out.close();
return os.toByteArray();
}
public void setDateFromBytes(byte[] b) throws IOException {
ByteArrayInputStream is = new ByteArrayInputStream(b);
DataInput in = new DataInputStream(is);
int len;
// Read Date _date
if (in.readBoolean()) {
_date = new Date(in.readLong());
changeFlag[PROPERTY_DATE] = true;
} else {
_date = null;
changeFlag[PROPERTY_DATE] = false;
}
}
public void setCountFromBytes(byte[] b) throws IOException {
ByteArrayInputStream is = new ByteArrayInputStream(b);
DataInput in = new DataInputStream(is);
int len;
// Read int _count
_count = in.readInt();
changeFlag[PROPERTY_COUNT] = true;
}
public void setNameFromBytes(byte[] b) throws IOException {
ByteArrayInputStream is = new ByteArrayInputStream(b);
DataInput in = new DataInputStream(is);
int len;
// Read String _name
if (in.readBoolean()) {
_name = Text.readString(in);
changeFlag[PROPERTY_NAME] = true;
} else {
_name = null;
changeFlag[PROPERTY_NAME] = false;
}
}
public Tuple asTuple() throws ExecException {
Tuple tuple = TupleFactory.getInstance().newTuple(3);
if (getDate() == null) {
tuple.set(0, (Long) null);
} else {
tuple.set(0, new Long(getDate().getTime()));
}
tuple.set(1, new Integer(getCount()));
if (getName() == null) {
tuple.set(2, (String) null);
} else {
tuple.set(2, getName());
}
return tuple;
}
public static ResourceSchema getPigSchema() throws IOException {
ResourceSchema schema = new ResourceSchema();
ResourceFieldSchema fieldSchema[] = new ResourceFieldSchema[3];
ResourceSchema bagSchema;
ResourceFieldSchema bagField[];
fieldSchema[0] = new ResourceFieldSchema();
fieldSchema[0].setName("date");
fieldSchema[0].setType(DataType.LONG);
fieldSchema[1] = new ResourceFieldSchema();
fieldSchema[1].setName("count");
fieldSchema[1].setType(DataType.INTEGER);
fieldSchema[2] = new ResourceFieldSchema();
fieldSchema[2].setName("name");
fieldSchema[2].setType(DataType.CHARARRAY);
schema.setFields(fieldSchema);
return schema;
}
public static MyCustomWritable fromJson(String source) {
MyCustomWritable obj = null;
try {
JSONObject jsonObj = new JSONObject(source);
obj = fromJson(jsonObj);
} catch (JSONException e) {
System.out.println(e.toString());
}
return obj;
}
public static MyCustomWritable fromJson(JSONObject jsonObj) {
MyCustomWritable obj = new MyCustomWritable();
try {
if (jsonObj.has("date")) {
obj.setDate(new Date(jsonObj.getLong("date")));
}
if (jsonObj.has("count")) {
obj.setCount(jsonObj.getInt("count"));
}
if (jsonObj.has("name")) {
obj.setName(jsonObj.getString("name"));
}
} catch (JSONException e) {
System.out.println(e.toString());
obj = null;
}
return obj;
}
public JSONObject toJson() {
try {
JSONObject jsonObj = new JSONObject();
JSONArray jsonArray;
if (getDate() != null) {
jsonObj.put("date", getDate().getTime());
}
jsonObj.put("count", getCount());
if (getName() != null) {
jsonObj.put("name", getName());
}
return jsonObj;
} catch (JSONException e) { }
return null;
}
public String toJsonString() {
return toJson().toString();
}
}
于 2012-08-14T18:15:14.727 回答