0

我正在学习 akka 流,但显然它与任何流框架相关 :)

引用akka文档:

Reactive Streams 只是定义了一种通用机制,用于在不丢失、缓冲或资源耗尽的情况下跨异步边界移动数据

现在,据我了解,如果直到流之前,让我们以 http 服务器为例,请求将会到来,并且当接收者完成一个请求时,因此即将到来的新请求将被收集在一个缓冲区中将保留等待的请求,然后存在这个缓冲区大小未知的问题,如果服务器过载,我们可能会丢失正在等待的请求。

因此,流处理开始发挥作用,他们将此缓冲区限制为可控......所以我们可以预定义我们想要排队的消息(在我的示例中为请求)的数量,并且我们可以一次处理每个消息。

我的问题,如果我们在我们的服务器中实现一个源最多可以有 3 条消息,那么如果第 4 个 id 来了会发生什么?

我的意思是当另一台服务器打电话给我们并且我们已经处理了 3 个请求时……他的请求会发生什么?

4

1 回答 1

2

您所描述的实际上并不是 Reactive Streams 实现解决的主要问题。

使用常规网络工具可以解决请求数量方面的背压。例如,在 Java 中,您可以将网络库(例如 Netty)的线程池配置为某种并行级别,并且该库将负责接受尽可能多的请求。或者,如果您使用同步套接字 API,则更简单 - 您可以推迟accept()对服务器套接字的调用,直到所有当前连接的客户端都得到服务。在任何一种情况下,任何一方都没有“缓冲区”,只有在服务器接受连接之前,客户端才会被阻塞(在阻塞 API 的系统调用中,或者在异步 API 的事件循环中)。

Reactive Streams 实现解决的是如何在更高级别的数据管道中处理背压。反应式流实现(例如akka-streams)提供了一种构建数据管道的方法,当数据的消费者速度很慢时,生产者也会自动减速,这将适用于任何类型的底层传输,无论是 HTTP、WebSockets、原始 TCP 连接还是进程内消息传递。

例如,考虑一个简单的 WebSocket 连接,客户端发送一个连续的信息流(例如来自某个传感器的数据),而服务器将该数据写入某个数据库。现在假设服务器端的数据库由于某种原因(网络问题、磁盘过载等)变慢了。服务器现在跟不上客户端发送的数据,也就是无法在新的数据到来之前及时保存到数据库中。如果您在整个管道中使用反应式流实现,服务器将自动向客户端发出信号,表明它无法处理更多数据,并且客户端将自动调整其生产速率,以免服务器过载。

当然,这可以在没有任何 Reactive Streams 实现的情况下完成,例如通过手动控制确认。但是,与许多其他库一样,Reactive Streams 实现为您解决了这个问题。它们还提供了一种定义此类管道的简单方法,并且通常它们具有用于各种外部系统(如数据库)的接口。特别是,此类库可以在最低级别实现背压,直至 TCP 连接,这可能很难手动完成。

至于 Reactive Streams 本身,它只是对可以由库实现的 API 的描述,它定义了常用术语和行为,并允许此类库可互换或轻松交互,例如,您可以将 akka-streams 管道连接到使用规范中的接口的 Monix 管道,组合管道将无缝工作并支持 Reacive Streams 的所有背压特性。

于 2017-06-25T23:43:20.013 回答