我正在使用 Netty 4.0.25 编写自己的简单游戏服务器。我想支持 msgpack 二进制数据,因此通道上发送/接收的每条消息总是由 msgpack 编码/解码。
我的服务器:
// Event loops: one group accepts connections, the other serves the accepted channels.
bossGroup = new NioEventLoopGroup(4);
workerGroup = new NioEventLoopGroup(4);

// Configure the server bootstrap through a single fluent chain.
bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .handler(new LoggingHandler(LogLevel.DEBUG))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            /**
             * Builds the pipeline of every accepted connection: 4-byte length framing in both
             * directions, followed by the session handler.
             */
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        // Outbound: prefix each message with a 4-byte length field.
                        .addLast("frameEncoder", new LengthFieldPrepender(4))
                        // Inbound: frames up to 1 MiB, 4-byte length at offset 0, header stripped.
                        .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1024 * 1024, 0, 4, 0, 4))
                        .addLast(new NettyNioBinaryTcpSession());
            }
        });

// Bind to the configured gateway address and start accepting incoming connections.
channelFuture = bootstrap.bind(this.getGatewayConfig().getHost(), this.getGatewayConfig().getPort());
以及客户端:
// Single event loop group for the client side.
group = new NioEventLoopGroup();

// Configure the client bootstrap through a single fluent chain.
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .handler(new ChannelInitializer<SocketChannel>() {
            /**
             * Builds the client pipeline: optional SSL first, then 4-byte length framing in both
             * directions, then the client session handler.
             */
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // SSL is only installed when a context has been configured.
                if (sslCtx != null) {
                    pipeline.addLast(sslCtx.newHandler(
                            ch.alloc(),
                            getServerAddress().getHost(),
                            getServerAddress().getPort()));
                }
                pipeline
                        // Outbound: prefix each message with a 4-byte length field.
                        .addLast("frameEncoder", new LengthFieldPrepender(4))
                        // Inbound: frames up to 1 MiB, 4-byte length at offset 0, header stripped.
                        .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1024 * 1024, 0, 4, 0, 4))
                        .addLast(new SessionClientHandler());
            }
        });

// Start the client.
channelFuture = bootstrap.connect(getServerAddress().getHost(), getServerAddress().getPort());
一切正常,直到我尝试从客户端发送 1000 条消息/秒并从服务器回显它。我的服务器正常工作,没有记录错误,但在我的客户端,我得到:
2015-03-04 20:20:22.173 [SocketReceivingWorker-3] WARN com.puppet.client.io.SocketReceivingHandler (SocketReceivingHandler.java:71) - got pong message, but message id not found���� -> length: 4 -> base 64: 1//z/Q==
从客户端发送到服务器并被回显的每条消息,大小都是 12 个字节。
我不知道为什么会这样,已经研究了好几天,但仍然没有找到原因。
提前致谢,
更新:SocketReceivingHandler.java:
package com.puppet.client.io;
import static org.msgpack.template.Templates.TByteArray;
import static org.msgpack.template.Templates.TString;
import static org.msgpack.template.Templates.tMap;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Base64;
import java.util.Map;
import org.msgpack.MessagePack;
import org.msgpack.MessageTypeException;
import org.msgpack.template.Template;
import org.msgpack.unpacker.Unpacker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.lmax.disruptor.WorkHandler;
import com.puppet.client.PuppetClient;
import com.puppet.client.exceptions.MessageUnrecognizedException;
import com.puppet.common.Fields;
import com.puppet.common.PuCommand;
import com.puppet.common.PuEventType;
import com.puppet.common.message.concrete.PingMessage;
import com.puppet.eventdriven.impl.BaseEvent;
/**
 * Work handler that decodes one msgpack-encoded payload per {@link SocketReceivingEvent} and
 * dispatches the command it carries to the owning {@link PuppetClient}.
 *
 * <p>The payload is expected to be a msgpack map of string keys to byte-array values; the
 * {@code Fields.COMMAND} entry selects the command and the {@code Fields.DATA} entry is passed
 * along as its argument.
 */
public class SocketReceivingHandler implements WorkHandler<SocketReceivingEvent> {

    private static final Logger logger = LoggerFactory.getLogger(SocketReceivingHandler.class);
    private static final MessagePack msgpack = new MessagePack();
    private static final Template<Map<String, byte[]>> stringToBytesMapTemplate = tMap(TString, TByteArray);

    private final PuppetClient client;

    /**
     * @param client the client on which decoded events are dispatched
     */
    public SocketReceivingHandler(PuppetClient client) {
        this.client = client;
    }

    /**
     * Decodes the event payload and handles the contained command.
     *
     * @param event the received event; its data is read as a msgpack map
     * @throws MessageUnrecognizedException if the payload is not a map of the expected shape,
     *         decodes to nil, or carries a command id that {@code PuCommand.fromId} does not know
     * @throws Exception any other failure raised while reading or handling the message
     */
    @Override
    public void onEvent(SocketReceivingEvent event) throws Exception {
        // Read the payload once so decoding and diagnostics look at the same bytes.
        byte[] raw = event.getData();
        Unpacker unpacker = msgpack.createUnpacker(new ByteArrayInputStream(raw));
        try {
            Map<String, byte[]> request = unpacker.read(stringToBytesMapTemplate);
            if (request == null) {
                // Reject an absent map explicitly instead of failing with a NullPointerException below.
                throw new MessageUnrecognizedException();
            }
            PuCommand command = PuCommand.fromId(request.get(Fields.COMMAND));
            if (command == null) {
                throw new MessageUnrecognizedException();
            }
            handleCommand(command, request.get(Fields.DATA));
        } catch (MessageTypeException mtEx) {
            // Guarded so the Base64 encoding is skipped entirely when debug logging is off.
            if (logger.isDebugEnabled()) {
                logger.debug("message error -> {}", describe(raw));
            }
            throw new MessageUnrecognizedException(mtEx);
        }
    }

    /**
     * Executes a decoded command.
     *
     * <p>For {@code PONG}: dispatches a {@code PONG} event carrying the raw data, then looks up and
     * removes the start time recorded for that message id. If a start time exists, a
     * {@code PING_PONG} event with the elapsed nanoseconds is dispatched; otherwise a warning is
     * logged. Any other command is only logged at debug level.
     *
     * @param command the command to execute
     * @param data the command payload; for {@code PONG} it is used as the message id
     * @throws IOException declared for the collaborators invoked here
     */
    private void handleCommand(PuCommand command, byte[] data) throws IOException {
        switch (command) {
        case PONG:
            this.client.dispatchEvent(new BaseEvent(PuEventType.PONG, "data", data));
            // NOTE(review): lookup and removal are two separate calls on PingMessage; if several
            // workers run this handler concurrently, confirm that registry is thread-safe and that
            // the id is registered before the ping is written to the channel.
            Long startTime = PingMessage.getTimeForMessageId(data);
            PingMessage.removeTimeForMessageId(data);
            if (startTime != null) {
                this.client.dispatchEvent(new BaseEvent(PuEventType.PING_PONG, "delay", System.nanoTime() - startTime));
            } else {
                logger.warn("got pong message, but message id not found -> {}", describe(data));
            }
            break;
        default:
            logger.debug("unrecognized command: {}", command);
            break;
        }
    }

    /**
     * Renders a binary payload for log output as its length plus Base64 text.
     *
     * <p>The bytes are deliberately not decoded as a string: they are arbitrary binary data, and
     * decoding them with the platform charset only produces replacement characters in the log.
     *
     * @param data the payload, possibly {@code null}
     * @return a printable description, or {@code "null"} when there is no payload
     */
    private static String describe(byte[] data) {
        if (data == null) {
            return "null";
        }
        return "length: " + data.length + " -> base 64: " + Base64.getEncoder().encodeToString(data);
    }
}
event.getData() ==> 从套接字读取、并在 channelRead 方法中接收到的数据。