我今天刚遇到这个问题,接受的答案不是我的解决方案。我尝试将每次调用同步到我的代码中的远程端点,这只有 4 个实例。那也没有解决它。我还尝试更新到最新的 tomcat 版本,此时它是 9.0.24,它没有修复它。
我的问题的根源在于,在传入的单个 websocket 消息请求中,我碰巧在请求期间发送了两条不同的消息(故意)。我验证了两个 sendText 调用都正确同步,它们在不同的块中被调用大约 0.001 毫秒或更少。
我快速制定的解决方案是使用远程端点的异步版本,并确保在请求发送下一个 msg 时完成最后一个 msg 的未来。我对此并不感到兴奋,但它确实解决了问题......这是我写的类,我现在只要我想通过 websocket 发送一些东西而不需要同步块中的代码,因为发送* 此类上的方法已经同步。希望这可以帮助某人。
注意:除了发送*之外,我没有同步任何东西,所以不确定 Ping/Pong 是否会出现同样的问题,我从未使用过这些。
public class WebSocketEndpointAsync implements RemoteEndpoint.Async {
private final Session _session;
private final Async _ep;
private Future<Void> _lastFuture = null;
public WebSocketEndpointAsync(Session session, Async ep)
{
_session = session;
_ep = ep;
}
@Override public long getSendTimeout() { return _ep.getSendTimeout(); }
@Override public void setSendTimeout(long timeout) { _ep.setSendTimeout(timeout); }
@Override public void setBatchingAllowed(boolean allowed) throws IOException { _ep.setBatchingAllowed(allowed); }
@Override public boolean getBatchingAllowed() { return _ep.getBatchingAllowed(); }
@Override public void flushBatch() throws IOException { _ep.flushBatch(); }
@Override public void sendPing(ByteBuffer byteBuffer) throws IOException, IllegalArgumentException { _ep.sendPing(byteBuffer); }
@Override public void sendPong(ByteBuffer byteBuffer) throws IOException, IllegalArgumentException { _ep.sendPong(byteBuffer); }
@Override public void sendText(String s, SendHandler sendHandler) { throw new UnsupportedOperationException(); }
@Override public void sendBinary(ByteBuffer byteBuffer, SendHandler sendHandler) { throw new UnsupportedOperationException(); }
@Override public void sendObject(Object o, SendHandler sendHandler) { throw new UnsupportedOperationException(); }
protected synchronized void checkLastSendComplete() {
if (_lastFuture != null) {
try {
if (!_lastFuture.isDone()) {
// Only one write to the websocket can happen at a time, so we need to make sure the last one completed
// else we get ...
// java.lang.IllegalStateException: The remote endpoint was in state [TEXT_FULL_WRITING] which is an invalid state for called method
do { Thread.sleep(1); }
while (!_lastFuture.isDone());
}
// Get the result to ensure
var ignore = _lastFuture.get();
}
catch (InterruptedException ie) { }
catch (ExecutionException ee) { }
}
}
@Override
public synchronized Future<Void> sendText(String text) {
checkLastSendComplete();
return (_lastFuture = _ep.sendText(text));
}
@Override
public synchronized Future<Void> sendBinary(ByteBuffer byteBuffer) {
checkLastSendComplete();
return (_lastFuture = _ep.sendBinary(byteBuffer));
}
@Override
public synchronized Future<Void> sendObject(Object obj) {
checkLastSendComplete();
return (_lastFuture = _ep.sendObject(obj));
}
}