1

在使用 Reactor Netty 之前,我搭建 Netty TCP 服务器的方式是创建服务器引导程序(ServerBootstrap)并添加自定义的管道(pipeline)类。Reactor Netty 提供了 TcpServer.create(),但看起来我必须提供一个函数式接口,它接收 NettyInbound 和 NettyOutbound 并返回一个 Mono。然而,如果我想添加一个用于构建管道的 ChannelInitializer,就必须先阻塞(block)才能拿到 NettyContext。传入的消息会被该函数式接口接收,我也能发送响应,但没有任何数据经过我的管道。

有没有办法让我们使用 Reactor Netty 并让消息通过定制的管道流动?

在连接建立并收到消息时,返回 out.sendString(Mono.just("Hi")).neverComplete() 可以成功地向客户端发送“Hi”,但我需要把收到的消息交给管道处理,然后把处理结果回传给客户端。

/**
 * Starts a Reactor Netty TCP server on {@code tcpSocketAddress} and blocks the
 * calling thread until the server channel's close future completes.
 *
 * <p>The connection handler subscribes to the first inbound buffer, logs it as
 * UTF-8 text and counts down {@code latch}; it answers every connection with
 * the string {@code "Hi"} and never completes on its own. {@code clientEndPoint}
 * is attached through {@code addHandler} only after the handler {@code Mono}
 * has been resolved with {@code block()}.
 *
 * <p>The single-threaded event loop group is always shut down on exit.
 *
 * @throws InterruptedException if the thread is interrupted while waiting for
 *         the channel to close or for the event loop group to shut down
 */
public void startServer() throws InterruptedException {
    final EventLoopGroup workerGroup = new NioEventLoopGroup(1);
    try {
        final TcpServer tcpServer = TcpServer.create(options ->
            options.eventLoopGroup(workerGroup).listen(tcpSocketAddress));

        tcpServer
            .newHandler((inbound, outbound) -> {
                // Only the first received buffer is consumed and logged.
                inbound.receive()
                    .take(1)
                    .log(ApolloApplicationTests.class.getName())
                    .subscribe(payload -> {
                        log.info("Server Received: {}", payload.toString(CharsetUtil.UTF_8));
                        latch.countDown();
                    });
                return outbound.sendString(Mono.just("Hi")).neverComplete();
            })
            .block()
            .addHandler(clientEndPoint)
            .channel()
            .closeFuture()
            .sync();
    } finally {
        workerGroup.shutdownGracefully().sync();
    }
}



import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.MessageToMessageDecoder;
import reactor.util.Logger;
import reactor.util.Loggers;

/**
 * Channel initializer that assembles the application's Netty pipeline from
 * Spring-injected handlers.
 *
 * <p>The same pipeline layout is exposed through two entry points:
 * {@link #initChannel(Channel)} (the {@link ChannelInitializer} callback) and
 * {@link #accept(Channel)} (a plain method with a consumer-style signature).
 * Both delegate to one private helper so the layout cannot drift apart.
 *
 * <p>NOTE(review): the injected handler instances are added to the pipeline of
 * every channel passed in. Netty only allows that for handlers marked
 * {@code @Sharable}; confirm the injected beans are sharable or
 * prototype-scoped before serving more than one connection.
 */
@Configurable
@Component
public class ClientEndPoint extends ChannelInitializer<Channel> {

    final Logger log = Loggers.getLogger(ClientEndPoint.class);

    @Autowired
    private ChannelHandlerAdapter messageInterchange;

    @Autowired
    private LengthFieldBasedFrameDecoder lowOrderVliDecoder;

    @Autowired
    private MessageToMessageDecoder<ByteBuf> messageDecoder;

    @Autowired
    private LengthFieldPrepender vliEncoder;

    @Autowired
    @Qualifier("inBound")
    List<ChannelHandler> inBoundHandlers;

    @Autowired
    @Qualifier("outBound")
    List<ChannelHandler> outBoundHandlers;

    /**
     * {@link ChannelInitializer} callback: installs the handlers on the given channel.
     *
     * @param sc the channel whose pipeline is populated
     * @throws Exception declared by the overridden method; not thrown explicitly here
     */
    @Override
    protected void initChannel(Channel sc) throws Exception {
        configurePipeline(sc);
    }

    /**
     * Installs the same handlers as {@link #initChannel(Channel)}.
     *
     * @param sc the channel whose pipeline is populated
     */
    public void accept(Channel sc) {
        configurePipeline(sc);
    }

    /**
     * Appends the handlers in a fixed order: frame decoder, message decoder,
     * length prepender, the "inBound" handlers, the message interchange, and
     * finally the "outBound" handlers.
     *
     * <p>The pipeline is kept in a local variable because this object is
     * reused for every channel; a shared field would be overwritten by each call.
     */
    private void configurePipeline(Channel channel) {
        final ChannelPipeline pipeline = channel.pipeline();

        pipeline.addLast("lowOrderVliDecoder", this.lowOrderVliDecoder);
        pipeline.addLast("messageDecoder", this.messageDecoder);
        pipeline.addLast("vliEncoder", this.vliEncoder);

        for (ChannelHandler handler : this.inBoundHandlers) {
            pipeline.addLast(handler);
        }

        pipeline.addLast("messageInterchange", this.messageInterchange);

        for (ChannelHandler handler : this.outBoundHandlers) {
            pipeline.addLast(handler);
        }
    }
}
4

1 回答 1

3

所以我想通了

/**
 * Builds the TCP server and returns the (not yet subscribed) {@code Mono}
 * produced by {@code newHandler}.
 *
 * <p>{@code pipeline} is registered through {@code afterChannelInit}. The
 * connection handler only logs each inbound message decoded as a string and
 * returns a publisher that never completes.
 *
 * @return the {@code Mono} returned by {@code TcpServer.newHandler}
 * @throws InterruptedException declared in the signature; not thrown explicitly here
 */
public Mono<? extends NettyContext> initializeServer() throws InterruptedException {
    this.log.debug("Server Initializing");

    final TcpServer tcpServer = TcpServer.create(
            options -> options.afterChannelInit(pipeline).listen(tcpSocketAddress));

    final BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> loggingHandler =
            (in, out) -> {
                in.receive()
                        .asString()
                        .subscribe(message -> this.log.debug("Received " + message + " on " + in));
                return Flux.never();
            };

    return tcpServer.newHandler(loggingHandler);
}

其中 pipeline 是一个实现了 Consumer 接口的类的实例,它在 accept 方法中按照典型 Netty 管道的方式组装各个处理器。

然后我启动服务器

/**
 * Resolves the server context (waiting at most five seconds) and then blocks
 * the calling thread until the server channel's close future completes,
 * logging the outcome.
 *
 * <p>The previous version awaited a {@code CountDownLatch} that was only
 * counted down on the success branch, so a cancelled or failed future left
 * the caller blocked forever. The latch added nothing once
 * {@code awaitUninterruptibly()} had returned, so it has been removed.
 *
 * @param connected the {@code Mono} yielding the server's {@code NettyContext}
 * @throws CancellationException if the calling thread's interrupt flag is set
 *         once the wait has finished; the flag is left set for the caller
 */
private void startServer(Mono<? extends NettyContext> connected) {
    final ChannelFuture f = connected.block(Duration.ofSeconds(5)).channel()
            .closeFuture();
    f.addListener((ChannelFutureListener) cf -> this.log.debug("Channel Disconnected"));

    f.awaitUninterruptibly();
    // Now we are sure the future is completed.
    assert f.isDone();

    if (f.isCancelled()) {
        this.log.warn("Connection Cancelled");
    } else if (!f.isSuccess()) {
        if (f.cause() != null) {
            // Route the failure through the logger instead of printing to stderr.
            this.log.warn("Connection not successful", f.cause());
        } else {
            this.log.warn("Connection not successful");
        }
    } else {
        this.log.info("Server Start Successful");
    }

    // Callers still get a CancellationException on interruption, but the
    // interrupt status is no longer cleared on the way out.
    // NOTE(review): relies on awaitUninterruptibly() restoring the interrupt
    // flag when it was interrupted while waiting - confirm for the Netty version in use.
    if (Thread.currentThread().isInterrupted()) {
        throw new CancellationException("Interrupted while waiting for streaming " + "connection to arrive.");
    }
}
于 2017-05-30T08:16:06.503 回答