2

channelBound我有一个关于绑定请求和接收事件的上游处理程序的同步问题。channelBound由于处理程序需要使用对象来处理回调,因此我需要在处理程序接收事件之前将对象附加到通道。下面的例子。

处理程序示例:

public class MyClientHandler extends SimpleChannelUpstreamHandler {

    @Override
    public void channelBound(ChannelHandlerContext ctx, ChannelStateEvent e) {

        /* Problem: This can occur while the channel attachment is still null. */
        MyStatefulObject obj = e.getChannel().getAttachment();

        /* Do important things with attachment. */ 
    }

}

主要示例:

ClientBootstrap bootstrap = ... //Assume this has been configured correctly.

ChannelFuture f = bootstrap.bind(new InetSocketAddress("192.168.0.15", 0));

/* It is possible the boundEvent has already been fired upstream
 *  by the IO thread when I get here. 
 */
f.getChannel().setAttachment(new MyStatefulObject());

可能的灵魂

我想出了几个解决方案来解决这个问题,但它们都有种“气味”,这就是为什么我在这里询问是否有人有干净的方法来做到这一点。

解决方案1:在回调中旋转或阻塞,channelBound直到附件不为空。我不喜欢这个解决方案,因为它占用了一个 I/O 工作者。

解决方案 2:MyClientHandler进入双向处理程序并ThreadLocalbindRequested下游回调中使用 a 获取附件。我不喜欢这样,因为它依赖于请求线程用于触发bindRequested事件的 Netty 实现细节。

我发现解决方案 1 比解决方案 2 更容易忍受。所以如果这是我需要做的,我会的。

有没有一种简单的方法来获取通道引用而无需先请求绑定或连接?

4

2 回答 2

2

让您的ChannelPipelineFactory实现接受构造函数参数并在那里指定附件。将一个处理程序放在所有其他处理程序的前面,并使第一个处理程序的channelOpen()方法设置附件,然后从管道中删除第一个处理程序,因为它不再需要了。

于 2012-06-30T09:13:59.057 回答
2

是的,在您将附件设置到频道之前,boundEvent 可能会获取处理程序。

如果附件对您打开的每个频道都非常具体,那么您可以在绑定未来注册一个频道未来监听器,并通过设置所有内容而不使用 BootStraps 在 operationComplete() 上设置附件。以下是 EchoClient 示例的修改版本,它工作正常。

       // Configure the client.
    final NioClientSocketChannelFactory clientSocketChannelFactory = new NioClientSocketChannelFactory(
            Executors.newCachedThreadPool());


    // Set up the pipeline factory.
    final ChannelPipelineFactory channelPipelineFactory = new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() throws Exception {
            return Channels.pipeline(
                    new MyClientHandler());
        }
    };

    ChannelPipeline pipeline = channelPipelineFactory.getPipeline();
    final Channel channel = clientSocketChannelFactory.newChannel(pipeline);

    channel.getConfig().setPipelineFactory(channelPipelineFactory);
    channel.getConfig().setOption("tcpNoDelay", true);
    channel.getConfig().setOption("receiveBufferSize", 1048576);
    channel.getConfig().setOption("sendBufferSize", 1048576);

    ChannelFuture boundFuture = Channels.future(channel);

    boundFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                future.getChannel().setAttachment(new Object());// set the channel attachment
            }
        }
    });


    channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(channel, boundFuture, ChannelState.BOUND, new InetSocketAddress(host, 0)));

    ChannelFuture connectFuture = Channels.future(channel); 
    channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(channel, connectFuture, ChannelState.CONNECTED, new InetSocketAddress(host, port)));

    channel.getCloseFuture().awaitUninterruptibly();

    clientSocketChannelFactory.releaseExternalResources();// do not forget to do this
于 2012-06-30T10:00:48.650 回答