9

我正在尝试实现一些基于 websockets 的应用程序,它将与 JS 客户端进行非常密集的通信。

发送消息的代码非常原始:

synchronized (session) {
    if (session.isOpen()) {
        session.getBasicRemote().sendText(message);
    }
}

对于罕见的发送,它工作得很好,但是当少数线程试图通过同一个会话(套接字)发送一些消息时,会引发下一个异常(请注意,这不是多线程问题,因为代码块是由会话同步的):

java.lang.IllegalStateException: The remote endpoint was in state [TEXT_FULL_WRITING] which is an invalid state for called method
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.checkState(WsRemoteEndpointImplBase.java:1015)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.textStart(WsRemoteEndpointImplBase.java:978)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendString(WsRemoteEndpointImplBase.java:161)
at org.apache.tomcat.websocket.WsRemoteEndpointBasic.sendText(WsRemoteEndpointBasic.java:37)

谷歌目前并没有丰富的此类异常,在这个问题上打了几个小时后,仍然没有解决方案。

Java 7.0.21,在 Tomcat 7.0.52 和 Tomcat 8.0.3 上测试。

任何答案都非常感谢!提前致谢。

2014 年 3 月 11 日更新:我使用 Jetty 9.1 测试了我的应用程序,但没有发生此异常。我假设这是 Tomcat 实现错误。

4

3 回答 3

10

好的,这不是 Tomcat 问题,而是我的错。

我的 onMessage 函数返回了一个字符串,这意味着我回显了该消息。结果,这部分代码没有同步。

坏的:

@OnMessage
public String onMessage(String message, Session session) {
   ...
   return message;
}

好的:

@OnMessage
public void onMessage(String message, Session session) {
   ...
}
于 2014-03-12T09:09:04.293 回答
3

我发现了这个:https ://bz.apache.org/bugzilla/show_bug.cgi?id=56026

似乎tomcat做了一些意想不到的事情,作为一种解决方法,您必须同步所有 session.sendxxx 调用,无论它是否是异步的。

于 2016-10-24T11:55:12.130 回答
3

我今天刚遇到这个问题,接受的答案不是我的解决方案。我尝试将每次调用同步到我的代码中的远程端点,这只有 4 个实例。那也没有解决它。我还尝试更新到最新的 tomcat 版本,此时它是 9.0.24,它没有修复它。

我的问题的根源在于,在传入的单个 websocket 消息请求中,我碰巧在请求期间发送了两条不同的消息(故意)。我验证了两个 sendText 调用都正确同步,它们在不同的块中被调用大约 0.001 毫秒或更少。

我快速制定的解决方案是使用远程端点的异步版本,并确保在请求发送下一个 msg 时完成最后一个 msg 的未来。我对此并不感到兴奋,但它确实解决了问题......这是我写的类,我现在只要我想通过 websocket 发送一些东西而不需要同步块中的代码,因为发送* 此类上的方法已经同步。希望这可以帮助某人。

注意:除了发送*之外,我没有同步任何东西,所以不确定 Ping/Pong 是否会出现同样的问题,我从未使用过这些。

public class WebSocketEndpointAsync implements RemoteEndpoint.Async {
    private final Session _session;
    private final Async _ep;
    private Future<Void> _lastFuture = null;

    public WebSocketEndpointAsync(Session session, Async ep)
    {
        _session = session;
        _ep = ep;
    }

    @Override public long getSendTimeout() { return _ep.getSendTimeout(); }
    @Override public void setSendTimeout(long timeout) { _ep.setSendTimeout(timeout); }
    @Override public void setBatchingAllowed(boolean allowed) throws IOException { _ep.setBatchingAllowed(allowed); }
    @Override public boolean getBatchingAllowed() { return _ep.getBatchingAllowed(); }
    @Override public void flushBatch() throws IOException { _ep.flushBatch(); }
    @Override public void sendPing(ByteBuffer byteBuffer) throws IOException, IllegalArgumentException { _ep.sendPing(byteBuffer); }
    @Override public void sendPong(ByteBuffer byteBuffer) throws IOException, IllegalArgumentException { _ep.sendPong(byteBuffer); }

    @Override public void sendText(String s, SendHandler sendHandler) { throw new UnsupportedOperationException(); }
    @Override public void sendBinary(ByteBuffer byteBuffer, SendHandler sendHandler) { throw new UnsupportedOperationException(); }
    @Override public void sendObject(Object o, SendHandler sendHandler) { throw new UnsupportedOperationException(); }

    protected synchronized void checkLastSendComplete() {
        if (_lastFuture != null) {
            try {
                if (!_lastFuture.isDone()) {
                    // Only one write to the websocket can happen at a time, so we need to make sure the last one completed
                    // else we get ...
                    // java.lang.IllegalStateException: The remote endpoint was in state [TEXT_FULL_WRITING] which is an invalid state for called method
                    do { Thread.sleep(1); }
                    while (!_lastFuture.isDone());
                }
                // Get the result to ensure
                var ignore = _lastFuture.get();
            }
            catch (InterruptedException ie) { }
            catch (ExecutionException ee) { }
        }
    }
    @Override
    public synchronized Future<Void> sendText(String text) {
        checkLastSendComplete();
        return (_lastFuture = _ep.sendText(text));
    }

    @Override
    public synchronized Future<Void> sendBinary(ByteBuffer byteBuffer) {
        checkLastSendComplete();
        return (_lastFuture = _ep.sendBinary(byteBuffer));
    }

    @Override
    public synchronized Future<Void> sendObject(Object obj) {
        checkLastSendComplete();
        return (_lastFuture = _ep.sendObject(obj));
    }
}
于 2019-08-26T14:44:16.293 回答