4

我正在使用 Spring 对 RSocket 的支持,特别是请求流模型。IE:

@MessageMapping("stream")
Flux<SubscriptionMessage> stream(final SubscriptionMessage request, @AuthenticationPrincipal UserDetails user) {
    //....
}

如果我正确理解 RSocket,Flux 响应将作为 request(n) 约定下的一系列有效负载消息传递回客户端。例如,一次有 n 条有效载荷消息。在每一系列消息之后,客户端使用 REQUEST_N 消息向服务器发送一个附加集,这为背压缓解提供了基础。

在java库的API(org.springframework.messaging.rsocket,它基于io.rsocket.RSocket)中,有没有办法在REQUEST_N消息到达时处理/访问它们,或者显式设置N的值通过策略(并查看请求者传递给服务器的值)?

原因/原因:我正在 Kafka 上实现一个 rsocket 外观,并尝试为订阅提供一个请求流机制,该机制将在消息被消费时支持自动偏移提交。对于请求流约定,我认为请求者偶尔的 REQUEST_N 交互是可以推进主题提交偏移量的理想点,因为它由请求者传输意味着响应者发送的前面的 Payload 消息已经已收到。

我见过的唯一其他选择是使用请求通道模型,以便请求者可以发送和初始订阅请求,并开始接收数据,还可以发送周期消息来专门控制同一通道上的提交偏移量。无论如何,我正在考虑提供它,但想知道是否有办法将逻辑注入到流的周期性请求(n)周期中。

4

1 回答 1

0

反应器的默认值曾经是无限的或取消。您可以使用诸如 take 或 limitRate 之类的内置运算符来自定义请求 n 行为。

https://github.com/making/rsc/blob/50d0c3dc43c60d3b3fe42e5586df49adb016a9cb/src/main/java/am/ik/rsocket/InteractionModel.java#L41-L52

或者实现你自己的操作符,或者使用自定义订阅/订阅逻辑来精确控制。这涉及更多,因此请先尝试一下并展示您正在尝试做的事情的示例。

请参阅https://projectreactor.io/docs/core/release/reference/#_on_backpressure_and_ways_to_reshape_requests等文档

于 2020-11-29T13:42:52.733 回答