0

最终编辑/结论

这是一个与netty无关的问题,仍然很难调试。messageReceived 中的工作线程有时会被阻塞,因此一段时间后池中没有可用的线程。

原来的问题

在我的公司,我们使用 netty 来监听来自 GPS 跟踪设备的连接。跟踪器通过 GPRS 进行通信。

我们经历了 netty 3.2.4-final 的非常奇怪的行为。

一段时间后(我无法准确说出多少,但接近一天),我们没有收到来自追踪器的任何消息。这意味着我们的 SimpleCahnnelUpstreamHandler 实现的 messageReceived 方法不会被调用!但是,如果我使用 tcpdump 捕获所有数据包,我可以看到所有消息进入!

这是一个已知问题,已经在更高版本的 netty 中修复了吗?

我们的通道管道如下所示:

...
final TcpListenerChannelHandler tcpChannelHandler;


@Inject
public TcpListenerPipeline(TcpListenerChannelHandler tcpChannelHandler) {
     this.tcpChannelHandler = tcpChannelHandler;
}

@Override
public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline p = Channels.pipeline();
        p.addLast("frameDecoder", new DelimiterBasedFrameDecoder(2048, Delimiters.lineDelimiter()));
        p.addLast("encoder", new ByteArrayWrapperEncoder());
        p.addLast("handler", tcpChannelHandler);
        return p;
}
...

我们通过以下方式实例化监听:

public void startListen() {
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),20);
        bootstrap = new ServerBootstrap(channelFactory);
        bootstrap.setPipelineFactory(pipeline);
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);
        lazyLogger.getLogger().info("Binding Tcp listener to 0.0.0.0 on port '{}'", listenPort);
        serverChannel = bootstrap.bind(new InetSocketAddress("0.0.0.0", listenPort));
}

有人知道有什么问题吗?还是我们应该每隔一个小时左右手动断开所有频道?

编辑:

我有更多关于这个问题的信息

当没有消息被处理时,也会发生在成功的远程连接上没有调用 channelConnected 的情况。我远程调试了这个问题,发现:

  • 在 NioServerSocketPipelineSink.java 行 #246 registerAcceptedChannel(acceptedSocket, currentThread); 发生
  • 软件执行一直到具有不同事件的 DefaultChannelPipeline 第 781 行,但我的 TcpListenerChannelHandler 永远不在上下文中。

最奇怪的是,有时 netty 会注意到某个通道已连接,有时却没有。

编辑2:

TcpListenerCahnnelHandler 是 SimpleChannelUpstreamHandler 的简单实现

它的亮点:

public class TcpListenerChannelHandler extends SimpleChannelUpstreamHandler {
...
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
    super.channelConnected(ctx, e);
    _logger.info("{} device connected from: {}", deviceProtocol.getName(),  ctx.getChannel().getRemoteAddress());
    deviceConnectionRegistry.channelConnected(ctx.getChannel());
}

@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
    super.channelDisconnected(ctx, e);
    _logger.info("{} device from endpoint '{}' disconnected.", deviceProtocol.getName(), ctx.getChannel().getRemoteAddress());
    deviceConnectionRegistry.channelDisconnected(ctx.getChannel());
}

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception  {
    super.messageReceived(ctx, messageEvent);

    ...
    NOTE: here we process the meassage, I do not think it can cause any problem
}


@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        if(_logger.isWarnEnabled())
            _logger.warn(deviceProtocol.getName()+ " device"
                    +e.getChannel().getRemoteAddress()+" channel", e.getCause());

        if (!(e.getCause() instanceof ConnectException))
            e.getChannel().close();
}

同时我已经升级到 3.3.1-final。如果问题再次发生,我知道在哪里继续调试。

编辑 3:

我已经升级到3.3.1 final,两天后同样的问题再次出现。

我不知道这是否相关,但我们在同一个物理接口上有更多的 IP 地址。我们应该尝试只听一个接口吗?更多 eth 接口是否存在任何已知问题?

但同样:tcpdump 识别跟踪器的消息,但 netty 没有在我的自定义处理程序中调用 messageReceived。

编辑4:

我进一步调试了代​​码。问题出现在 NioWorker.java 第 131 行 (boolean offer = registerTaskQueue.offer(registerTask);) 运行正常,但随后将永远不会处理该任务。这意味着第 748 行的 RegisterTask.run() 永远不会被调用。

4

1 回答 1

1

No idea, did you try to add a LoggingHandler to watch everything? I use to use a custom handler:

/**
 *
 * Adapted from the original LoggingHandler in Netty.
 */
public class LoggingHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler {

    String name;
    boolean hexDump;

    public LoggingHandler(String name, boolean hexDump) {
        this.name = name;
        this.hexDump = hexDump;
    }

    /**
     * Logs the specified event to the {@link InternalLogger} returned by
     * {@link #getLogger()}. If hex dump has been enabled for this handler,
     * the hex dump of the {@link ChannelBuffer} in a {@link MessageEvent} will
     * be logged together.
     */
    public void log(ChannelEvent e) {

        String msg = name + " >> " + e.toString();

        // Append hex dump if necessary.
        if (hexDump && e instanceof MessageEvent) {
            MessageEvent me = (MessageEvent) e;
            if (me.getMessage() instanceof ChannelBuffer) {
                ChannelBuffer buf = (ChannelBuffer) me.getMessage();
                msg = msg + " - (HEXDUMP: " + ChannelBuffers.hexDump(buf) + ')';
            }
        }

        // Log the message (and exception if available.)
        if (e instanceof ExceptionEvent) {
            Logger.debug(this, msg, ((ExceptionEvent) e).getCause());
        } else {
            Logger.debug(this, msg);
        }

    }



    public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
            throws Exception {
        log(e);
        ctx.sendUpstream(e);
    }

    public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
            throws Exception {
        log(e);
        ctx.sendDownstream(e);
    }

wich is inserted in both client and server side. On the server side, I use to add it to both child and parent:

ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool());
        ServerBootstrap bootstrap = new ServerBootstrap(factory);
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);

        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("LOGGER", new LoggingHandler("SERVER", true));
                pipeline.addLast("LAUNCHER", handler.new OnChannelConnectedPlugger());
                return pipeline;
            }
        });
        bootstrap.setParentHandler(new LoggingHandler("SERVER-PARENT", true));
于 2012-04-13T12:41:20.683 回答