它们之间有什么相同点和不同点,看起来 Java Parallel Stream 具有 RXJava 中可用的一些元素,对吗?
3 回答
Rx is an API for creating and processing observable sequences. The Streams API is for processing iterable sequences. Rx sequences are push-based; you are notified when an element is available. A Stream is pull-based; it "asks" for items to process. They may appear similar because they both support similar operators/transforms, but the mechanics are essentially opposites of each other.
流是基于拉的。我个人觉得这是 Oracle 对 C# IEnumerable<>、LINQ 及其相关扩展方法的回答。
RxJava 是基于推送的,我不确定是 .NET 的响应式扩展首先发布还是 Rx 项目首先上线。
从概念上讲,它们是完全不同的,它们的应用也不同。
如果您在一个文本文件上实现一个文本搜索程序,该文件太大以至于您无法加载所有内容并适合内存,您可能希望使用 Stream,因为您可以通过跟踪您轻松确定是否有下一行可用迭代器,并逐行扫描。
Stream 的另一个应用是对数据集合的并行计算。现在每台机器都有多个内核,但您不会轻易知道您的客户端机器有多少内核可用。很难预先配置要操作的线程数。所以我们使用并行流并让JVM为我们确定(应该更优化)。
另一方面,如果您正在实现一个接受用户输入字符串并在网络上搜索可用视频的程序,您将使用 RX,因为您甚至不知道该程序何时开始获得任何结果(或收到错误网络超时)。为了使您的程序响应,您必须让程序“订阅”网络更新和完整信号。
Rx 的另一个常见应用是在 GUI 上“检测用户完成的输入”,而无需用户单击按钮进行确认。例如,您希望在用户停止输入时有一个文本字段,您无需等待“搜索按钮”单击即可开始搜索。在这种情况下,您使用 Rx 在“KeyEvent”和“throttle”(例如 500 毫秒)上创建一个 observable,这样每当他停止输入 500 毫秒时,您就会收到一个 onNext() 来“开始搜索”。
线程也有区别。
Stream#parallel 将序列拆分为多个部分,每个部分在单独的线程中处理。
Observable#subscribeOn 和 Observable#observeOn 都是“移动”执行到另一个线程,但不要拆分序列。
换句话说,对于任何特定的处理阶段:
- 并行流可以在不同的线程上处理不同的元素
- Observable 将使用一个线程进行舞台
例如。我们有许多元素的 Observable/Stream 和两个处理阶段:
Observable.create(...)
.observeOn(Schedulers.io())
.map(x -> stage1(x))
.observeOn(Schedulers.io())
.map(y -> stage2(y))
.forEach(...);
Stream.generate(...)
.parallel()
.map(x -> stage1(x))
.map(y -> stage2(y))
.forEach(...);
Observable 将使用不超过 2 个额外线程(每个阶段一个),因此不同线程不会访问两个 x'es 或 y's。在国内,Stream 可能跨越多个线程的每个阶段。