Setup

  • Windows 7 Professional
  • Eclipse Juno
  • Java JRE 7
  • Netty 4.0.0 Beta2

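I have a Netty server running on another machine. On my own machine I run a program that simulates many clients talking to the server at the same time. To do that, I use a thread pool implemented with java.util.concurrent.ExecutorService. Each client creates a thread and submits it to the ExecutorService. Just before it ends, that thread creates another thread with the same code. The submitted code performs these steps: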

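  1. Connect to the server by sending a handshake (Netty bootstrap A and channel A)
  2. Get the token from the handshake response
  3. Connect to the server (Netty bootstrap B and channel B)
  4. Send a request to the server
  5. Receive the reply
  6. Close the connection
  7. Create another thread with the same code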
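
The resubmission pattern in the steps above can be sketched roughly as follows. This is a minimal illustration and not my actual code: `VirtualUserLoop`, `Session`, and the client and session counts are hypothetical names chosen for the example, and the network steps are reduced to a placeholder comment.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class VirtualUserLoop {

    /** One pass through steps 1-7; the network steps are placeholders (hypothetical class). */
    static final class Session implements Runnable {
        private final ExecutorService pool;
        private final AtomicInteger completed;
        private final CountDownLatch finishedClients;
        private final int remaining;

        Session(ExecutorService pool, AtomicInteger completed,
                CountDownLatch finishedClients, int remaining) {
            this.pool = pool;
            this.completed = completed;
            this.finishedClients = finishedClients;
            this.remaining = remaining;
        }

        @Override
        public void run() {
            // Steps 1-6 (handshake, token, connect, request, reply, close) would go here.
            completed.incrementAndGet();
            if (remaining > 1) {
                // Step 7: hand an identical task back to the pool before this one ends.
                pool.submit(new Session(pool, completed, finishedClients, remaining - 1));
            } else {
                // This simulated client has finished its last session.
                finishedClients.countDown();
            }
        }
    }

    /** Runs the simulated clients and returns the total number of sessions completed. */
    public static int run(int clients, int sessionsPerClient) {
        ExecutorService pool = Executors.newFixedThreadPool(clients);
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch finishedClients = new CountDownLatch(clients);
        for (int i = 0; i < clients; i++) {
            pool.submit(new Session(pool, completed, finishedClients, sessionsPerClient));
        }
        try {
            // Wait until every client chain has ended (bounded, to avoid hanging forever).
            finishedClients.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 3));
    }
}
```

With 4 clients and 3 sessions each, `run(4, 3)` returns 12. The real program keeps resubmitting indefinitely; the session limit exists only so the sketch terminates.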

Problem
Sometimes, when sending a request to the server, I get a NullPointerException at channel.write(byteBuf) in NettySocketCommunication.sendMessage().

01728 16:25:23.870 [nioEventLoopGroup-3804-2] ERROR cfsvirtualuser.VirtualUser - java.lang.RuntimeException: java.lang.NullPointerException
    at cfsvirtualuser.VirtualUser.processMessageStep(VirtualUser.java:324)
    at cfsvirtualuser.VirtualUser.processNextStep(VirtualUser.java:252)
    at cfsvirtualuser.VirtualUser.onChannelConnected(VirtualUser.java:395)
    at cfscmhandler.ClientSocketBasedHandler.channelActive(ClientSocketBasedHandler.java:95)
    at io.netty.channel.DefaultChannelHandlerContext.invokeChannelActive(DefaultChannelHandlerContext.java:774)
    at io.netty.channel.DefaultChannelHandlerContext.fireChannelActive(DefaultChannelHandlerContext.java:760)
    at io.netty.channel.ChannelStateHandlerAdapter.channelActive(ChannelStateHandlerAdapter.java:58)
    at io.netty.channel.DefaultChannelHandlerContext.invokeChannelActive(DefaultChannelHandlerContext.java:774)
    at io.netty.channel.DefaultChannelHandlerContext.fireChannelActive(DefaultChannelHandlerContext.java:760)
    at io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:884)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:223)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:417)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:365)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:302)
    at io.netty.channel.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:110)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException: null
    at cfscmNettySocketCommunication.sendMessage(NettySocketCommunication.java:109)
    at cfsvirtualuser.VirtualUser.processMessageStep(VirtualUser.java:317)
    ... 15 common frames omitted

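Code
I removed some logging and comments to make the code blocks shorter. I also have a VirtualUser.java class that implements the IVirtualUserCommunication and IVirtualUserMessages interfaces (not shown here).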

[ AbstractVirtualUserCommunication.java ]

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.util.CharsetUtil;

import [...].Amf3;
import [...].Properties;
import [...].LogFactory;
import [...].Logger;
import [...].AbstractRequest;
import [...].IRouterMessage;
import [...].RouterMessage;
import [...].FrimaHandshake;
import [...].IVirtualUserCommunication;

public abstract class AbstractVirtualUserCommunication implements IVirtualUserCommunication
{
    protected static Logger Log = LogFactory.getInstance().getLogger(AbstractVirtualUserCommunication.class);

    protected String CONFIG_APPLICATION = "target.server.application";
    protected String application;
    protected String CONFIG_VERSION = "target.server.version";
    protected String version;

    protected String CONFIG_HANDSHAKE_PORT = "netty.handshake.port";
    protected final int defaultHandshakePort = 80;

    // The following variables are used by both HTTP and SOCKET communication

    protected Bootstrap bootstrapHandshake; // Netty bootstrap used only for handshake
    protected Channel channelHandshake; // Netty channel used only for handshake
    protected String token; // The token received through handshake process

    // Host & port are set in the connect() method
    protected String host;
    protected int port;

    protected Bootstrap bootstrap; // Netty bootstrap used for communication
    protected Channel channel; // Netty channel used for communication

    /** Connect to the server to get the token */
    public void sendHandshake(String host)
    {
        // Get properties, with default values if they are not specified
        this.application = Properties.getString(CONFIG_APPLICATION, "snowstorm");
        this.version = Properties.getString(CONFIG_VERSION, "0.0.1");

        int handshakePort = Properties.getInt(CONFIG_HANDSHAKE_PORT, defaultHandshakePort);

        bootstrapHandshake = new Bootstrap();

        try
        {
            bootstrapHandshake.group(new NioEventLoopGroup());
            bootstrapHandshake.channel(NioSocketChannel.class);
            bootstrapHandshake.handler(new HandShakeInitializer(/* this */));

            // Connect and listen on handshake host/port
            channelHandshake = bootstrapHandshake.connect(host, handshakePort).sync().channel();
            channelHandshake.closeFuture().sync();
        }
        catch (InterruptedException e)
        {
            Log.error(e);
        }
        finally
        {
            bootstrapHandshake.shutdown();
        }
    }

    /** Method called after completion of the handshake (the token has been set). */
    protected abstract void afterHandshake();

    /** Connect to the target server for stress test script execution. */
    protected void connect(ChannelHandler handler)
    {
        bootstrap = new Bootstrap();

        try
        {
            // Initialize the pipeline
            bootstrap.group(new NioEventLoopGroup());
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(handler);

            // Connect and listen on host/port
            channel = bootstrap.connect(host, port).sync().channel();

            if (channel == null)
            {
                Log.error("PROBLEM : The channel is null in the afterHandshake() method");
            }

            channel.closeFuture().sync();
        }
        catch (InterruptedException e)
        {
            Log.error(e);
        }
        finally
        {
            bootstrap.shutdown();
        }
    }

    /** Create a RouterMessage with the specified request. */
    protected IRouterMessage buildMessage(AbstractRequest request)
    {
        RouterMessage routerMessage = new RouterMessage();
        routerMessage.bytes = Amf3.serialize(request);
        routerMessage.token = this.token;
        routerMessage.application = this.application;
        routerMessage.version = this.version;

        return routerMessage;
    }

    @Override
    public void disconnect()
    {
        // TODO Is it dangerous to not call channel.close() ??
        if (channel != null)
        {
            channel.close().awaitUninterruptibly();
        }
        else
        {
            Log.error("PROBLEM : The channel is null when calling the disconnect() method");
        }

        bootstrap.shutdown();
    }

    @Override
    public boolean isConnected()
    {
        if (channel == null)
        {
            return false;
        }
        return channel.isActive();
    }

    private class HandShakeInitializer extends ChannelInitializer<SocketChannel>
    {
        public HandShakeInitializer()
        {
            super();
        }

        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception
        {
            socketChannel.pipeline().addLast("encoder", new HttpRequestEncoder());
            socketChannel.pipeline().addLast("decoder", new HttpResponseDecoder());
            socketChannel.pipeline().addLast("handler", new HandShakeHandler(/* communication */));
        }
    }

    private class HandShakeHandler extends ChannelInboundMessageHandlerAdapter<Object>
    {
        public HandShakeHandler()
        {
            super();
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception
        {
            super.channelActive(ctx);

            ctx.write(FrimaHandshake.create(null, version, application));
            ctx.flush();
        }

        @Override
        public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception
        {
            if (msg instanceof DefaultLastHttpContent)
            {
                DefaultLastHttpContent defaultLastHttpContent = (DefaultLastHttpContent) msg;

                String content = defaultLastHttpContent.data().toString(CharsetUtil.UTF_8);

                // Format = token~publicDNS and we only need the token here
                token = content;// .split("~")[0];

                Log.debug("Starting a bot with token " + token);

                afterHandshake();
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
        {
            Log.error(cause);
            ctx.close();
        }
    }
}

[ NettySocketCommunication.java ]

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import [...].AbstractRequest;
import [...].IRouterMessage;
import [...].Serializer;
import [...].ClientSocketBasedHandler;
import [...].ClientSocketBasedInitializer;
import [...].IVirtualUserMessages;

public class NettySocketCommunication extends AbstractVirtualUserCommunication
{
    private ClientSocketBasedHandler handler;
    private ChannelFuture testChannelFuture;

    public NettySocketCommunication()
    {
        super();
        Log.setLevelToInfo();
        this.handler = new ClientSocketBasedHandler();
    }

    @Override
    public void setVirtualUser(IVirtualUserMessages virtualUser)
    {
        this.handler.setVirtualUser(virtualUser);
    }

    @Override
    public void connect(String host, int port)
    {
        this.host = host;
        this.port = port;
        // Get the token from the server through the handshake process
        sendHandshake(host);
    }

    @Override
    public boolean connectTest(String host, int port)
    {
        boolean connectSuccess = false;

        bootstrap = new Bootstrap();

        // Initialize the pipeline
        bootstrap.group(new NioEventLoopGroup());
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ClientSocketBasedInitializer(new ClientSocketBasedHandler()));

        // Listen on host/port (connect a channel)
        testChannelFuture = bootstrap.connect(host, port);
        testChannelFuture.awaitUninterruptibly();

        if (testChannelFuture.isSuccess())
        {
            connectSuccess = true;
        }

        testChannelFuture.channel().close().awaitUninterruptibly();
        bootstrap.shutdown();

        return connectSuccess;
    }

    @Override
    protected void afterHandshake()
    {
        super.connect(new ClientSocketBasedInitializer(handler));
    }

    @Override
    public void sendMessage(AbstractRequest request)
    {
        IRouterMessage routerMessage = buildMessage(request);

        ByteBuf byteBuf = Serializer.encode(routerMessage, true);
        // Send message
        channel.write(byteBuf);
        channel.flush();
    }
}

[ ClientSocketBasedHandler.java ]

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;

import java.util.ArrayList;
import java.util.List;

import [...].Amf3;
import [...].LogFactory;
import [...].Logger;
import [...].IMessage;
import [...].IRouterMessage;
import [...].IVirtualUserMessages;

public class ClientSocketBasedHandler extends ChannelInboundMessageHandlerAdapter<IRouterMessage>
{
    protected static Logger Log = LogFactory.getInstance().getLogger(ClientSocketBasedHandler.class);

    private IVirtualUserMessages virtualUser;

    public ClientSocketBasedHandler()
    {
        super();
        Log.setLevelToInfo();
    }

    public void setVirtualUser(IVirtualUserMessages virtualUser)
    {
        this.virtualUser = virtualUser;
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, IRouterMessage routerMessage) throws Exception
    {
        List<IMessage> messages = deserializeMessages(routerMessage.getBytes());

        for (IMessage message : messages)
        {
            Log.debug("Received socket : " + message);

            if (virtualUser == null)
            {
                throw new RuntimeException("Must call the setVirtualUser() method before receiving messages");
            }

            virtualUser.onManticoreMessageReceived(message);
        }
    }

    protected List<IMessage> deserializeMessages(byte[] bytes)
    {
        Object probablyMessages = Amf3.deserialize(bytes);
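        // Start with an empty list so the caller's for-loop is safe when nothing can be deserialized
        List<IMessage> messages = new ArrayList<IMessage>();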

        // List of Messages
        if (probablyMessages instanceof ArrayList)
        {
            messages = (List<IMessage>) probablyMessages;
        }
        // Single Message
        else if (probablyMessages instanceof IMessage)
        {
            messages = new ArrayList<IMessage>(1);
            messages.add((IMessage) probablyMessages);
        }
        // Probably Pollution
        else
        {
            // String.valueOf() tolerates a null result from Amf3.deserialize()
            Log.error("Cannot deserialize message '{}'", String.valueOf(probablyMessages));
        }

        return messages;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx)
    {
        if (virtualUser != null)
        {
            virtualUser.onChannelConnected();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
    {
        Log.error(cause);
        ctx.close();
    }
}
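
One reading of the trace and the code above, offered as a hypothesis and not a confirmed diagnosis: sendMessage() is reached from channelActive() on the Netty event loop thread, while the `channel` field is only assigned once `bootstrap.connect(host, port).sync().channel()` returns on the calling thread. If the event loop gets there first, the field may still be null. The sketch below models that interleaving with plain threads and no Netty dependency. `ConnectRaceSketch`, `FakeChannel`, `ActiveListener`, and `Client` are hypothetical stand-ins, and the fake `connect()` deliberately forces the unlucky ordering so the outcome is reproducible.

```java
public class ConnectRaceSketch {

    /** Hypothetical stand-in for a Netty Channel. */
    public static final class FakeChannel {
        final StringBuilder written = new StringBuilder();
        void write(String msg) { written.append(msg); }
    }

    /** Hypothetical stand-in for a handler's channelActive(ctx) callback. */
    public interface ActiveListener {
        void channelActive(FakeChannel ctxChannel);
    }

    /**
     * Models the unlucky interleaving: a separate "event loop" thread runs the
     * callback to completion before connect() hands the channel to the caller.
     */
    static FakeChannel connect(final ActiveListener listener) {
        final FakeChannel ch = new FakeChannel();
        Thread eventLoop = new Thread(new Runnable() {
            @Override
            public void run() {
                listener.channelActive(ch);
            }
        });
        eventLoop.start();
        try {
            eventLoop.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ch;
    }

    public static final class Client {
        private volatile FakeChannel channel; // assigned only after connect() returns
        private volatile String outcome;

        /** Mirrors the pattern in the post: the callback reads the field. */
        public String connectReadingField() {
            channel = connect(new ActiveListener() {
                @Override
                public void channelActive(FakeChannel ctxChannel) {
                    try {
                        channel.write("request"); // the field has not been assigned yet
                        outcome = "sent";
                    } catch (NullPointerException e) {
                        outcome = "NullPointerException";
                    }
                }
            });
            return outcome;
        }

        /** Alternative: the callback uses the channel it was handed. */
        public String connectUsingCallbackArgument() {
            channel = connect(new ActiveListener() {
                @Override
                public void channelActive(FakeChannel ctxChannel) {
                    ctxChannel.write("request");
                    outcome = "sent";
                }
            });
            return outcome;
        }
    }

    public static void main(String[] args) {
        System.out.println(new Client().connectReadingField());
        System.out.println(new Client().connectUsingCallbackArgument());
    }
}
```

If this hypothesis holds, one option in the real code might be to write through the ChannelHandlerContext passed to channelActive() (the way HandShakeHandler already calls ctx.write() and ctx.flush()), or through ctx.channel(), instead of relying on the `channel` field being set in time.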

Search
I looked through the Netty channel-related questions on Stack Overflow but could not find anything that matches my case.

Links
http://netty.io/4.0/api/io/netty/channel/Channel.html
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html
