为了同时支持原始HTTP和WebSocket协议,您需要实现一个自定义io.netty.channel.ChannelInitializer
,您将在其中插入一个HttpRequestHandler
和 a WebSocketServerProtocolHandler
(以及所需的编码和解码处理程序)以支持自定义uri上的WebSocket协议升级:
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
private static final String WEBSOCKET_PATH = "/ws";
private final SslContext sslCtx;
public WebSocketServerInitializer(SslContext sslCtx) {
this.sslCtx = sslCtx;
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
}
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new HttpRequestHandler(WEBSOCKET_PATH));
pipeline.addLast(new WebSocketServerCompressionHandler());
pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true));
pipeline.addLast(new WebSocketIndexPageHandler(WEBSOCKET_PATH));
pipeline.addLast(new WebSocketFrameHandler());
}
}
这是一个示例HttpRequestHandler
:
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private final String websocketUri;
public HttpRequestHandler(String wsUri) {
websockeUri = wsUri;
}
@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
if (this. websocketUri.equalsIgnoreCase(request.getUri())) { // if the request uri matches the web socket path, we forward to next handler which will handle the upgrade handshake
ctx.fireChannelRead(request.retain()); // we need to increment the reference count to retain the ByteBuf for upcoming processing
} else {
// Otherwise, process your HTTP request and send the flush the response
HttpResponse response = new DefaultHttpResponse(
request.getProtocolVersion(), HttpResponseStatus.OK);
response.headers().set(
HttpHeaders.Names.CONTENT_TYPE,
"text/html; charset=UTF-8");
ctx.write(response);
ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
future.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}
}
下面是WebSocketHandler
回显大写框架文本的实现:
public class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// If the WebSocket handshake was successful, we remove the HttpRequestHandler from the pipeline as we are no more supporting raw HTTP requests
if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
ctx.pipeline().remove(HttpRequestHandler.class);
} else {
// otherwise forward to next handler
super.userEventTriggered(ctx, evt);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
if (frame instanceof TextWebSocketFrame) {
// Send the uppercase string back.
String request = ((TextWebSocketFrame) frame).text();
ctx.channel().writeAndFlush(new TextWebSocketFrame(request.toUpperCase(Locale.US)));
} else {
String message = "unsupported frame type: " + frame.getClass().getName();
throw new UnsupportedOperationException(message);
}
}
}