I've been out of the Java game for ~8 years and a lot has changed since then. The biggest challenge for me has been RxJava / reactive. I'm looking for rough guidance on how to do the equivalent of the code below in a fully reactive way.
The basic requirement, implemented below using ThreadPoolExecutor, is to process a large amount of Stuff by calling a remote web service, which has a documented rate limit of 100 requests/minute. My goal is to process as much as possible as fast as possible, without dropping any Stuff but still honoring the downstream rate limit. This code has been simplified to omit error handling, bulkheads, circuit breakers, retry logic, etc.
This code currently works fine, but it results in what feels like a lot of wasted threads given all the non-blocking reactive options. Even the HTTP client I'm using to call my service returns a Flowable, which I'm simply blocking on in each of the executor's 20 threads.
I'd love to understand what the reactive equivalent should be. Where I've struggled is that almost all the docs I find showcase static sources for the Observable (ex: Observable.fromArray(1,2,3,4,5)). I know the solution likely involves IoScheduler and maybe groupBy, but I have yet to figure out how to merge the Flowables coming from my HTTP client into some complete chain that does parallelization (up to a limit, such as 20) and rate limiting.
// Imports assume Micronaut 1.x/2.x (RxJava 2, javax.inject); adjust for other versions.
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.ratelimiter.RateLimiterRegistry;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.client.RxHttpClient;
import io.micronaut.http.client.annotation.Client;
import io.reactivex.Flowable;

import java.time.Duration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;

public class Example {
    private static final int THREADS = 20;

    // using https://docs.micronaut.io/latest/guide/index.html#httpClient
    @Client("http://stuff-processor.internal:8080")
    @Inject
    RxHttpClient httpClient;

    private final ThreadPoolExecutor executor;
    private final RateLimiter rateLimiter;

    public Example() {
        // up to 20 threads to process the unbounded queue
        // incoming Stuff is very bursty...
        // ...we could go hours without anything and then hundreds could come in
        this.executor = new ThreadPoolExecutor(THREADS, THREADS,
                30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        this.executor.allowCoreThreadTimeOut(true);

        // using https://resilience4j.readme.io/docs/ratelimiter
        RateLimiterConfig config = RateLimiterConfig.custom()
                .limitRefreshPeriod(Duration.ofSeconds(60))
                .limitForPeriod(100)
                .timeoutDuration(Duration.ofSeconds(90))
                .build();
        RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);
        rateLimiter = rateLimiterRegistry.rateLimiter("stuff-processor", config);
    }

    /**
     * Called when the user takes an action that can cause 1 or 1000s of new
     * Stuff to be entered into the system. Each instance of Stuff results in
     * a separate call to this method. Ex: 100 Stuffs = 100 calls.
     */
    void onNewStuff(Stuff stuff) {
        final Runnable task = () -> {
            final Flowable<HttpResponse<Boolean>> flowable = httpClient.exchange(
                    HttpRequest.POST("/process", stuff),
                    Boolean.class);
            // blocks one of the executor's threads until the response arrives
            final HttpResponse<Boolean> response = flowable.blockingFirst();
            if (response.body()) {
                System.out.println("Success!");
            } else {
                System.out.println("Fail :(");
            }
        };
        final Runnable rateLimitedTask =
                RateLimiter.decorateRunnable(rateLimiter, task);
        executor.submit(rateLimitedTask);
    }
}
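To make the target behaviour concrete, here is a rough JDK-only sketch of the three constraints described above: an unbounded queue so nothing is dropped, a cap on calls in flight, and a fixed-window rate limit, all without parking a thread per request. It is not RxJava and not a definitive implementation. `ThrottledDispatcher`, its constructor parameters, and the fake remote call in the demo are hypothetical names introduced for illustration, and the fixed-window refill is only an approximation of what a library rate limiter does. Error handling is left out, as in the code above.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical helper: queues items and dispatches them asynchronously under two limits. */
final class ThrottledDispatcher<T> {
    private final Queue<T> pending = new ConcurrentLinkedQueue<>(); // unbounded: nothing is dropped
    private final Function<T, CompletableFuture<?>> call;           // assumed non-blocking call
    private final int maxInFlight;
    private final int permitsPerPeriod;
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private int inFlight = 0; // guarded by "this"
    private int permits;      // guarded by "this"

    ThrottledDispatcher(int maxInFlight, int permitsPerPeriod, long periodMillis,
                        Function<T, CompletableFuture<?>> call) {
        this.maxInFlight = maxInFlight;
        this.permitsPerPeriod = permitsPerPeriod;
        this.permits = permitsPerPeriod;
        this.call = call;
        // Fixed window: the permit count is reset once per period.
        timer.scheduleAtFixedRate(this::refill, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    void submit(T item) {
        pending.add(item);
        drain();
    }

    private synchronized void refill() {
        permits = permitsPerPeriod;
        drain();
    }

    // Start queued items while both a concurrency slot and a rate permit are available.
    private synchronized void drain() {
        while (inFlight < maxInFlight && permits > 0 && !pending.isEmpty()) {
            T item = pending.poll();
            inFlight++;
            permits--;
            call.apply(item).whenComplete((result, error) -> onDone());
        }
    }

    private synchronized void onDone() {
        inFlight--;
        drain();
    }

    void shutdown() {
        timer.shutdownNow();
    }
}

final class ThrottledDispatcherDemo {
    /** Small numbers for a quick run: 7 items, at most 2 in flight, 3 calls per 200 ms window. */
    static int[] demo() {
        ScheduledExecutorService fakeRemote = Executors.newScheduledThreadPool(4);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxActive = new AtomicInteger();
        AtomicInteger done = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(7);
        long start = System.nanoTime();
        ThrottledDispatcher<Integer> dispatcher = new ThrottledDispatcher<Integer>(2, 3, 200, item -> {
            maxActive.accumulateAndGet(active.incrementAndGet(), Math::max);
            CompletableFuture<Boolean> response = new CompletableFuture<>();
            // Stand-in for the remote service: completes 20 ms later on another thread.
            fakeRemote.schedule(() -> {
                active.decrementAndGet();
                response.complete(true);
                done.incrementAndGet();
                latch.countDown();
            }, 20, TimeUnit.MILLISECONDS);
            return response;
        });
        for (int i = 0; i < 7; i++) {
            dispatcher.submit(i);
        }
        try {
            latch.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int elapsedMs = (int) ((System.nanoTime() - start) / 1_000_000);
        dispatcher.shutdown();
        fakeRemote.shutdown();
        return new int[] { done.get(), maxActive.get(), elapsedMs };
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("processed=" + r[0] + " maxInFlight=" + r[1] + " elapsedMs=" + r[2]);
    }
}
```

With the numbers from the question this would be constructed as `new ThrottledDispatcher<>(20, 100, 60_000, ...)`, passing a function that starts the HTTP call and returns a future instead of blocking. In RxJava 2 terms, the in-flight cap corresponds roughly to `flatMap` with a `maxConcurrency` argument applied to a hot source; the rate-limiting stage would still need to be added to that chain separately.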
Thank you!