0

我正在为我的应用程序使用protobuf-rpc-pro(两个系统集成)。protobuf-rpc-pro 基于 Netty,使用了 Netty 的这个依赖:
 <dependency>
     <groupId>io.netty</groupId>
     <artifactId>netty-all</artifactId>
     <version>4.0.0.CR1</version >
 </依赖>

我需要为两个系统之间的通信实现通道池以获得高性能。即在我的实现中,我希望获得多个连接,这些连接将并行发送消息(非阻塞通信)。这里的问题是如何检查发送缓冲区是否已满并切换到另一个连接(如负载平衡)。Netty 或某些外部实现中是否有任何机制?在网上没有找到任何东西...

4

1 回答 1

2

Channel#isWritable()方法了吗?据我了解,如果写缓冲区太满,那将返回 false。

编辑:添加了使用 Channel#isWritebale() 和 highWaterMark/lowWaterMark 的简单演示。

static InetSocketAddress ADDRESS = new InetSocketAddress("localhost", 4711);

static ChannelPipeline createPipeline() {

    ChannelPipeline pipeline = Channels.pipeline();
    pipeline.addLast("frameDecoder", new DelimiterBasedFrameDecoder(100,
            Delimiters.lineDelimiter()));
    pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));

    pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
    return pipeline;

}

public static void main(String[] args) throws Throwable {
    int suspendedWrites = 0;
    int N = 1000;
    final CountDownLatch ready = new CountDownLatch(N);

    ConnectionlessBootstrap server = new ConnectionlessBootstrap(
            new NioDatagramChannelFactory());
    server.setPipeline(createPipeline());
    server.getPipeline().addLast("printer",
            new SimpleChannelUpstreamHandler() {
                @Override
                public void messageReceived(ChannelHandlerContext ctx,
                        MessageEvent e) throws Exception {
                    System.out.println((String) e.getMessage());
                    ready.countDown();
                }
            });
    server.bind(ADDRESS);

    ClientBootstrap client = new ClientBootstrap(
            new NioDatagramChannelFactory());
    client.setPipeline(createPipeline());
    Channel clientChannel = client.connect(ADDRESS).sync().getChannel();

    NioDatagramChannelConfig config = (NioDatagramChannelConfig) clientChannel
            .getConfig();

    config.setWriteBufferLowWaterMark(500);
    config.setWriteBufferHighWaterMark(1000);

    for (int i = 0; i < N; ++i) {
        String message = "Hello number " + (i + 1) + " from client\n";
        if (clientChannel.isWritable())
            clientChannel.write(message);
        else {
            clientChannel.write(message).await();
            ++suspendedWrites;
        }
    }

    ready.await(1, TimeUnit.SECONDS);
    client.releaseExternalResources();
    client.shutdown();
    server.releaseExternalResources();
    server.shutdown();
    System.out.println("Suspended writes: " + suspendedWrites);
    System.out.println("Missed reads: " + ready.getCount());
}

运行这将显示 Channel#isWriteable() 取决于当前的写入缓冲区大小以及高水位线和低水位线的值。

更详细的实现可以监控兴趣操作的变化,并在此基础上使可写通道可用。

于 2013-06-26T12:30:39.583 回答