0

我不知道该怎么做,我认为我的方法是错误的 - 有人可以给我一个提示吗?

我制作了一个与示例 HexDumpProxy非常相似的代理服务器。目的不是转储流量,而是通过代理操纵数据解析,这部分工作完美。让我们称之为代理部分。

在同一个程序中,我使用另一个 ServerBootstrap 启动了第二个线程在另一个端口上监听,这有它自己的事件循环等。当我在这个监听端口上收到一些东西时,我想将此数据发送到代理部分的一个通道,我想要动态地改变这个频道。当我将数据发送到代理通道之一时,我收到此错误:

2013 年 4 月 29 日晚上 10:05:10 BackendListenHandler exceptionCaught 警告:来自下游的意外异常。java.lang.IllegalStateException:从 eventLoop 外部调用的 nextOutboundByteBuffer()

@Sharable
public class BackendListenHandler extends ChannelInboundByteHandlerAdapter {

private Channel outboundChannel;

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.read();
    ctx.flush();
}

@Override
public void inboundBufferUpdated(final ChannelHandlerContext ctx, ByteBuf in) throws Exception {

    outboundChannel = Proxy.connectionTable.frontendListenChannel;

    if (!outboundChannel.isActive()) {
        System.out.println("channel id=" + outboundChannel.id() + " is NOT active...");
    } else if (outboundChannel.isActive()) {
        ByteBuf out = outboundChannel.outboundByteBuffer();
        out.writeBytes(in);
        outboundChannel.flush().addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    // was able to flush out data, start to read the next chunk
                    ctx.channel().read();
                } else {
                    future.channel().close();
                }
            }
        });
    }
}

public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    if (outboundChannel != null) {
        closeOnFlush(outboundChannel);
    }
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    logger.log(
        Level.WARNING,
        "Unexpected exception from downstream.", cause);
    ctx.close();
}

static void closeOnFlush(Channel ch) {
    if (ch.isActive()) {
        ch.flush().addListener(ChannelFutureListener.CLOSE);
    }
}
}

为了测试,我在这个静态变量中保留和更改我的代理频道:

Proxy.connectionTable.frontendListenChannel;
4

2 回答 2

0

在为“出站”连接创建引导程序时使用相同的 EventLoop,如 HexDump 示例中所示。这将确保一切都从相同的 IO 线程处理。见[1]。

[1] https://github.com/netty/netty/blob/master/example/src/main/java/io/netty/example/proxy/HexDumpProxyFrontendHandler.java#L46

于 2013-04-30T14:06:27.180 回答
0

而不是使用“outboundChannel.outboundByteBuffer()” - 我使用“Unpooled.copiedBuffer()”从另一个 EventLoop 将字节写入通道。

@Override
public void inboundBufferUpdated(final ChannelHandlerContext ctx, ByteBuf in) throws   Exception {
outboundChannel = null;
if (Proxy.connectionTable.channelId != 0) {
    outboundChannel = Proxy.allChannels.find(Proxy.connectionTable.channelId);
    if (outboundChannel.isActive()) {               
        System.out.println("NOTIFY channel id=" + outboundChannel.id() + " is active...");
        Rewrite rewrite = new Rewrite(byteBufConverter.byteBufToString(in), 2);
        in.clear();
        in = byteBufConverter.stringToByteBuf(rewrite.getFixedMessage());   
        outboundChannel.write(in);
        outboundChannel.flush().addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    ctx.channel().read();
                } else {
                    future.channel().close();
                }
            }
        });
    }
}
}

其中“byteBufConverter.stringToByteBuf(rewrite.getFixedMessage())”返回“Unpooled.copiedBuffer(strFixed.getBytes())”

于 2013-05-02T20:18:44.397 回答