2012-08-14 151 views
1

我想实现一个自定义的Hadoop Writable类,其中一个字段是时间戳。我似乎无法在Hadoop库中找到一个现成的类(例如针对Date或Calendar的Writable)来简化这件事。我正在考虑利用Calendar的get/setTimeInMillis来创建自定义Writable,但我想知道是否有更好的/内置的解决方案。Hadoop可写日期/日历

回答

3

Hadoop中没有针对Calendar/Date的Writable实现。由于可以从Calendar对象中获取timeInMillis,您可以使用LongWritable来序列化Calendar对象,但当且仅当您的应用程序始终使用默认的UTC时区时才可行(即它对时区“不可知”,总是假定timeInMillis表示UTC时间)。

如果您使用其他时区或者您的应用程序需要能够解释timeInMillis相对于各个时区,则必须从头开始编写默认的Writable实现。

+0

感谢您的确认! – ChaseMedallion 2012-08-14 20:51:50

1

下面是我为您生成的一个自定义Writable,用来演示一个具有三个属性(其中一个是日期)的Writable。您可以看到日期值是以long的形式持久化的,并且long与Date之间可以很容易地相互转换。如果三个属性太多,我可以为您生成一个只带日期的Writable。

package com.lmx.writable; 

import java.io.ByteArrayInputStream; 
import java.io.ByteArrayOutputStream; 
import java.io.DataInput; 
import java.io.DataInputStream; 
import java.io.DataOutput; 
import java.io.DataOutputStream; 
import java.io.IOException; 
import java.nio.ByteBuffer; 
import java.util.*; 
import com.eaio.uuid.UUID; 
import org.apache.hadoop.io.*; 
import org.apache.pig.ResourceSchema; 
import org.apache.pig.ResourceSchema.ResourceFieldSchema; 
import org.apache.pig.backend.executionengine.ExecException; 
import org.apache.pig.data.DataBag; 
import org.apache.pig.data.DataType; 
import org.apache.pig.data.DefaultDataBag; 
import org.apache.pig.data.Tuple; 
import org.apache.pig.data.TupleFactory; 
import org.json.JSONArray; 
import org.json.JSONException; 
import org.json.JSONObject; 

/**
 * Hadoop {@link Writable} holding three properties: a nullable date, an int count and a
 * nullable name.
 *
 * <p>Wire format, in order:
 * <ol>
 *   <li>date: a boolean presence marker, followed by the epoch-millisecond {@code long}
 *       only when the marker is {@code true};</li>
 *   <li>count: an {@code int};</li>
 *   <li>name: a boolean presence marker, followed by the string encoded with
 *       {@link Text#writeString} only when the marker is {@code true}.</li>
 * </ol>
 *
 * <p>Each property has a change flag (indexed by the {@code PROPERTY_*} constants) that is
 * raised by the setters and by deserialization, and cleared by {@link #resetChangeFlags()}.
 */
public class MyCustomWritable implements Writable {

    /** Change-flag index of the date property. */
    public static final int PROPERTY_DATE = 0;
    /** Change-flag index of the count property. */
    public static final int PROPERTY_COUNT = 1;
    /** Change-flag index of the name property. */
    public static final int PROPERTY_NAME = 2;

    /** Number of properties tracked by {@link #changeFlag}. */
    private static final int PROPERTY_TOTAL = 3;

    private final boolean[] changeFlag = new boolean[PROPERTY_TOTAL];

    private Date _date;
    private int _count;
    private String _name;

    /** Creates an empty instance (null date, zero count, null name) with all change flags cleared. */
    public MyCustomWritable() {
        resetChangeFlags();
    }

    /**
     * Creates an instance from explicit values. Because the values go through the setters,
     * all three change flags are raised afterwards.
     *
     * @param _date  the date, may be {@code null}
     * @param _count the count
     * @param _name  the name, may be {@code null}
     */
    public MyCustomWritable(Date _date, int _count, String _name) {
        resetChangeFlags();
        setDate(_date);
        setCount(_count);
        setName(_name);
    }

    /**
     * Creates an instance by decoding bytes previously produced by {@link #getBytes()}.
     * All change flags are cleared once decoding has finished.
     *
     * @param bytes the serialized form
     * @throws IllegalArgumentException if the bytes are truncated or otherwise cannot be
     *         decoded; the underlying {@link IOException} is attached as the cause
     */
    public MyCustomWritable(byte[] bytes) {
        DataInput in = new DataInputStream(new ByteArrayInputStream(bytes));
        try {
            readFields(in);
        } catch (IOException e) {
            // Previously swallowed, which silently produced a half-populated object.
            throw new IllegalArgumentException(
                    "Cannot decode MyCustomWritable from " + bytes.length + " byte(s)", e);
        }
        resetChangeFlags();
    }

    /** @return the date, or {@code null} if none is set */
    public Date getDate() {
        return _date;
    }

    /**
     * Sets the date and raises its change flag.
     *
     * @param value the new date, may be {@code null}
     */
    public void setDate(Date value) {
        _date = value;
        changeFlag[PROPERTY_DATE] = true;
    }

    /** @return the count */
    public int getCount() {
        return _count;
    }

    /**
     * Sets the count and raises its change flag.
     *
     * @param value the new count
     */
    public void setCount(int value) {
        _count = value;
        changeFlag[PROPERTY_COUNT] = true;
    }

    /** @return the name, or {@code null} if none is set */
    public String getName() {
        return _name;
    }

    /**
     * Sets the name and raises its change flag.
     *
     * @param value the new name, may be {@code null}
     */
    public void setName(String value) {
        _name = value;
        changeFlag[PROPERTY_NAME] = true;
    }

    /**
     * Reads date, count and name (in that order) from {@code in}, updating the change flags:
     * the count flag is always raised; the date and name flags are raised only when a value
     * was present in the stream and cleared otherwise.
     *
     * @param in the source to read from
     * @throws IOException if reading from {@code in} fails
     */
    public void readFields(DataInput in) throws IOException {
        readDate(in);
        readCount(in);
        readName(in);
    }

    /**
     * Writes date, count and name (in that order) to {@code out}.
     *
     * @param out the sink to write to
     * @throws IOException if writing to {@code out} fails
     */
    public void write(DataOutput out) throws IOException {
        writeDate(out);
        writeCount(out);
        writeName(out);
    }

    /**
     * Serializes the whole object into a byte array, using the same format as
     * {@link #write(DataOutput)}.
     *
     * @return the serialized form
     * @throws IOException if serialization fails
     */
    public byte[] getBytes() throws IOException {
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(os);
        write(out);
        out.flush();
        out.close();
        return os.toByteArray();
    }

    /** Clears the change flags of all properties. */
    public void resetChangeFlags() {
        Arrays.fill(changeFlag, false);
    }

    /**
     * Returns the change flag of one property.
     *
     * @param i one of {@link #PROPERTY_DATE}, {@link #PROPERTY_COUNT}, {@link #PROPERTY_NAME}
     * @return {@code true} if the flag is currently raised
     */
    public boolean getChangeFlag(int i) {
        return changeFlag[i];
    }

    /**
     * Serializes only the date property (presence marker plus optional millisecond value).
     *
     * @return the serialized date
     * @throws IOException if serialization fails
     */
    public byte[] getDateAsBytes() throws IOException {
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(os);
        writeDate(out);
        out.flush();
        out.close();
        return os.toByteArray();
    }

    /**
     * Serializes only the count property.
     *
     * @return the serialized count
     * @throws IOException if serialization fails
     */
    public byte[] getCountAsBytes() throws IOException {
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(os);
        writeCount(out);
        out.flush();
        out.close();
        return os.toByteArray();
    }

    /**
     * Serializes only the name property (presence marker plus optional string).
     *
     * @return the serialized name
     * @throws IOException if serialization fails
     */
    public byte[] getNameAsBytes() throws IOException {
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(os);
        writeName(out);
        out.flush();
        out.close();
        return os.toByteArray();
    }

    /**
     * Replaces the date with the value decoded from {@code b}, which must be in the format
     * produced by {@link #getDateAsBytes()}.
     *
     * @param b the serialized date
     * @throws IOException if {@code b} cannot be decoded
     */
    public void setDateFromBytes(byte[] b) throws IOException {
        readDate(new DataInputStream(new ByteArrayInputStream(b)));
    }

    /**
     * Replaces the count with the value decoded from {@code b}, which must be in the format
     * produced by {@link #getCountAsBytes()}.
     *
     * @param b the serialized count
     * @throws IOException if {@code b} cannot be decoded
     */
    public void setCountFromBytes(byte[] b) throws IOException {
        readCount(new DataInputStream(new ByteArrayInputStream(b)));
    }

    /**
     * Replaces the name with the value decoded from {@code b}, which must be in the format
     * produced by {@link #getNameAsBytes()}.
     *
     * @param b the serialized name
     * @throws IOException if {@code b} cannot be decoded
     */
    public void setNameFromBytes(byte[] b) throws IOException {
        readName(new DataInputStream(new ByteArrayInputStream(b)));
    }

    /**
     * Converts this object into a three-field Pig tuple: the date as epoch milliseconds
     * ({@code Long}, or {@code null} when no date is set), the count as {@code Integer},
     * and the name as {@code String} (or {@code null}).
     *
     * @return the populated tuple
     * @throws ExecException if a tuple field cannot be set
     */
    public Tuple asTuple() throws ExecException {
        Tuple tuple = TupleFactory.getInstance().newTuple(PROPERTY_TOTAL);

        Date date = getDate();
        tuple.set(0, date == null ? null : Long.valueOf(date.getTime()));
        tuple.set(1, Integer.valueOf(getCount()));
        tuple.set(2, getName());

        return tuple;
    }

    /**
     * Builds the Pig schema matching {@link #asTuple()}: {@code date} (LONG),
     * {@code count} (INTEGER) and {@code name} (CHARARRAY).
     *
     * @return the schema
     * @throws IOException declared for compatibility with existing callers
     */
    public static ResourceSchema getPigSchema() throws IOException {
        ResourceFieldSchema[] fieldSchema = new ResourceFieldSchema[PROPERTY_TOTAL];

        fieldSchema[0] = new ResourceFieldSchema();
        fieldSchema[0].setName("date");
        fieldSchema[0].setType(DataType.LONG);

        fieldSchema[1] = new ResourceFieldSchema();
        fieldSchema[1].setName("count");
        fieldSchema[1].setType(DataType.INTEGER);

        fieldSchema[2] = new ResourceFieldSchema();
        fieldSchema[2].setName("name");
        fieldSchema[2].setType(DataType.CHARARRAY);

        ResourceSchema schema = new ResourceSchema();
        schema.setFields(fieldSchema);
        return schema;
    }

    /**
     * Parses a JSON document and converts it with {@link #fromJson(JSONObject)}.
     *
     * @param source the JSON text
     * @return the decoded object, or {@code null} if {@code source} is not valid JSON or a
     *         field has the wrong type (the exception is printed to standard output)
     */
    public static MyCustomWritable fromJson(String source) {
        try {
            return fromJson(new JSONObject(source));
        } catch (JSONException e) {
            System.out.println(e.toString());
            return null;
        }
    }

    /**
     * Builds an instance from the optional keys {@code date} (epoch milliseconds),
     * {@code count} and {@code name}. Keys that are absent leave the corresponding property
     * at its default and its change flag cleared.
     *
     * @param jsonObj the JSON object to read
     * @return the decoded object, or {@code null} if a present key cannot be converted to
     *         the expected type (the exception is printed to standard output)
     */
    public static MyCustomWritable fromJson(JSONObject jsonObj) {
        MyCustomWritable obj = new MyCustomWritable();

        try {
            if (jsonObj.has("date")) {
                obj.setDate(new Date(jsonObj.getLong("date")));
            }
            if (jsonObj.has("count")) {
                obj.setCount(jsonObj.getInt("count"));
            }
            if (jsonObj.has("name")) {
                obj.setName(jsonObj.getString("name"));
            }
        } catch (JSONException e) {
            System.out.println(e.toString());
            return null;
        }

        return obj;
    }

    /**
     * Converts this object to JSON. {@code count} is always emitted; {@code date} (as epoch
     * milliseconds) and {@code name} are emitted only when non-null.
     *
     * @return the JSON object, or {@code null} if the JSON library rejects a value
     */
    public JSONObject toJson() {
        try {
            JSONObject jsonObj = new JSONObject();

            if (getDate() != null) {
                jsonObj.put("date", getDate().getTime());
            }
            jsonObj.put("count", getCount());
            if (getName() != null) {
                jsonObj.put("name", getName());
            }
            return jsonObj;
        } catch (JSONException ignored) {
            // Kept as a null return so existing callers see the same contract as before.
            return null;
        }
    }

    /**
     * Returns the textual form of {@link #toJson()}.
     *
     * @return the JSON text
     */
    public String toJsonString() {
        return toJson().toString();
    }

    /** Reads the presence marker and, if set, the millisecond value of the date. */
    private void readDate(DataInput in) throws IOException {
        if (in.readBoolean()) {
            _date = new Date(in.readLong());
            changeFlag[PROPERTY_DATE] = true;
        } else {
            _date = null;
            changeFlag[PROPERTY_DATE] = false;
        }
    }

    /** Reads the count and raises its change flag. */
    private void readCount(DataInput in) throws IOException {
        _count = in.readInt();
        changeFlag[PROPERTY_COUNT] = true;
    }

    /** Reads the presence marker and, if set, the string value of the name. */
    private void readName(DataInput in) throws IOException {
        if (in.readBoolean()) {
            _name = Text.readString(in);
            changeFlag[PROPERTY_NAME] = true;
        } else {
            _name = null;
            changeFlag[PROPERTY_NAME] = false;
        }
    }

    /** Writes the date as a presence marker plus optional epoch-millisecond value. */
    private void writeDate(DataOutput out) throws IOException {
        if (_date == null) {
            out.writeBoolean(false);
        } else {
            out.writeBoolean(true);
            out.writeLong(_date.getTime());
        }
    }

    /** Writes the count as a plain int. */
    private void writeCount(DataOutput out) throws IOException {
        out.writeInt(_count);
    }

    /** Writes the name as a presence marker plus optional string. */
    private void writeName(DataOutput out) throws IOException {
        if (_name == null) {
            out.writeBoolean(false);
        } else {
            out.writeBoolean(true);
            Text.writeString(out, _name);
        }
    }
}