0

我们正在使用 Spring Boot 版本1.2.8.RELEASE,它具有org.projectoreactor.*与 Reactor 版本的托管依赖项1.1.6.RELEASE

我面临的问题是在我的自定义编解码器 ( reactor.io.encoding.Codec) 中,给定的缓冲区 ( reactor.io.Buffer) 的上限为 1024 字节,但我的消息超出了该限制。当我尝试解码消息时,它不是完整的消息(只是部分消息)并且我的解码失败,因为它期望完整的消息。

问题一:如何增加Buffer bytes( reactor.io.Buffer) 使我的 apply 函数正常工作?下面的简单示例:

public class StringDecoder implements Function<Buffer, String> {

    // Buffer is limited to 1024 but the message the client sent
    // was 2k
    @Override
    public String apply(Buffer bytes) {
        return bytes.toString();
    }
}

问题二:如何将apply函数(上图)分块?这意味着当 Netty 的缓冲区达到其限制时,我的 apply 函数可以创建自己的缓冲区(并管理缓冲区),最终我可以解码消息并且 Reactory/Netty 可以将其传递给消费者。

注意:在我的“主要”方法中,以下内容用于设置环境。Netty 服务器在 Windows 下运行,客户端在 linux 上。这与 TCP 的 Windows 实现有关吗?

// NOTES: ServerSocketOptions sets the max buffer for send and receive to 
// something much larger than 1024. Verified with debugger
TcpServerSpec<String, String> spec = new TcpServerSpec<String, String>(NettyTcpServer.class);
spec.env(env);
spec.listen(port);
spec.dispatcher("sync");
spec.codec(new AgentCodec());
spec.consume(connectionHandler(handler));

TcpServer<String, String> tcp = spec.get();
tcp.start().await();
4

1 回答 1

0

好的,在调试代码之后,我发现了问题所在——顺便说一句,我在 Reactor 项目 (tisk tisk) 上找到的任何文档中都找不到这个问题。

应用函数将继续被更多消息调用。所以如果消息是 2K,apply 会被调用两次。第一次用1K,第二次用2K。

注意:如果您不推进缓冲区位置,它将附加到缓冲区。如果您确实推进了缓冲区位置,那么当再次调用 apply 时它将被删除。

于 2016-01-07T17:58:10.597 回答