
In RxJava there are 5 different schedulers to choose from:

  1. immediate(): Creates and returns a Scheduler that executes work immediately on the current thread.

  2. trampoline(): Creates and returns a Scheduler that queues work on the current thread to be executed after the current work completes.

  3. newThread(): Creates and returns a Scheduler that creates a new Thread for each unit of work.

  4. computation(): Creates and returns a Scheduler intended for computational work. This can be used for event-loops, processing callbacks and other computational work. Do not perform IO-bound work on this scheduler. Use Schedulers.io() instead.

  5. io(): Creates and returns a Scheduler intended for IO-bound work. The implementation is backed by an Executor thread-pool that will grow as needed. This can be used for asynchronously performing blocking IO. Do not perform computational work on this scheduler. Use Schedulers.computation() instead.

Questions:

The first 3 schedulers are pretty self-explanatory; however, I'm a little confused about computation and io.

  1. What exactly is "IO-bound work"? Is it used for dealing with streams (java.io) and files (java.nio.file)? Is it used for database queries? Is it used for downloading files or accessing REST APIs?
  2. How is computation() different from newThread()? Is it that all computation() calls are on a single (background) thread instead of a new (background) thread each time?
  3. Why is it bad to call computation() when doing IO work?
  4. Why is it bad to call io() when doing computational work?

3 Answers


Great questions. I think the documentation could do with some more detail.

  1. io() is backed by an unbounded thread pool and is meant for tasks that are not computationally intensive, that is, work that puts little load on the CPU and spends most of its time waiting. So yes: interaction with the file system, with databases, or with services on a different host are all good examples.
  2. computation() is backed by a bounded thread pool with size equal to the number of available processors. If you tried to schedule CPU-intensive work in parallel across more threads than there are processors (say, using newThread()), you would pay thread-creation overhead and context-switching overhead as threads vie for a processor, which is potentially a big performance hit.
  3. It's best to leave computation() for CPU-intensive work only; otherwise you won't get good CPU utilization. A computation() thread that is blocked waiting on IO does no work, yet it still occupies one of the few slots in the bounded pool, so CPU-bound tasks queue up behind it.
  4. It's bad to call io() for computational work for the reason discussed in 2. io() is unbounded, so if you schedule a thousand computational tasks on io() in parallel, each of those thousand tasks will have its own thread and compete for the CPU, incurring context-switching costs.
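
To make the bounded versus unbounded difference concrete, here is a minimal sketch using plain java.util.concurrent pools as an analogy. It is not RxJava's actual implementation: the fixed pool merely stands in for computation() and the cached pool for io(), and the class and method names are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {

    // Submits `tasks` jobs that all hold on to their thread, then reports the
    // largest number of threads the pool ever had alive at the same time.
    static int largestPoolSize(ThreadPoolExecutor pool, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // stand-in for a task that keeps its thread busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int largest = pool.getLargestPoolSize();
        release.countDown(); // let every task finish
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return largest;
    }

    // Analogy for computation(): never more than `size` threads, extra tasks wait in a queue.
    static int boundedLargest(int size, int tasks) {
        return largestPoolSize((ThreadPoolExecutor) Executors.newFixedThreadPool(size), tasks);
    }

    // Analogy for io(): one more thread whenever no idle thread is available.
    static int unboundedLargest(int tasks) {
        return largestPoolSize((ThreadPoolExecutor) Executors.newCachedThreadPool(), tasks);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("bounded:   " + boundedLargest(cores, 50));
        System.out.println("unbounded: " + unboundedLargest(50));
    }
}
```

With 50 tasks that all hold their thread, the bounded pool stays at its configured size while the unbounded pool grows to 50 threads. That growth is harmless when the threads are mostly waiting on IO and wasteful when they are all fighting for the CPU.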
Answered 2015-07-08T02:58:42.537

The most important point is that both Schedulers.io and Schedulers.computation are backed by thread pools, as opposed to the others mentioned in the question: io by an unbounded pool, computation by a pool bounded by the number of available processors. Schedulers.from(Executor) shares this characteristic when the Executor is itself a pool; created with newCachedThreadPool, it is unbounded with automatic reclamation of idle threads, much like io.

As abundantly explained in previous responses and multiple articles on the web, Schedulers.io and Schedulers.computation should be used carefully, as each is optimized for the type of work in its name. But, in my view, their most important role is to provide real concurrency to reactive streams.

Contrary to what newcomers often believe, reactive streams are not inherently concurrent but inherently asynchronous and sequential. For this very reason, Schedulers.io should be used only when the I/O operation is blocking (e.g. a blocking call such as Apache Commons IO's FileUtils.readFileToString(...)) and would therefore freeze the calling thread until the operation is done.
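
As a rough illustration of why a blocking call is moved off the calling thread, here is a sketch in plain Java rather than RxJava. The cached pool stands in for Schedulers.io, Files.readAllBytes stands in for any blocking read, and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BlockingReadOffload {

    // Runs the blocking read on a thread from `ioPool`, so the caller gets a
    // future back immediately instead of waiting for the disk.
    static CompletableFuture<String> readAsync(Path file, Executor ioPool) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, ioPool);
    }

    // Writes a temp file, reads it back through the pool, and returns the content.
    static String demo() {
        ExecutorService ioPool = Executors.newCachedThreadPool();
        try {
            Path tmp = Files.createTempFile("blocking-read-demo", ".txt");
            try {
                Files.write(tmp, "hello".getBytes(StandardCharsets.UTF_8));
                CompletableFuture<String> pending = readAsync(tmp, ioPool);
                // The calling thread is free to do other work here.
                return pending.join(); // wait only at the point the value is needed
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```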

Using an asynchronous API such as Java's AsynchronousFileChannel wouldn't block the calling thread during the operation, so there is no point in using a separate thread. In fact, Schedulers.io threads are not really a good fit for asynchronous operations as they don't run an event loop and the callback would never... be called.
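
For comparison, a minimal sketch of a read through AsynchronousFileChannel, which hands back a Future at once without the caller supplying a worker thread. The class and method names are made up, and the single read call assumes a small file that fits in one buffer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class AsyncReadDemo {

    // Starts an asynchronous read and decodes the result as UTF-8.
    static String read(Path file) {
        try (AsynchronousFileChannel channel =
                 AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate((int) channel.size());
            Future<Integer> pending = channel.read(buffer, 0); // returns immediately
            // The calling thread could do unrelated work here.
            pending.get(); // this demo waits right away only to keep the sketch short
            buffer.flip();
            return StandardCharsets.UTF_8.decode(buffer).toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // Writes a temp file and reads it back asynchronously.
    static String demo() {
        try {
            Path tmp = Files.createTempFile("async-read-demo", ".txt");
            try {
                Files.write(tmp, "hello async".getBytes(StandardCharsets.UTF_8));
                return read(tmp);
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```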

The same logic applies for database access or remote API calls. Don't use the Schedulers.io if you can use an asynchronous or reactive API to make the call.

Back to concurrency. You may not have access to an async or reactive API to do I/O operations asynchronously or concurrently, so your only alternative is to dispatch the calls on separate threads. Alas, reactive streams are sequential at their ends, but the good news is that the flatMap() operator can introduce concurrency at their core.

Concurrency must be built into the stream construct, typically using the flatMap() operator. This powerful operator can be configured to internally provide a multi-threaded context to your flatMap() embedded Function<T, R>. That context is provided by a multi-threaded Scheduler such as Schedulers.io or Schedulers.computation.
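
The fan-out-and-merge idea behind this can be sketched without RxJava, using CompletableFuture on a pool. This is only an analogy for what flatMap() does when each inner stream is moved onto a pooled scheduler with subscribeOn(...); fetch(...) is a hypothetical blocking call and the class name is made up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class FanOutDemo {

    // Hypothetical blocking call, e.g. a remote lookup that takes about 100 ms.
    static String fetch(int id) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "item-" + id;
    }

    // Starts one fetch per id on the pool, then gathers the results.
    static List<String> fetchAll(List<Integer> ids) {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            // Fan out: every call is in flight before any result is awaited.
            List<CompletableFuture<String>> inFlight = ids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> fetch(id), pool))
                .collect(Collectors.toList());
            // Merge: collect the results once they are available.
            return inFlight.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(Arrays.asList(1, 2, 3)));
    }
}
```

One difference worth keeping in mind: this sketch returns results in source order, whereas flatMap() emits inner results as they arrive and does not guarantee the source order.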

You can find more details in articles on RxJava2 Schedulers and Concurrency, which include code samples and detailed explanations of how to use Schedulers sequentially and concurrently.

Hope this helps,

Softjake

Answered 2018-07-19T12:46:16.803

This blog post provides an excellent answer.

From the blog post:

Schedulers.io() is backed by an unbounded thread pool. It is used for non CPU-intensive I/O type work including interaction with the file system, performing network calls, database interactions, etc. This thread pool is intended to be used for asynchronously performing blocking IO.

Schedulers.computation() is backed by a bounded thread pool with size up to the number of available processors. It is used for computational or CPU-intensive work such as resizing images, processing large data sets, etc. Be careful: when you allocate more computation threads than available cores, performance will degrade due to context switching and thread creation overhead as threads vie for processors’ time.

Schedulers.newThread() creates a new thread for each unit of work scheduled. This scheduler is expensive, as a new thread is spawned every time and no reuse happens.

Schedulers.from(Executor executor) creates and returns a custom scheduler backed by the specified executor. To limit the number of simultaneous threads in the thread pool, use Schedulers.from(Executors.newFixedThreadPool(n)). This guarantees that if a task is scheduled when all threads are occupied, it will be queued. The threads in the pool will exist until it is explicitly shut down.
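
The queueing behaviour of the fixed pool itself can be checked with a small sketch. It exercises only Executors.newFixedThreadPool(n), not the RxJava wrapper, and the class and method names are made up for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedPoolQueueDemo {

    // Runs `tasks` short jobs on a pool of `threads` threads and returns the
    // highest number of jobs that were ever running at the same moment.
    static int peakConcurrency(int threads, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(20); // stand-in for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                running.decrementAndGet();
            });
        }
        pool.shutdown(); // no new tasks are accepted, but queued ones still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + peakConcurrency(2, 6));
    }
}
```

Because the pool never has more than `threads` workers, the peak cannot exceed that number however many tasks are submitted; the surplus simply waits in the queue. The explicit shutdown() call is what ends the pool's threads.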

Main thread or AndroidSchedulers.mainThread() is provided by the RxAndroid extension library to RxJava. The main thread (also known as the UI thread) is where user interaction happens. Care should be taken not to overload this thread, to prevent a janky, non-responsive UI or, worse, an “Application Not Responding” (ANR) dialog.

Schedulers.single() is new in RxJava 2. This scheduler is backed by a single thread executing tasks sequentially in the order requested.
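
The "sequentially in the order requested" behaviour can be illustrated with a plain single-thread executor. This is an analogy only, not the RxJava scheduler itself, and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SingleThreadOrderDemo {

    // Submits tasks 0..n-1 to one worker thread and records the order in which they ran.
    static List<Integer> runInOrder(int n) {
        ExecutorService single = Executors.newSingleThreadExecutor();
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < n; i++) {
            final int id = i;
            single.execute(() -> seen.add(id));
        }
        single.shutdown(); // already submitted tasks still run
        try {
            single.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5));
    }
}
```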

Schedulers.trampoline() executes tasks in a FIFO (First In, First Out) manner by one of the participating worker threads. It’s often used when implementing recursion to avoid growing the call stack.
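
A minimal, single-threaded sketch of the trampolining idea: work scheduled while another task is running is queued and executed afterwards in FIFO order, so recursive rescheduling does not deepen the call stack. TrampolineSketch is a made-up class for illustration, not RxJava's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class TrampolineSketch {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private boolean draining = false;

    // Runs the task now if nothing is running; otherwise queues it to run
    // after the current task completes. Not thread-safe: one thread only.
    void schedule(Runnable task) {
        queue.add(task);
        if (draining) {
            return; // a drain loop further down the stack will pick it up
        }
        draining = true;
        try {
            Runnable next;
            while ((next = queue.poll()) != null) {
                next.run();
            }
        } finally {
            draining = false;
        }
    }

    // The inner task is scheduled in the middle of the outer one but runs after it.
    static List<String> demo() {
        TrampolineSketch trampoline = new TrampolineSketch();
        List<String> log = new ArrayList<>();
        trampoline.schedule(() -> {
            log.add("outer start");
            trampoline.schedule(() -> log.add("inner"));
            log.add("outer end");
        });
        return log;
    }

    // A task that reschedules itself n times (n >= 1) without growing the stack.
    static int countDown(int n) {
        TrampolineSketch trampoline = new TrampolineSketch();
        int[] remaining = {n};
        Runnable[] step = new Runnable[1];
        step[0] = () -> {
            if (--remaining[0] > 0) {
                trampoline.schedule(step[0]);
            }
        };
        trampoline.schedule(step[0]);
        return remaining[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());            // [outer start, outer end, inner]
        System.out.println(countDown(100000)); // 0
    }
}
```

Written as direct recursion, the countdown would nest 100000 calls deep; here each step returns before the next one starts, which is why a trampoline is often used to implement recursion.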

Answered 2019-02-25T06:00:57.427