Setup
- Windows 7 Professional
- Eclipse Juno
- Java jre7
- Netty 4.0.0 Beta2
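I have a Netty server running on another machine. On my own machine I run a program that simulates many clients talking to the server at the same time. To do this, I use a thread pool implemented with java.util.concurrent.ExecutorService. Each client creates a thread and submits it to the ExecutorService. Just before it finishes, that thread creates another thread with the same code. The submitted code performs these steps:
- Connect to the server by sending a handshake (Netty bootstrap A and channel A)
- Get the token from the handshake response
- Connect to the server (Netty bootstrap B and channel B)
- Send one request to the server
- Receive the reply
- Close the connection
- Create another thread with the same code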
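The scheduling pattern described above can be sketched as follows. This is only a minimal illustration, not my actual program: the class name VirtualUserLoopSketch, the Session task, and the rounds limit are hypothetical (the real test keeps resubmitting), and the Netty work (handshake, token, connect, request, reply, close) is reduced to a placeholder comment.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class VirtualUserLoopSketch {

    /**
     * Starts `clients` simulated users; each one runs `rounds` sessions
     * (rounds must be at least 1). Returns the number of sessions completed.
     */
    static int runSessions(int clients, int rounds) {
        ExecutorService pool = Executors.newFixedThreadPool(clients);
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch allDone = new CountDownLatch(clients);
        for (int i = 0; i < clients; i++) {
            pool.submit(new Session(pool, rounds, completed, allDone));
        }
        try {
            // Wait until every client chain has run its last session.
            allDone.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return completed.get();
    }

    /** One client session; resubmits a fresh copy of itself just before ending. */
    static final class Session implements Runnable {
        private final ExecutorService pool;
        private final int remaining;
        private final AtomicInteger completed;
        private final CountDownLatch allDone;

        Session(ExecutorService pool, int remaining,
                AtomicInteger completed, CountDownLatch allDone) {
            this.pool = pool;
            this.remaining = remaining;
            this.completed = completed;
            this.allDone = allDone;
        }

        @Override
        public void run() {
            // Placeholder for: handshake, read token, connect,
            // send request, receive reply, close connection.
            completed.incrementAndGet();
            if (remaining > 1) {
                // Last step of the list: create another task with the same code.
                pool.submit(new Session(pool, remaining - 1, completed, allDone));
            } else {
                allDone.countDown();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("sessions completed: " + runSessions(3, 4));
    }
}
```

Because each task resubmits its successor to the same pool, the number of sessions in flight never exceeds the number of clients, which is the behaviour I am relying on in the real test.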
Problem
I sometimes get a NullPointerException in NettySocketCommunication.sendMessage(), on the channel.write(byteBuf) call, when sending a request to the server.
01728 16:25:23.870 [nioEventLoopGroup-3804-2] ERROR cfsvirtualuser.VirtualUser - java.lang.RuntimeException: java.lang.NullPointerException
    at cfsvirtualuser.VirtualUser.processMessageStep(VirtualUser.java:324)
    at cfsvirtualuser.VirtualUser.processNextStep(VirtualUser.java:252)
    at cfsvirtualuser.VirtualUser.onChannelConnected(VirtualUser.java:395)
    at cfscmhandler.ClientSocketBasedHandler.channelActive(ClientSocketBasedHandler.java:95)
    at io.netty.channel.DefaultChannelHandlerContext.invokeChannelActive(DefaultChannelHandlerContext.java:774)
    at io.netty.channel.DefaultChannelHandlerContext.fireChannelActive(DefaultChannelHandlerContext.java:760)
    at io.netty.channel.ChannelStateHandlerAdapter.channelActive(ChannelStateHandlerAdapter.java:58)
    at io.netty.channel.DefaultChannelHandlerContext.invokeChannelActive(DefaultChannelHandlerContext.java:774)
    at io.netty.channel.DefaultChannelHandlerContext.fireChannelActive(DefaultChannelHandlerContext.java:760)
    at io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:884)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:223)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:417)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:365)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:302)
    at io.netty.channel.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:110)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException: null
    at cfscmNettySocketCommunication.sendMessage(NettySocketCommunication.java:109)
    at cfsvirtualuser.VirtualUser.processMessageStep(VirtualUser.java:317)
    ... 15 common frames omitted
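One hypothesis that fits this trace, which I have not confirmed: sendMessage() is reached from channelActive() on a nioEventLoopGroup thread, while the channel field is assigned by the calling thread only after connect(host, port).sync() returns. If the callback runs first, the field is still null. The sketch below reproduces that ordering with plain Java and no Netty at all; FakeChannel, channelField, and the latches that force the unlucky interleaving are hypothetical stand-ins.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class ChannelFieldRaceSketch {

    /** Hypothetical stand-in for a connected channel. */
    static final class FakeChannel { }

    /** Plays the role of the `channel` field assigned by the connecting thread. */
    static volatile FakeChannel channelField;

    static String describe(FakeChannel c) {
        return c == null ? "null" : "channel";
    }

    /** Returns what the callback saw: { via the shared field, via its own argument }. */
    static String[] demonstrate() {
        channelField = null;
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        CountDownLatch connected = new CountDownLatch(1);
        CountDownLatch callbackRan = new CountDownLatch(1);
        AtomicReference<String> viaField = new AtomicReference<>();
        AtomicReference<String> viaArgument = new AtomicReference<>();
        FakeChannel established = new FakeChannel();

        // Plays the role of channelActive(): invoked on the event loop thread.
        Consumer<FakeChannel> onActive = active -> {
            viaField.set(describe(channelField)); // like sendMessage() reading the field
            viaArgument.set(describe(active));    // like using the channel handed to the callback
        };

        try {
            eventLoop.execute(() -> {
                connected.countDown();            // the connect future completes...
                onActive.accept(established);     // ...then the "active" callback fires
                callbackRan.countDown();
            });
            connected.await();                    // like connect(host, port).sync()
            // Forced here so the callback always wins; in a real run this is timing-dependent.
            callbackRan.await(5, TimeUnit.SECONDS);
            channelField = established;           // the assignment arrives too late
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            eventLoop.shutdown();
        }
        return new String[] { viaField.get(), viaArgument.get() };
    }

    public static void main(String[] args) {
        String[] seen = demonstrate();
        System.out.println("via field: " + seen[0] + ", via argument: " + seen[1]);
    }
}
```

If this is what happens in my code, writing through the channel available inside the callback (in Netty, ctx.channel() on the ChannelHandlerContext) instead of the shared field would sidestep the ordering problem, and it would also explain why the exception only appears sometimes.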
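Code
I removed some logging and comments to make the code blocks shorter. I also have a VirtualUser.java class (not shown here) that implements the IVirtualUserCommunication and IVirtualUserMessages interfaces.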
[ AbstractVirtualUserCommunication.java ]
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.util.CharsetUtil;
import [...].Amf3;
import [...].Properties;
import [...].LogFactory;
import [...].Logger;
import [...].AbstractRequest;
import [...].IRouterMessage;
import [...].RouterMessage;
import [...].FrimaHandshake;
import [...].IVirtualUserCommunication;

public abstract class AbstractVirtualUserCommunication implements IVirtualUserCommunication
{
    protected static Logger Log = LogFactory.getInstance().getLogger(AbstractVirtualUserCommunication.class);

    protected String CONFIG_APPLICATION = "target.server.application";
    protected String application;
    protected String CONFIG_VERSION = "target.server.version";
    protected String version;
    protected String CONFIG_HANDSHAKE_PORT = "netty.handshake.port";
    protected final int defaultHandshakePort = 80;

    // The following variables are used by both HTTP and SOCKET communication
    protected Bootstrap bootstrapHandshake; // Netty bootstrap used only for handshake
    protected Channel channelHandshake; // Netty channel used only for handshake
    protected String token; // The token received through handshake process

    // Host & port are set in the connect() method
    protected String host;
    protected int port;
    protected Bootstrap bootstrap; // Netty bootstrap used for communication
    protected Channel channel; // Netty channel used for communication

    /** Connect to the server to get the token */
    public void sendHandshake(String host)
    {
        // Get properties, with default values if they are not specified
        this.application = Properties.getString(CONFIG_APPLICATION, "snowstorm");
        this.version = Properties.getString(CONFIG_VERSION, "0.0.1");
        int handshakePort = Properties.getInt(CONFIG_HANDSHAKE_PORT, defaultHandshakePort);
        bootstrapHandshake = new Bootstrap();
        try
        {
            bootstrapHandshake.group(new NioEventLoopGroup());
            bootstrapHandshake.channel(NioSocketChannel.class);
            bootstrapHandshake.handler(new HandShakeInitializer(/* this */));
            // Connect and listen on handshake host/port
            channelHandshake = bootstrapHandshake.connect(host, handshakePort).sync().channel();
            channelHandshake.closeFuture().sync();
        }
        catch (InterruptedException e)
        {
            Log.error(e);
        }
        finally
        {
            bootstrapHandshake.shutdown();
        }
    }

    /** Method called after completion of the handshake (the token has been set). */
    protected abstract void afterHandshake();

    /** Connect to the target server for stress test script execution. */
    protected void connect(ChannelHandler handler)
    {
        bootstrap = new Bootstrap();
        try
        {
            // Initialize the pipeline
            bootstrap.group(new NioEventLoopGroup());
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(handler);
            // Connect and listen on host/port
            channel = bootstrap.connect(host, port).sync().channel();
            if (channel == null)
            {
                Log.error("PROBLEM : The channel is null in the afterHandshake() method");
            }
            channel.closeFuture().sync();
        }
        catch (InterruptedException e)
        {
            Log.error(e);
        }
        finally
        {
            bootstrap.shutdown();
        }
    }

    /** Create a RouterMessage with the specified request. */
    protected IRouterMessage buildMessage(AbstractRequest request)
    {
        RouterMessage routerMessage = new RouterMessage();
        routerMessage.bytes = Amf3.serialize(request);
        routerMessage.token = this.token;
        routerMessage.application = this.application;
        routerMessage.version = this.version;
        return routerMessage;
    }

    @Override
    public void disconnect()
    {
        // TODO Is it dangerous to not call channel.close() ??
        if (channel != null)
        {
            channel.close().awaitUninterruptibly();
        }
        else
        {
            Log.error("PROBLEM : The channel is null when calling the disconnect() method");
        }
        bootstrap.shutdown();
    }

    @Override
    public boolean isConnected()
    {
        if (channel == null)
        {
            return false;
        }
        return channel.isActive();
    }

    private class HandShakeInitializer extends ChannelInitializer<SocketChannel>
    {
        public HandShakeInitializer()
        {
            super();
        }

        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception
        {
            socketChannel.pipeline().addLast("encoder", new HttpRequestEncoder());
            socketChannel.pipeline().addLast("decoder", new HttpResponseDecoder());
            socketChannel.pipeline().addLast("handler", new HandShakeHandler(/* communication */));
        }
    }

    private class HandShakeHandler extends ChannelInboundMessageHandlerAdapter<Object>
    {
        public HandShakeHandler()
        {
            super();
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception
        {
            super.channelActive(ctx);
            ctx.write(FrimaHandshake.create(null, version, application));
            ctx.flush();
        }

        @Override
        public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception
        {
            if (msg instanceof DefaultLastHttpContent)
            {
                DefaultLastHttpContent defaultLastHttpContent = (DefaultLastHttpContent) msg;
                String content = defaultLastHttpContent.data().toString(CharsetUtil.UTF_8);
                // Format = token~publicDNS and we only need the token here
                token = content;// .split("~")[0];
                Log.debug("Starting a bot with token " + token);
                afterHandshake();
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
        {
            Log.error(cause);
            ctx.close();
        }
    }
}
[ NettySocketCommunication.java ]
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import [...].AbstractRequest;
import [...].IRouterMessage;
import [...].Serializer;
import [...].ClientSocketBasedHandler;
import [...].ClientSocketBasedInitializer;
import [...].IVirtualUserMessages;

public class NettySocketCommunication extends AbstractVirtualUserCommunication
{
    private ClientSocketBasedHandler handler;
    private ChannelFuture testChannelFuture;

    public NettySocketCommunication()
    {
        super();
        Log.setLevelToInfo();
        this.handler = new ClientSocketBasedHandler();
    }

    @Override
    public void setVirtualUser(IVirtualUserMessages virtualUser)
    {
        this.handler.setVirtualUser(virtualUser);
    }

    @Override
    public void connect(String host, int port)
    {
        this.host = host;
        this.port = port;
        // Get the token from the server through the handshake process
        sendHandshake(host);
    }

    @Override
    public boolean connectTest(String host, int port)
    {
        boolean connectSuccess = false;
        bootstrap = new Bootstrap();
        // Initialize the pipeline
        bootstrap.group(new NioEventLoopGroup());
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ClientSocketBasedInitializer(new ClientSocketBasedHandler()));
        // Listen on host/port (connect a channel)
        testChannelFuture = bootstrap.connect(host, port);
        testChannelFuture.awaitUninterruptibly();
        if (testChannelFuture.isSuccess())
        {
            connectSuccess = true;
        }
        testChannelFuture.channel().close().awaitUninterruptibly();
        bootstrap.shutdown();
        return connectSuccess;
    }

    @Override
    protected void afterHandshake()
    {
        super.connect(new ClientSocketBasedInitializer(handler));
    }

    @Override
    public void sendMessage(AbstractRequest request)
    {
        IRouterMessage routerMessage = buildMessage(request);
        ByteBuf byteBuf = Serializer.encode(routerMessage, true);
        // Send message
        channel.write(byteBuf);
        channel.flush();
    }
}
[ ClientSocketBasedHandler.java ]
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import java.util.ArrayList;
import java.util.List;
import [...].Amf3;
import [...].LogFactory;
import [...].Logger;
import [...].IMessage;
import [...].IRouterMessage;
import [...].IVirtualUserMessages;

public class ClientSocketBasedHandler extends ChannelInboundMessageHandlerAdapter<IRouterMessage>
{
    protected static Logger Log = LogFactory.getInstance().getLogger(ClientSocketBasedHandler.class);

    private IVirtualUserMessages virtualUser;

    public ClientSocketBasedHandler()
    {
        super();
        Log.setLevelToInfo();
    }

    public void setVirtualUser(IVirtualUserMessages virtualUser)
    {
        this.virtualUser = virtualUser;
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, IRouterMessage routerMessage) throws Exception
    {
        List<IMessage> messages = deserializeMessages(routerMessage.getBytes());
        for (IMessage message : messages)
        {
            Log.debug("Received socket : " + message);
            if (virtualUser == null)
            {
                throw new RuntimeException("Must call the setVirtualUser() method before receiving messages");
            }
            virtualUser.onManticoreMessageReceived(message);
        }
    }

    protected List<IMessage> deserializeMessages(byte[] bytes)
    {
        Object probablyMessages = Amf3.deserialize(bytes);
        List<IMessage> messages = null;
        // List of Messages
        if (probablyMessages instanceof ArrayList)
        {
            messages = (List<IMessage>) probablyMessages;
        }
        // Single Message
        else if (probablyMessages instanceof IMessage)
        {
            messages = new ArrayList<IMessage>(1);
            messages.add((IMessage) probablyMessages);
        }
        // Probably Pollution
        else
        {
            Log.error("Cannot deserialize message '{}'", probablyMessages.toString());
        }
        return messages;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx)
    {
        if (virtualUser != null)
        {
            virtualUser.onChannelConnected();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
    {
        Log.error(cause);
        ctx.close();
    }
}
Research
I looked through the Netty channel-related questions on Stack Overflow but could not find anything relevant to my case.
Links
http://netty.io/4.0/api/io/netty/channel/Channel.html
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html