我正在编写一个从 websocket 服务器发送和接收消息的 Java 应用程序。当应用程序收到消息时,可能需要一些时间来处理它。因此,我尝试使用多个线程来接收消息。据我了解Grizzly
,有选择器线程和工作线程。默认情况下有 1 个选择器线程和 2 个工作线程,在下面的示例中,我试图将它们分别增加到 5 和 10。在下面的示例中,我正在暂停调用onMessage
方法 10 秒来模拟输入信息的处理。信息每秒出现一次,因此 10 个线程应该能够处理流量。当我分析运行时,只有 1 个选择器线程正在运行,2 个工作线程。此外,消息仅以 10 秒的间隔接收。表明只有 1 个线程在处理流量 - 我觉得这很奇怪。在分析期间,一个工作线程例如Grizzly(1)
接收发送的第一条消息。然后 10 秒后,'Grizzly(2)' 收到第二条消息 - 然后Grizzly(2)
继续接收消息,并且Grizzly(1)
不执行任何操作。
有人可以解释一下这种奇怪的行为以及如何将其更改为例如 10 个线程不断排队等待消息吗?
主要的:
public static void main(String[] args) {
WebsocketTextClient client = new WebsocketTextClient();
client.connect();
for (int i = 0; i < 60; i++) {
client.send("Test message " + i);
try {
Thread.sleep(1000);
} catch (Exception e) {
System.out.println("Error sleeping!");
}
}
}
WebsocketTextClient.java:
import java.net.URI;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.EndpointConfig;
import javax.websocket.Session;
import javax.websocket.Endpoint;
import javax.websocket.MessageHandler;
import org.glassfish.tyrus.client.ClientManager;
import org.glassfish.tyrus.client.ThreadPoolConfig;
import org.glassfish.tyrus.container.grizzly.client.GrizzlyClientProperties;
public class WebsocketTextClient {
private ClientManager client;
private ClientEndpointConfig clientConfig;
WebsocketTextClientEndpoint endpoint;
public WebsocketTextClient() {
client = ClientManager.createClient();
client.getProperties().put(GrizzlyClientProperties.SELECTOR_THREAD_POOL_CONFIG, ThreadPoolConfig.defaultConfig().setMaxPoolSize(5));
client.getProperties().put(GrizzlyClientProperties.WORKER_THREAD_POOL_CONFIG, ThreadPoolConfig.defaultConfig().setMaxPoolSize(10));
}
public boolean connect() {
try {
clientConfig = ClientEndpointConfig.Builder.create().build();
endpoint = new WebsocketTextClientEndpoint();
client.connectToServer(endpoint, clientConfig, new URI("wss://echo.websocket.org"));
} catch (Exception e) {
return false;
}
return true;
}
public boolean disconnect() {
return false;
}
public boolean send(String message) {
endpoint.session.getAsyncRemote().sendText(message);
return true;
}
private class WebsocketTextClientEndpoint extends Endpoint {
Session session;
@Override
public void onOpen(Session session, EndpointConfig config) {
System.out.println("Connection opened");
this.session = session;
session.addMessageHandler(new WebsocketTextClientMessageHandler());
}
}
private class WebsocketTextClientMessageHandler implements MessageHandler.Whole<String> {
@Override
public void onMessage(String message) {
System.out.println("Message received from " + Thread.currentThread().getName() + " " + message);
try {
Thread.sleep(10000);
} catch (Exception e) {
System.out.println("Error sleeping!");
}
System.out.println("Resuming");
}
}
}