I have a sample protobuf project in Java that sends protobuf request messages to a server and receives the responses through Netty channel handlers, roughly like this:
public NettyClient(String host, int port) {
    this.host = host;
    this.port = port;
    this.msgFactory = new OA2ProtoMessageFactory();
    this.protoChannelMessageDecoder = new ProtoMessageToChannelMessageDecoder(msgFactory);
    this.protoChannelMessageEncoder = new ChannelMessageToProtoMessageEncoder(msgFactory);
    this.protoMessageReceiverHandler = new ProtoMessageReceiverHandler(msgFactory);
    connect();
}

public void connect() {
    workerGroup = new NioEventLoopGroup();
    try {
        Bootstrap b = new Bootstrap();
        b.group(workerGroup);
        b.channel(NioSocketChannel.class);
        b.option(ChannelOption.SO_KEEPALIVE, true);
        b.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                initPipelineForChannel(ch);
            }
        });
        // Start the client.
        channelFuture = b.connect(host, port).sync();
    } catch (Exception ex) {
        closeConnection();
    }
}

private void initPipelineForChannel(Channel ch) throws SSLException {
    ChannelPipeline pipeline = ch.pipeline();
    SslEngineFactory sslEngineFactory = new ClientSslEngineFactory();
    pipeline.addLast("ssl", sslEngineFactory.newHandler(ch));
    pipeline.addLast("idleState", new IdleStateHandler(INACTIVITY_READ_MILLIS,
            PING_INTERVAL_MILLIS, 0, TimeUnit.MILLISECONDS));
    pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, LENGTH_FIELD_LENGTH, 0,
            LENGTH_FIELD_LENGTH));
    pipeline.addLast("protobufDecoder", protobufDecoder);
    pipeline.addLast("protoChannelMessageDecoder", protoChannelMessageDecoder);
    pipeline.addLast("lengthFieldPrepender", lengthFieldPrepender);
    pipeline.addLast("protobufEncoder", protobufEncoder);
    pipeline.addLast("protoChannelMessageEncoder", protoChannelMessageEncoder);
    pipeline.addLast("heartbeatOnIdle", HeartbeatOnIdleHandler.DEFAULT);
    pipeline.addLast(ProtoMessageReceiverHandler.NAME, protoMessageReceiverHandler);
    pipeline.addLast("closeOnException", CloseOnExceptionHandler.DEFAULT);
}
So, if I understand correctly, this creates a Netty Bootstrap and installs a handler pipeline on each channel as it is created.
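As a point of reference, the wire framing that the "frameDecoder" and "lengthFieldPrepender" entries appear to implement can be sketched without Netty. This is only an illustration under assumptions: the class name LengthFraming is hypothetical, the value of LENGTH_FIELD_LENGTH is not shown in the code above and is assumed to be 4 bytes here, and the length field is assumed to be big-endian and to count only the payload (Netty's defaults). The decoder mirrors the arguments passed to LengthFieldBasedFrameDecoder: length field at offset 0, no length adjustment, and the length field stripped from each emitted frame.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original project.
final class LengthFraming {
    // Assumption: the real LENGTH_FIELD_LENGTH is not shown above; 4 bytes is used here.
    static final int LENGTH_FIELD_LENGTH = 4;

    // Outbound direction: prepend a big-endian length field to the payload.
    public static byte[] encode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(LENGTH_FIELD_LENGTH + payload.length);
        buf.putInt(payload.length); // ByteBuffer is big-endian by default
        buf.put(payload);
        return buf.array();
    }

    // Inbound direction: cut the byte stream into complete frames and strip
    // the length field. An incomplete trailing frame is left unread in the buffer.
    public static List<byte[]> decode(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= LENGTH_FIELD_LENGTH) {
            in.mark();
            int length = in.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (in.remaining() < length) {
                in.reset(); // wait for more bytes before consuming this frame
                break;
            }
            byte[] frame = new byte[length];
            in.get(frame);
            frames.add(frame);
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] first = encode("ping".getBytes(StandardCharsets.UTF_8));
        byte[] second = encode("pong".getBytes(StandardCharsets.UTF_8));

        // Simulate two frames arriving back to back on the same connection.
        ByteBuffer stream = ByteBuffer.allocate(first.length + second.length);
        stream.put(first).put(second);
        stream.flip();

        for (byte[] frame : decode(stream)) {
            System.out.println(new String(frame, StandardCharsets.UTF_8));
        }
    }
}
```

In the Netty client this framing is something the pipeline has to provide, since the connection is a raw TCP stream; the protobuf handlers then only ever see whole messages.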
How can I set up an equivalent pipeline (decoders and so on) in Akka gRPC?