我正在使用 Spring 对 RSocket 的支持,特别是请求流模型。IE:
@MessageMapping("stream")
Flux<SubscriptionMessage> stream(final SubscriptionMessage request, @AuthenticationPrincipal UserDetails user) {
//....
}
如果我正确理解 RSocket,Flux 响应将作为 request(n) 约定下的一系列有效负载消息传递回客户端。例如,一次有 n 条有效载荷消息。在每一系列消息之后,客户端使用 REQUEST_N 消息向服务器发送一个附加集,这为背压缓解提供了基础。
在java库的API(org.springframework.messaging.rsocket,它基于io.rsocket.RSocket)中,有没有办法在REQUEST_N消息到达时处理/访问它们,或者显式设置N的值通过策略(并查看请求者传递给服务器的值)?
原因/原因:我正在 Kafka 上实现一个 rsocket 外观,并尝试为订阅提供一个请求流机制,该机制将在消息被消费时支持自动偏移提交。对于请求流约定,我认为请求者偶尔的 REQUEST_N 交互是可以推进主题提交偏移量的理想点,因为它由请求者传输意味着响应者发送的前面的 Payload 消息已经已收到。
我见过的唯一其他选择是使用请求通道模型,以便请求者可以发送和初始订阅请求,并开始接收数据,还可以发送周期消息来专门控制同一通道上的提交偏移量。无论如何,我正在考虑提供它,但想知道是否有办法将逻辑注入到流的周期性请求(n)周期中。