我有以下代码该函数将对象写入tomcat提供的replicationstream。如果对象不可序列化,那么我正在尝试编写“缺失值”。
public void writeExternal(ObjectOutput out)
throws IOException
{
out.writeInt(getType());
out.writeInt(getAction());
out.writeUTF(getName());
out.writeBoolean(getValue()!=null);
try
{
out.writeObject(getValue());
}
catch (Exception e)
{
System.out.println("Missing Value");
}
}
以下代码读取对象。类似于如果对象不可序列化则读取我尝试再次读取以读取“缺失值”
public void readExternal(ObjectInput in)
throws IOException, ClassNotFoundException
{
this.type = in.readInt();
this.action = in.readInt();
this.name = in.readUTF();
boolean hasValue=in.readBoolean();
try{
this.value = in.readObject();
}
catch(Exception er)
{
System.out.println("Missing Value");
}
}
我收到以下错误,我不太确定这是什么意思。这两个函数都被多次调用。首先为所有对象调用 writeExternal 函数,然后调用 readExternal。
java.io.StreamCorruptedException: invalid type code: 4C
at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2480)
at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2515)
at java.io.ObjectInputStream$BlockDataInputStream.read(ObjectInputStream.java:2587)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2792)
at java.io.ObjectInputStream.readInt(ObjectInputStream.java:967)
编辑 *这是代码*
public class DeltaRequest
implements Externalizable
{
private LinkedList actions = new LinkedList();
private LinkedList actionPool = new LinkedList();
public void readExternal(ObjectInput in)
throws IOException, ClassNotFoundException
{
AttributeInfo info = null;
if (this.actionPool.size() > 0) {
info = (AttributeInfo)this.actionPool.removeFirst();
info.readExternal(in);
this.actions.addLast(info);
}
}
public void writeExternal(ObjectOutput out)
throws IOException
{
out.writeUTF(getSessionId());
out.writeBoolean(this.recordAllActions);
out.writeInt(getSize());
for (int i = 0; i < getSize(); i++) {
AttributeInfo info = (AttributeInfo)this.actions.get(i);
info.writeExternal(out);
}
}
protected byte[] serialize()
throws IOException
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
writeExternal(oos);
oos.flush();
oos.close();
return bos.toByteArray();
}
private static class AttributeInfo implements Externalizable {
private String name = null;
private Object value = null;
private int action;
private int type;
public AttributeInfo() {
}
public AttributeInfo(int type, int action, String name, Object value) {
init(type, action, name, value);
}
public void init(int type, int action, String name, Object value)
{
this.name = name;
this.value = value;
this.action = action;
this.type = type;
}
public int getType() {
return this.type;
}
public int getAction() {
return this.action;
}
public Object getValue() {
return this.value;
}
public int hashCode() {
return this.name.hashCode();
}
public String getName() {
return this.name;
}
public void recycle() {
this.name = null;
this.value = null;
this.type = -1;
this.action = -1;
}
public boolean equals(Object o) {
if (!(o instanceof AttributeInfo)) return false;
AttributeInfo other = (AttributeInfo)o;
return other.getName().equals(getName());
}
public void readExternal(ObjectInput in)
throws IOException, ClassNotFoundException
{
this.type = in.readInt();
this.action = in.readInt();
this.name = in.readUTF();
boolean hasValue=in.readBoolean();
try{
this.value = in.readObject();
}
catch(Exception er)
{
out.writeObject("Value Missing");
}
}
public void writeExternal(ObjectOutput out)
throws IOException
{
out.writeInt(getType());
out.writeInt(getAction());
out.writeUTF(getName());
out.writeBoolean(getValue()!=null);
try
{
out.writeObject(getValue());
}
catch (Exception e)
{
out.writeObject("Value Missing");
}
}
public String toString()
{
StringBuffer buf = new StringBuffer("AttributeInfo[type=");
buf.append(getType()).append(", action=").append(getAction());
buf.append(", name=").append(getName()).append(", value=").append(getValue());
buf.append(", addr=").append(super.toString()).append("]");
return buf.toString();
}
}
}
代码:这是调用上述函数的方式。
protected DeltaRequest deserializeDeltaRequest(DeltaSession objbb, byte[] data)
throws ClassNotFoundException, IOException
{
try
{
objbb.lock();
ReplicationStream ois = getReplicationStream(data);
objbb.getDeltaRequest().readExternal(ois);
ois.close();
return objbb.getDeltaRequest();
} finally {
objbb.unlock();
}
}
protected byte[] serializeDeltaRequest(DeltaSession objbb, DeltaRequest objAA)
throws IOException
{
try
{
objbb.lock();
return objAA.serialize();
} finally {
objbb.unlock();
}
}
增量管理器
public class DeltaManager extends ClusterManagerBase
{
public Session createSession(String sessionId)
{
return createSession(sessionId, true);
}
public Session createSession(String sessionId, boolean distribute)
{
if ((this.maxActiveSessions >= 0) && (this.sessions.size() >= this.maxActiveSessions)) {
this.rejectedSessions += 1;
throw new IllegalStateException(sm.getString("deltaManager.createSession.ise"));
}
DeltaSession session = (DeltaSession)super.createSession(sessionId);
if (distribute) {
sendCreateSession(session.getId(), session);
}
if (log.isDebugEnabled())
log.debug(sm.getString("deltaManager.createSession.newSession", session.getId(), new Integer(this.sessions.size())));
return session;
}
protected void sendCreateSession(String sessionId, DeltaSession session)
{
if (this.cluster.getMembers().length > 0) {
SessionMessage msg = new SessionMessageImpl(getName(), 1, null, sessionId, sessionId + "-" + System.currentTimeMillis());
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.sendMessage.newSession", this.name, sessionId));
msg.setTimestamp(session.getCreationTime());
this.counterSend_EVT_SESSION_CREATED += 1L;
send(msg);
}
}
protected DeltaRequest deserializeDeltaRequest(DeltaSession session, byte[] data)
throws ClassNotFoundException, IOException
{
try
{
session.lock();
ReplicationStream ois = getReplicationStream(data);
session.getDeltaRequest().readExternal(ois);
ois.close();
return session.getDeltaRequest();
} finally {
session.unlock();
}
}
protected byte[] serializeDeltaRequest(DeltaSession session, DeltaRequest deltaRequest)
throws IOException
{
try
{
session.lock();
return deltaRequest.serialize();
} finally {
session.unlock();
}
}
protected void deserializeSessions(byte[] data)
throws ClassNotFoundException, IOException
{
ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();
ObjectInputStream ois = null;
try
{
ois = getReplicationStream(data);
Integer count = (Integer)ois.readObject();
int n = count.intValue();
for (int i = 0; i < n; i++) {
DeltaSession session = (DeltaSession)createEmptySession();
session.readObjectData(ois);
session.setManager(this);
session.setValid(true);
session.setPrimarySession(false);
session.access();
session.setAccessCount(0);
session.resetDeltaRequest();
if (findSession(session.getIdInternal()) == null) {
this.sessionCounter += 1;
} else {
this.sessionReplaceCounter += 1L;
if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.loading.existing.session", session.getIdInternal()));
}
add(session);
}
} catch (ClassNotFoundException e) {
log.error(sm.getString("deltaManager.loading.cnfe", e), e);
throw e;
} catch (IOException e) {
log.error(sm.getString("deltaManager.loading.ioe", e), e);
throw e;
}
finally {
try {
if (ois != null) ois.close();
}
catch (IOException f)
{
}
ois = null;
if (originalLoader != null) Thread.currentThread().setContextClassLoader(originalLoader);
}
}
protected byte[] serializeSessions(Session[] currentSessions)
throws IOException
{
ByteArrayOutputStream fos = null;
ObjectOutputStream oos = null;
try
{
fos = new ByteArrayOutputStream();
oos = new ObjectOutputStream(new BufferedOutputStream(fos));
oos.writeObject(new Integer(currentSessions.length));
for (int i = 0; i < currentSessions.length; i++) {
((DeltaSession)currentSessions[i]).writeObjectData(oos);
}
oos.flush();
} catch (IOException e) {
log.error(sm.getString("deltaManager.unloading.ioe", e), e);
throw e;
} finally {
if (oos != null) {
try {
oos.close();
}
catch (IOException f) {
}
oos = null;
}
}
return fos.toByteArray();
}
public void start()
throws LifecycleException
{
if (!this.initialized) init();
if (this.started) {
return;
}
this.started = true;
this.lifecycle.fireLifecycleEvent("start", null);
generateSessionId();
try
{
Cluster cluster = getCluster();
if (cluster == null) {
Container context = getContainer();
if ((context != null) && ((context instanceof Context))) {
Container host = context.getParent();
if ((host != null) && ((host instanceof Host))) {
cluster = host.getCluster();
if ((cluster != null) && ((cluster instanceof CatalinaCluster))) {
setCluster((CatalinaCluster)cluster);
} else {
Container engine = host.getParent();
if ((engine != null) && ((engine instanceof Engine))) {
cluster = engine.getCluster();
if ((cluster != null) && ((cluster instanceof CatalinaCluster)))
setCluster((CatalinaCluster)cluster);
}
else {
cluster = null;
}
}
}
}
}
if (cluster == null) {
log.error(sm.getString("deltaManager.noCluster", getName()));
return;
}
if (log.isInfoEnabled()) {
String type = "unknown";
if ((cluster.getContainer() instanceof Host))
type = "Host";
else if ((cluster.getContainer() instanceof Engine)) {
type = "Engine";
}
log.info(sm.getString("deltaManager.registerCluster", getName(), type, cluster.getClusterName()));
}
if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.startClustering", getName()));
cluster.registerManager(this);
getAllClusterSessions();
}
catch (Throwable t) {
log.error(sm.getString("deltaManager.managerLoad"), t);
}
}
public synchronized void getAllClusterSessions()
{
if ((this.cluster != null) && (this.cluster.getMembers().length > 0)) {
long beforeSendTime = System.currentTimeMillis();
Member mbr = findSessionMasterMember();
if (mbr == null) {
return;
}
SessionMessage msg = new SessionMessageImpl(getName(), 4, null, "GET-ALL", "GET-ALL-" + getName());
this.stateTransferCreateSendTime = beforeSendTime;
this.counterSend_EVT_GET_ALL_SESSIONS += 1L;
this.stateTransfered = false;
try
{
synchronized (this.receivedMessageQueue) {
this.receiverQueue = true;
}
this.cluster.send(msg, mbr);
if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.waitForSessionState", getName(), mbr, Integer.valueOf(getStateTransferTimeout())));
waitForSendAllSessions(beforeSendTime);
} finally {
synchronized (this.receivedMessageQueue) {
for (Iterator iter = this.receivedMessageQueue.iterator(); iter.hasNext(); ) {
SessionMessage smsg = (SessionMessage)iter.next();
if (!this.stateTimestampDrop) {
messageReceived(smsg, smsg.getAddress() != null ? smsg.getAddress() : null);
}
else if ((smsg.getEventType() != 4) && (smsg.getTimestamp() >= this.stateTransferCreateSendTime))
{
messageReceived(smsg, smsg.getAddress() != null ? smsg.getAddress() : null);
}
else if (log.isWarnEnabled()) {
log.warn(sm.getString("deltaManager.dropMessage", getName(), smsg.getEventTypeString(), new Date(this.stateTransferCreateSendTime), new Date(smsg.getTimestamp())));
}
}
this.receivedMessageQueue.clear();
this.receiverQueue = false;
}
}
}
else if (log.isInfoEnabled()) { log.info(sm.getString("deltaManager.noMembers", getName())); }
}
protected void registerSessionAtReplicationValve(DeltaSession session)
{
if ((this.replicationValve == null) &&
((this.container instanceof StandardContext)) && (((StandardContext)this.container).getCrossContext())) {
Cluster cluster = getCluster();
if ((cluster != null) && ((cluster instanceof CatalinaCluster))) {
Valve[] valves = ((CatalinaCluster)cluster).getValves();
if ((valves != null) && (valves.length > 0)) {
for (int i = 0; (this.replicationValve == null) && (i < valves.length); i++) {
if ((valves[i] instanceof ReplicationValve)) this.replicationValve = ((ReplicationValve)valves[i]);
}
if ((this.replicationValve == null) && (log.isDebugEnabled())) {
log.debug("no ReplicationValve found for CrossContext Support");
}
}
}
}
if (this.replicationValve != null)
this.replicationValve.registerReplicationSession(session);
}
protected Member findSessionMasterMember()
{
Member mbr = null;
Member[] mbrs = this.cluster.getMembers();
if (mbrs.length != 0) mbr = mbrs[0];
if ((mbr == null) && (log.isWarnEnabled())) log.warn(sm.getString("deltaManager.noMasterMember", getName(), ""));
if ((mbr != null) && (log.isDebugEnabled())) log.warn(sm.getString("deltaManager.foundMasterMember", getName(), mbr));
return mbr;
}
public void messageDataReceived(ClusterMessage cmsg)
{
if ((cmsg != null) && ((cmsg instanceof SessionMessage))) {
SessionMessage msg = (SessionMessage)cmsg;
switch (msg.getEventType()) {
case 1:
case 2:
case 3:
case 4:
case 13:
synchronized (this.receivedMessageQueue) {
if (this.receiverQueue) {
this.receivedMessageQueue.add(msg);
return;
}
}
break;
case 5:
case 6:
case 7:
case 8:
case 9:
case 10:
case 11:
case 12: } messageReceived(msg, msg.getAddress() != null ? msg.getAddress() : null);
}
}
public ClusterMessage requestCompleted(String sessionId)
{
return requestCompleted(sessionId, false);
}
public ClusterMessage requestCompleted(String sessionId, boolean expires)
{
DeltaSession session = null;
try {
session = (DeltaSession)findSession(sessionId);
DeltaRequest deltaRequest = session.getDeltaRequest();
session.lock();
msg = null;
boolean isDeltaRequest = false;
synchronized (deltaRequest) {
isDeltaRequest = deltaRequest.getSize() > 0;
if (isDeltaRequest) {
this.counterSend_EVT_SESSION_DELTA += 1L;
byte[] data = serializeDeltaRequest(session, deltaRequest);
msg = new SessionMessageImpl(getName(), 13, data, sessionId, sessionId + "-" + System.currentTimeMillis());
session.resetDeltaRequest();
}
}
if (!isDeltaRequest) {
if ((!expires) && (!session.isPrimarySession())) {
this.counterSend_EVT_SESSION_ACCESSED += 1L;
msg = new SessionMessageImpl(getName(), 3, null, sessionId, sessionId + "-" + System.currentTimeMillis());
if (log.isDebugEnabled()) {
log.debug(sm.getString("deltaManager.createMessage.accessChangePrimary", getName(), sessionId));
}
}
}
else if (log.isDebugEnabled()) {
log.debug(sm.getString("deltaManager.createMessage.delta", getName(), sessionId));
}
if (!expires)
session.setPrimarySession(true);
long replDelta;
if ((!expires) && (msg == null)) {
replDelta = System.currentTimeMillis() - session.getLastTimeReplicated();
if (replDelta > getMaxInactiveInterval() * 1000) {
this.counterSend_EVT_SESSION_ACCESSED += 1L;
msg = new SessionMessageImpl(getName(), 3, null, sessionId, sessionId + "-" + System.currentTimeMillis());
if (log.isDebugEnabled()) {
log.debug(sm.getString("deltaManager.createMessage.access", getName(), sessionId));
}
}
}
if (msg != null) {
session.setLastTimeReplicated(System.currentTimeMillis());
msg.setTimestamp(session.getLastTimeReplicated());
}
return msg;
}
catch (IOException x)
{
SessionMessage msg;
log.error(sm.getString("deltaManager.createMessage.unableCreateDeltaRequest", sessionId), x);
return null;
} finally {
if (session != null) session.unlock();
}
}
protected void messageReceived(SessionMessage msg, Member sender)
{
if ((doDomainReplication()) && (!checkSenderDomain(msg, sender))) {
return;
}
ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
try
{
ClassLoader[] loaders = getClassLoaders();
if ((loaders != null) && (loaders.length > 0)) Thread.currentThread().setContextClassLoader(loaders[0]);
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.eventType", getName(), msg.getEventTypeString(), sender));
switch (msg.getEventType()) {
case 4:
handleGET_ALL_SESSIONS(msg, sender);
break;
case 12:
handleALL_SESSION_DATA(msg, sender);
break;
case 14:
handleALL_SESSION_TRANSFERCOMPLETE(msg, sender);
break;
case 1:
handleSESSION_CREATED(msg, sender);
break;
case 2:
handleSESSION_EXPIRED(msg, sender);
break;
case 3:
handleSESSION_ACCESSED(msg, sender);
break;
case 13:
handleSESSION_DELTA(msg, sender);
case 5:
case 6:
case 7:
case 8:
case 9:
case 10:
case 11:
}
} catch (Exception x) { log.error(sm.getString("deltaManager.receiveMessage.error", getName()), x);
} finally {
Thread.currentThread().setContextClassLoader(contextLoader);
}
}
protected void handleALL_SESSION_TRANSFERCOMPLETE(SessionMessage msg, Member sender)
{
this.counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE += 1;
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.transfercomplete", getName(), sender.getHost(), new Integer(sender.getPort())));
this.stateTransferCreateSendTime = msg.getTimestamp();
this.stateTransfered = true;
}
protected void handleSESSION_DELTA(SessionMessage msg, Member sender)
throws IOException, ClassNotFoundException
{
this.counterReceive_EVT_SESSION_DELTA += 1L;
byte[] delta = msg.getSession();
DeltaSession session = (DeltaSession)findSession(msg.getSessionID());
if (session != null) {
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.delta", getName(), msg.getSessionID())); try
{
session.lock();
DeltaRequest dreq = deserializeDeltaRequest(session, delta);
dreq.execute(session, this.notifyListenersOnReplication);
session.setPrimarySession(false);
} finally {
session.unlock();
}
}
}
protected void handleSESSION_CREATED(SessionMessage msg, Member sender)
{
this.counterReceive_EVT_SESSION_CREATED += 1L;
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.createNewSession", getName(), msg.getSessionID()));
DeltaSession session = (DeltaSession)createEmptySession();
session.setManager(this);
session.setValid(true);
session.setPrimarySession(false);
session.setCreationTime(msg.getTimestamp());
session.setMaxInactiveInterval(getMaxInactiveInterval());
session.access();
if (this.notifySessionListenersOnReplication) {
session.setId(msg.getSessionID());
} else {
session.setIdInternal(msg.getSessionID());
add(session);
}
session.resetDeltaRequest();
session.endAccess();
}
protected void handleALL_SESSION_DATA(SessionMessage msg, Member sender)
throws ClassNotFoundException, IOException
{
this.counterReceive_EVT_ALL_SESSION_DATA += 1L;
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataBegin", getName()));
byte[] data = msg.getSession();
deserializeSessions(data);
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataAfter", getName()));
}
protected void handleGET_ALL_SESSIONS(SessionMessage msg, Member sender)
throws IOException
{
this.counterReceive_EVT_GET_ALL_SESSIONS += 1L;
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingBegin", getName()));
Session[] currentSessions = findSessions();
long findSessionTimestamp = System.currentTimeMillis();
if (isSendAllSessions()) {
sendSessions(sender, currentSessions, findSessionTimestamp);
}
else {
int len = currentSessions.length < getSendAllSessionsSize() ? currentSessions.length : getSendAllSessionsSize();
Session[] sendSessions = new Session[len];
for (int i = 0; i < currentSessions.length; i += getSendAllSessionsSize()) {
len = i + getSendAllSessionsSize() > currentSessions.length ? currentSessions.length - i : getSendAllSessionsSize();
System.arraycopy(currentSessions, i, sendSessions, 0, len);
sendSessions(sender, sendSessions, findSessionTimestamp);
if (getSendAllSessionsWaitTime() > 0)
try {
Thread.sleep(getSendAllSessionsWaitTime());
}
catch (Exception sleep)
{
}
}
}
SessionMessage newmsg = new SessionMessageImpl(this.name, 14, null, "SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED" + getName());
newmsg.setTimestamp(findSessionTimestamp);
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionTransfered", getName()));
this.counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE += 1;
this.cluster.send(newmsg, sender);
}
protected void sendSessions(Member sender, Session[] currentSessions, long sendTimestamp)
throws IOException
{
byte[] data = serializeSessions(currentSessions);
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingAfter", getName()));
SessionMessage newmsg = new SessionMessageImpl(this.name, 12, data, "SESSION-STATE", "SESSION-STATE-" + getName());
newmsg.setTimestamp(sendTimestamp);
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionData", getName()));
this.counterSend_EVT_ALL_SESSION_DATA += 1L;
this.cluster.send(newmsg, sender);
}
}