0

我有以下代码该函数将对象写入tomcat提供的replicationstream。如果对象不可序列化,那么我正在尝试编写“缺失值”。

public void writeExternal(ObjectOutput out)
      throws IOException
    {
      out.writeInt(getType());
      out.writeInt(getAction());
      out.writeUTF(getName());
      out.writeBoolean(getValue()!=null);
      try
      {
          out.writeObject(getValue());
      }
      catch (Exception e)
      {
          System.out.println("Missing Value");

      }

    }

以下代码读取对象。类似于如果对象不可序列化则读取我尝试再次读取以读取“缺失值”

   public void readExternal(ObjectInput in)
      throws IOException, ClassNotFoundException
    {
      this.type = in.readInt();
      this.action = in.readInt();
      this.name = in.readUTF();    


       boolean hasValue=in.readBoolean();

        try{
             this.value = in.readObject();
        }
        catch(Exception er)
        {
            System.out.println("Missing Value");

        }    

    }

我收到以下错误,我不太确定这是什么意思。这两个函数都被多次调用。首先为所有对象调用 writeExternal 函数,然后调用 readExternal。

java.io.StreamCorruptedException: invalid type code: 4C
at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2480)
at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2515)
at java.io.ObjectInputStream$BlockDataInputStream.read(ObjectInputStream.java:2587)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2792)
at java.io.ObjectInputStream.readInt(ObjectInputStream.java:967)

编辑 *这是代码*

public class DeltaRequest
  implements Externalizable
{
  private LinkedList actions = new LinkedList();
  private LinkedList actionPool = new LinkedList();

  public void readExternal(ObjectInput in)
    throws IOException, ClassNotFoundException
  {   
      AttributeInfo info = null;
      if (this.actionPool.size() > 0) {

          info = (AttributeInfo)this.actionPool.removeFirst();      

          info.readExternal(in);
          this.actions.addLast(info);
    }
  }

  public void writeExternal(ObjectOutput out)
    throws IOException
  {

    out.writeUTF(getSessionId());
    out.writeBoolean(this.recordAllActions);
    out.writeInt(getSize());
    for (int i = 0; i < getSize(); i++) {
      AttributeInfo info = (AttributeInfo)this.actions.get(i);
      info.writeExternal(out);


    }
  }

  protected byte[] serialize()
    throws IOException
  {

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(bos);
    writeExternal(oos);
    oos.flush();
    oos.close();
    return bos.toByteArray();
  }

  private static class AttributeInfo implements Externalizable {
    private String name = null;
    private Object value = null;
    private int action;
    private int type;

    public AttributeInfo() {
    }

    public AttributeInfo(int type, int action, String name, Object value) {
      init(type, action, name, value);
    }

    public void init(int type, int action, String name, Object value)
    {
      this.name = name;
      this.value = value;
      this.action = action;
      this.type = type;
    }

    public int getType() {
      return this.type;
    }

    public int getAction() {
      return this.action;
    }

    public Object getValue() {
      return this.value;
    }
    public int hashCode() {
      return this.name.hashCode();
    }

    public String getName() {
      return this.name;
    }

    public void recycle() {
      this.name = null;
      this.value = null;
      this.type = -1;
      this.action = -1;
    }

    public boolean equals(Object o) {
      if (!(o instanceof AttributeInfo)) return false;
      AttributeInfo other = (AttributeInfo)o;
      return other.getName().equals(getName());
    }

    public void readExternal(ObjectInput in)
      throws IOException, ClassNotFoundException
    {
      this.type = in.readInt();
      this.action = in.readInt();
      this.name = in.readUTF();

      boolean hasValue=in.readBoolean();

      try{
         this.value = in.readObject();
      }
      catch(Exception er)
      {
         out.writeObject("Value Missing");
      }


    }

    public void writeExternal(ObjectOutput out)
      throws IOException
    {
      out.writeInt(getType());
      out.writeInt(getAction());
      out.writeUTF(getName());
      out.writeBoolean(getValue()!=null);
      try
      {
          out.writeObject(getValue());
      }
      catch (Exception e)
      {
          out.writeObject("Value Missing");
      }
    }

    public String toString()
    {
      StringBuffer buf = new StringBuffer("AttributeInfo[type=");
      buf.append(getType()).append(", action=").append(getAction());
      buf.append(", name=").append(getName()).append(", value=").append(getValue());
      buf.append(", addr=").append(super.toString()).append("]");
      return buf.toString();
    }
  }
}

代码:这是调用上述函数的方式。

protected DeltaRequest deserializeDeltaRequest(DeltaSession objbb, byte[] data)
    throws ClassNotFoundException, IOException
  {
    try
    {
      objbb.lock();
      ReplicationStream ois = getReplicationStream(data);
      objbb.getDeltaRequest().readExternal(ois);
      ois.close();
      return objbb.getDeltaRequest();
    } finally {
      objbb.unlock();
    }
  }

  protected byte[] serializeDeltaRequest(DeltaSession objbb, DeltaRequest objAA)
    throws IOException
  {
    try
    {
      objbb.lock();
      return objAA.serialize();
    } finally {
      objbb.unlock();
    }
  }

增量管理器

public class DeltaManager extends ClusterManagerBase
{




  public Session createSession(String sessionId)
  {
    return createSession(sessionId, true);
  }

  public Session createSession(String sessionId, boolean distribute)
  {
    if ((this.maxActiveSessions >= 0) && (this.sessions.size() >= this.maxActiveSessions)) {
      this.rejectedSessions += 1;
      throw new IllegalStateException(sm.getString("deltaManager.createSession.ise"));
    }
    DeltaSession session = (DeltaSession)super.createSession(sessionId);
    if (distribute) {
      sendCreateSession(session.getId(), session);
    }
    if (log.isDebugEnabled())
      log.debug(sm.getString("deltaManager.createSession.newSession", session.getId(), new Integer(this.sessions.size())));
    return session;
  }

  protected void sendCreateSession(String sessionId, DeltaSession session)
  {
    if (this.cluster.getMembers().length > 0) {
      SessionMessage msg = new SessionMessageImpl(getName(), 1, null, sessionId, sessionId + "-" + System.currentTimeMillis());

      if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.sendMessage.newSession", this.name, sessionId));
      msg.setTimestamp(session.getCreationTime());
      this.counterSend_EVT_SESSION_CREATED += 1L;
      send(msg);
    }
  }


  protected DeltaRequest deserializeDeltaRequest(DeltaSession session, byte[] data)
    throws ClassNotFoundException, IOException
  {
    try
    {
      session.lock();
      ReplicationStream ois = getReplicationStream(data);
      session.getDeltaRequest().readExternal(ois);
      ois.close();
      return session.getDeltaRequest();
    } finally {
      session.unlock();
    }
  }

  protected byte[] serializeDeltaRequest(DeltaSession session, DeltaRequest deltaRequest)
    throws IOException
  {
    try
    {
      session.lock();
      return deltaRequest.serialize();
    } finally {
      session.unlock();
    }
  }

  protected void deserializeSessions(byte[] data)
    throws ClassNotFoundException, IOException
  {
    ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();
    ObjectInputStream ois = null;
    try
    {
      ois = getReplicationStream(data);
      Integer count = (Integer)ois.readObject();
      int n = count.intValue();
      for (int i = 0; i < n; i++) {
        DeltaSession session = (DeltaSession)createEmptySession();
        session.readObjectData(ois);
        session.setManager(this);
        session.setValid(true);
        session.setPrimarySession(false);

        session.access();

        session.setAccessCount(0);
        session.resetDeltaRequest();

        if (findSession(session.getIdInternal()) == null) {
          this.sessionCounter += 1;
        } else {
          this.sessionReplaceCounter += 1L;

          if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.loading.existing.session", session.getIdInternal()));
        }
        add(session);
      }
    } catch (ClassNotFoundException e) {
      log.error(sm.getString("deltaManager.loading.cnfe", e), e);
      throw e;
    } catch (IOException e) {
      log.error(sm.getString("deltaManager.loading.ioe", e), e);
      throw e;
    }
    finally {
      try {
        if (ois != null) ois.close(); 
      }
      catch (IOException f)
      {
      }
      ois = null;
      if (originalLoader != null) Thread.currentThread().setContextClassLoader(originalLoader);
    }
  }

  protected byte[] serializeSessions(Session[] currentSessions)
    throws IOException
  {
    ByteArrayOutputStream fos = null;
    ObjectOutputStream oos = null;
    try
    {
      fos = new ByteArrayOutputStream();
      oos = new ObjectOutputStream(new BufferedOutputStream(fos));
      oos.writeObject(new Integer(currentSessions.length));
      for (int i = 0; i < currentSessions.length; i++) {
        ((DeltaSession)currentSessions[i]).writeObjectData(oos);
      }

      oos.flush();
    } catch (IOException e) {
      log.error(sm.getString("deltaManager.unloading.ioe", e), e);
      throw e;
    } finally {
      if (oos != null) {
        try {
          oos.close();
        }
        catch (IOException f) {
        }
        oos = null;
      }
    }

    return fos.toByteArray();
  }

  public void start()
    throws LifecycleException
  {
    if (!this.initialized) init();

    if (this.started) {
      return;
    }
    this.started = true;
    this.lifecycle.fireLifecycleEvent("start", null);

    generateSessionId();
    try
    {
      Cluster cluster = getCluster();

      if (cluster == null) {
        Container context = getContainer();
        if ((context != null) && ((context instanceof Context))) {
          Container host = context.getParent();
          if ((host != null) && ((host instanceof Host))) {
            cluster = host.getCluster();
            if ((cluster != null) && ((cluster instanceof CatalinaCluster))) {
              setCluster((CatalinaCluster)cluster);
            } else {
              Container engine = host.getParent();
              if ((engine != null) && ((engine instanceof Engine))) {
                cluster = engine.getCluster();
                if ((cluster != null) && ((cluster instanceof CatalinaCluster)))
                  setCluster((CatalinaCluster)cluster);
              }
              else {
                cluster = null;
              }
            }
          }
        }
      }
      if (cluster == null) {
        log.error(sm.getString("deltaManager.noCluster", getName()));
        return;
      }
      if (log.isInfoEnabled()) {
        String type = "unknown";
        if ((cluster.getContainer() instanceof Host))
          type = "Host";
        else if ((cluster.getContainer() instanceof Engine)) {
          type = "Engine";
        }
        log.info(sm.getString("deltaManager.registerCluster", getName(), type, cluster.getClusterName()));
      }

      if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.startClustering", getName()));

      cluster.registerManager(this);

      getAllClusterSessions();
    }
    catch (Throwable t) {
      log.error(sm.getString("deltaManager.managerLoad"), t);
    }
  }

  public synchronized void getAllClusterSessions()
  {
    if ((this.cluster != null) && (this.cluster.getMembers().length > 0)) {
      long beforeSendTime = System.currentTimeMillis();
      Member mbr = findSessionMasterMember();
      if (mbr == null) {
        return;
      }
      SessionMessage msg = new SessionMessageImpl(getName(), 4, null, "GET-ALL", "GET-ALL-" + getName());

      this.stateTransferCreateSendTime = beforeSendTime;

      this.counterSend_EVT_GET_ALL_SESSIONS += 1L;
      this.stateTransfered = false;
      try
      {
        synchronized (this.receivedMessageQueue) {
          this.receiverQueue = true;
        }
        this.cluster.send(msg, mbr);
        if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.waitForSessionState", getName(), mbr, Integer.valueOf(getStateTransferTimeout())));

        waitForSendAllSessions(beforeSendTime);
      } finally {
        synchronized (this.receivedMessageQueue) {
          for (Iterator iter = this.receivedMessageQueue.iterator(); iter.hasNext(); ) {
            SessionMessage smsg = (SessionMessage)iter.next();
            if (!this.stateTimestampDrop) {
              messageReceived(smsg, smsg.getAddress() != null ? smsg.getAddress() : null);
            }
            else if ((smsg.getEventType() != 4) && (smsg.getTimestamp() >= this.stateTransferCreateSendTime))
            {
              messageReceived(smsg, smsg.getAddress() != null ? smsg.getAddress() : null);
            }
            else if (log.isWarnEnabled()) {
              log.warn(sm.getString("deltaManager.dropMessage", getName(), smsg.getEventTypeString(), new Date(this.stateTransferCreateSendTime), new Date(smsg.getTimestamp())));
            }

          }

          this.receivedMessageQueue.clear();
          this.receiverQueue = false;
        }
      }
    }
    else if (log.isInfoEnabled()) { log.info(sm.getString("deltaManager.noMembers", getName())); }

  }

  protected void registerSessionAtReplicationValve(DeltaSession session)
  {
    if ((this.replicationValve == null) && 
      ((this.container instanceof StandardContext)) && (((StandardContext)this.container).getCrossContext())) {
      Cluster cluster = getCluster();
      if ((cluster != null) && ((cluster instanceof CatalinaCluster))) {
        Valve[] valves = ((CatalinaCluster)cluster).getValves();
        if ((valves != null) && (valves.length > 0)) {
          for (int i = 0; (this.replicationValve == null) && (i < valves.length); i++) {
            if ((valves[i] instanceof ReplicationValve)) this.replicationValve = ((ReplicationValve)valves[i]);
          }

          if ((this.replicationValve == null) && (log.isDebugEnabled())) {
            log.debug("no ReplicationValve found for CrossContext Support");
          }
        }
      }
    }

    if (this.replicationValve != null)
      this.replicationValve.registerReplicationSession(session);
  }

  protected Member findSessionMasterMember()
  {
    Member mbr = null;
    Member[] mbrs = this.cluster.getMembers();
    if (mbrs.length != 0) mbr = mbrs[0];
    if ((mbr == null) && (log.isWarnEnabled())) log.warn(sm.getString("deltaManager.noMasterMember", getName(), ""));
    if ((mbr != null) && (log.isDebugEnabled())) log.warn(sm.getString("deltaManager.foundMasterMember", getName(), mbr));
    return mbr;
  }



  public void messageDataReceived(ClusterMessage cmsg)
  {
    if ((cmsg != null) && ((cmsg instanceof SessionMessage))) {
      SessionMessage msg = (SessionMessage)cmsg;
      switch (msg.getEventType()) {
      case 1:
      case 2:
      case 3:
      case 4:
      case 13:
        synchronized (this.receivedMessageQueue) {
          if (this.receiverQueue) {
            this.receivedMessageQueue.add(msg);
            return;
          }
        }
        break;
      case 5:
      case 6:
      case 7:
      case 8:
      case 9:
      case 10:
      case 11:
      case 12: } messageReceived(msg, msg.getAddress() != null ? msg.getAddress() : null);
    }
  }

  public ClusterMessage requestCompleted(String sessionId)
  {
    return requestCompleted(sessionId, false);
  }

  public ClusterMessage requestCompleted(String sessionId, boolean expires)
  {
    DeltaSession session = null;
    try {
      session = (DeltaSession)findSession(sessionId);
      DeltaRequest deltaRequest = session.getDeltaRequest();
      session.lock();
      msg = null;
      boolean isDeltaRequest = false;
      synchronized (deltaRequest) {
        isDeltaRequest = deltaRequest.getSize() > 0;
        if (isDeltaRequest) {
          this.counterSend_EVT_SESSION_DELTA += 1L;
          byte[] data = serializeDeltaRequest(session, deltaRequest);
          msg = new SessionMessageImpl(getName(), 13, data, sessionId, sessionId + "-" + System.currentTimeMillis());

          session.resetDeltaRequest();
        }
      }
      if (!isDeltaRequest) {
        if ((!expires) && (!session.isPrimarySession())) {
          this.counterSend_EVT_SESSION_ACCESSED += 1L;
          msg = new SessionMessageImpl(getName(), 3, null, sessionId, sessionId + "-" + System.currentTimeMillis());

          if (log.isDebugEnabled()) {
            log.debug(sm.getString("deltaManager.createMessage.accessChangePrimary", getName(), sessionId));
          }
        }
      }
      else if (log.isDebugEnabled()) {
        log.debug(sm.getString("deltaManager.createMessage.delta", getName(), sessionId));
      }

      if (!expires)
        session.setPrimarySession(true);
      long replDelta;
      if ((!expires) && (msg == null)) {
        replDelta = System.currentTimeMillis() - session.getLastTimeReplicated();
        if (replDelta > getMaxInactiveInterval() * 1000) {
          this.counterSend_EVT_SESSION_ACCESSED += 1L;
          msg = new SessionMessageImpl(getName(), 3, null, sessionId, sessionId + "-" + System.currentTimeMillis());

          if (log.isDebugEnabled()) {
            log.debug(sm.getString("deltaManager.createMessage.access", getName(), sessionId));
          }

        }

      }

      if (msg != null) {
        session.setLastTimeReplicated(System.currentTimeMillis());
        msg.setTimestamp(session.getLastTimeReplicated());
      }
      return msg;
    }
    catch (IOException x)
    {
      SessionMessage msg;
      log.error(sm.getString("deltaManager.createMessage.unableCreateDeltaRequest", sessionId), x);
      return null;
    } finally {
      if (session != null) session.unlock();
    }
  }



  protected void messageReceived(SessionMessage msg, Member sender)
  {
    if ((doDomainReplication()) && (!checkSenderDomain(msg, sender))) {
      return;
    }
    ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
    try
    {
      ClassLoader[] loaders = getClassLoaders();
      if ((loaders != null) && (loaders.length > 0)) Thread.currentThread().setContextClassLoader(loaders[0]);
      if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.eventType", getName(), msg.getEventTypeString(), sender));

      switch (msg.getEventType()) {
      case 4:
        handleGET_ALL_SESSIONS(msg, sender);
        break;
      case 12:
        handleALL_SESSION_DATA(msg, sender);
        break;
      case 14:
        handleALL_SESSION_TRANSFERCOMPLETE(msg, sender);
        break;
      case 1:
        handleSESSION_CREATED(msg, sender);
        break;
      case 2:
        handleSESSION_EXPIRED(msg, sender);
        break;
      case 3:
        handleSESSION_ACCESSED(msg, sender);
        break;
      case 13:
        handleSESSION_DELTA(msg, sender);
      case 5:
      case 6:
      case 7:
      case 8:
      case 9:
      case 10:
      case 11:
      }
    } catch (Exception x) { log.error(sm.getString("deltaManager.receiveMessage.error", getName()), x);
    } finally {
      Thread.currentThread().setContextClassLoader(contextLoader);
    }
  }

  protected void handleALL_SESSION_TRANSFERCOMPLETE(SessionMessage msg, Member sender)
  {
    this.counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE += 1;
    if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.transfercomplete", getName(), sender.getHost(), new Integer(sender.getPort())));
    this.stateTransferCreateSendTime = msg.getTimestamp();
    this.stateTransfered = true;
  }

  protected void handleSESSION_DELTA(SessionMessage msg, Member sender)
    throws IOException, ClassNotFoundException
  {
    this.counterReceive_EVT_SESSION_DELTA += 1L;
    byte[] delta = msg.getSession();
    DeltaSession session = (DeltaSession)findSession(msg.getSessionID());
    if (session != null) {
      if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.delta", getName(), msg.getSessionID())); try
      {
        session.lock();
        DeltaRequest dreq = deserializeDeltaRequest(session, delta);
        dreq.execute(session, this.notifyListenersOnReplication);
        session.setPrimarySession(false);
      } finally {
        session.unlock();
      }
    }
  }



  protected void handleSESSION_CREATED(SessionMessage msg, Member sender)
  {
    this.counterReceive_EVT_SESSION_CREATED += 1L;
    if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.createNewSession", getName(), msg.getSessionID()));
    DeltaSession session = (DeltaSession)createEmptySession();
    session.setManager(this);
    session.setValid(true);
    session.setPrimarySession(false);
    session.setCreationTime(msg.getTimestamp());

    session.setMaxInactiveInterval(getMaxInactiveInterval());
    session.access();
    if (this.notifySessionListenersOnReplication) {
      session.setId(msg.getSessionID());
    } else {
      session.setIdInternal(msg.getSessionID());
      add(session);
    }
    session.resetDeltaRequest();
    session.endAccess();
  }

  protected void handleALL_SESSION_DATA(SessionMessage msg, Member sender)
    throws ClassNotFoundException, IOException
  {
    this.counterReceive_EVT_ALL_SESSION_DATA += 1L;
    if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataBegin", getName()));
    byte[] data = msg.getSession();
    deserializeSessions(data);
    if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataAfter", getName()));
  }

  protected void handleGET_ALL_SESSIONS(SessionMessage msg, Member sender)
    throws IOException
  {
    this.counterReceive_EVT_GET_ALL_SESSIONS += 1L;

    if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingBegin", getName()));

    Session[] currentSessions = findSessions();
    long findSessionTimestamp = System.currentTimeMillis();
    if (isSendAllSessions()) {
      sendSessions(sender, currentSessions, findSessionTimestamp);
    }
    else {
      int len = currentSessions.length < getSendAllSessionsSize() ? currentSessions.length : getSendAllSessionsSize();
      Session[] sendSessions = new Session[len];
      for (int i = 0; i < currentSessions.length; i += getSendAllSessionsSize()) {
        len = i + getSendAllSessionsSize() > currentSessions.length ? currentSessions.length - i : getSendAllSessionsSize();
        System.arraycopy(currentSessions, i, sendSessions, 0, len);
        sendSessions(sender, sendSessions, findSessionTimestamp);
        if (getSendAllSessionsWaitTime() > 0)
          try {
            Thread.sleep(getSendAllSessionsWaitTime());
          }
          catch (Exception sleep)
          {
          }
      }
    }
    SessionMessage newmsg = new SessionMessageImpl(this.name, 14, null, "SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED" + getName());
    newmsg.setTimestamp(findSessionTimestamp);
    if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionTransfered", getName()));
    this.counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE += 1;
    this.cluster.send(newmsg, sender);
  }

  protected void sendSessions(Member sender, Session[] currentSessions, long sendTimestamp)
    throws IOException
  {
    byte[] data = serializeSessions(currentSessions);
    if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingAfter", getName()));
    SessionMessage newmsg = new SessionMessageImpl(this.name, 12, data, "SESSION-STATE", "SESSION-STATE-" + getName());
    newmsg.setTimestamp(sendTimestamp);
    if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionData", getName()));
    this.counterSend_EVT_ALL_SESSION_DATA += 1L;
    this.cluster.send(newmsg, sender);
  }


}
4

1 回答 1

0

这里的问题是你在打电话

this.writeExternal(oos);

代替

oos.writeObject(this);

所以ObjectOutputStream永远没有机会写对象序言。

同样,您必须调用

Object o = ois.readObject();

而不是

Object o = this.readExternal(ois);

对于您的不可序列化对象,您应该编写一个特殊的对象。目前,您将所有可能的异常都视为丢失的对象,而它可能是大量其他事物。

于 2013-07-03T05:42:54.123 回答