5

我再次将 RxJava 与 Java 9 Flow 进行比较。我看到 Flow 默认情况下是异步的,我想知道是否有办法让它同步运行。

有时我们只是想将它不是用于 Nio 而是用于糖语法,并拥有更同质的代码。

在 RxJava 中,默认情况下它是同步的,您可以使用observerOnsubscribeOn在您的管道中使其异步运行。

Flow中是否有任何运算符可以使其在主线程中运行?

问候。

4

3 回答 3

4

Publisher您可以按照文档中的说明定义自定义Flow以使用同步执行。

一个非常简单的发布者,仅向单个订阅者发布(在请求时)单个 TRUE 项目。因为订阅者只接收一个项目,所以这个类不使用缓冲和排序控制。

class OneShotPublisher implements Publisher<Boolean> {
   private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
   private boolean subscribed; // true after first subscribe
   public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
     if (subscribed)
       subscriber.onError(new IllegalStateException()); // only one allowed
     else {
       subscribed = true;
       subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
     }
   }
   static class OneShotSubscription implements Subscription {
     private final Subscriber<? super Boolean> subscriber;
     private final ExecutorService executor;
     private Future<?> future; // to allow cancellation
     private boolean completed;
     OneShotSubscription(Subscriber<? super Boolean> subscriber,
                         ExecutorService executor) {
       this.subscriber = subscriber;
       this.executor = executor;
     }
     public synchronized void request(long n) {
       if (n != 0 && !completed) {
         completed = true;
         if (n < 0) {
           IllegalArgumentException ex = new IllegalArgumentException();
           executor.execute(() -> subscriber.onError(ex));
         } else {
           future = executor.submit(() -> {
             subscriber.onNext(Boolean.TRUE);
             subscriber.onComplete();
           });
         }
       }
     }
     public synchronized void cancel() {
       completed = true;
       if (future != null) future.cancel(false);
     }
   }
 }
于 2017-09-30T19:19:27.630 回答
3

没有操作员可以这样做,但 API 允许您控制项目的发布方式。因此,您可以直接从当前线程调用订阅者方法。

class SynchronousPublisher implements Publisher<Data>  {
      public synchronized void subscribe(Subscriber<? super Data> subscriber) {
           subscriber.onSubscribe(new SynchronousSubscription(subscriber));
      }
 }
 static class SynchronousSubscription implements Subscription {
      private final Subscriber<? super Data> subscriber;

       SynchronousSubscription(Subscriber<? super Data> subscriber) {
          this.subscriber = subscriber;
       }
       public synchronized void request(long n) {
           ... // prepare item            
           subscriber.onNext(someItem);      
       }

       ...
   }
}
于 2017-09-30T19:30:01.320 回答
2

这取决于您在主线程上运行的意思。

如果您想强制任意 Flow 在特定线程上执行,则没有标准的方法来执行此操作,除非 Flow 在库中实现,让您覆盖提供异步的部分。在 RxJava 术语中,这些是实用程序类Scheduler提供的 s 。Schedulers

如果要在主线程上观察 Flow,则必须在其之上编写一个阻塞队列使用者,该使用者Flow.Subscriber会阻塞线程,直到队列有项目为止。这可能会变得复杂,因此我将向您推荐 Reactive4JavaFlowblockingSubscribe中的实现。

如果要将Java主线程用作Executor/ Scheduler,那就更复杂了,需要类似的阻塞机制以及线程池执行器的一些想法。Reactive4JavaFlow恰好有这样一个调度程序,您可以通过以下方式将其用作执行程序:new SubmissionPublisher<>(128, blockingScheduler::schedule)

于 2017-09-30T19:51:54.553 回答