
I've been out of the Java game for ~8 years and a lot has changed since then. The biggest challenge for me has been RxJava / reactive programming. I'm looking for rough guidance on how to do the equivalent of the code below in a fully reactive way.

The basic requirement, implemented below using ThreadPoolExecutor, is to process a large amount of Stuff by calling a remote web service that has a documented rate limit of 100 requests/minute. My goal is to process as much as possible, as fast as possible, without dropping any Stuff while still honoring the downstream rate limit. The code has been simplified to omit error handling, bulkheads, circuit breakers, retry logic, etc.

This code currently works fine but it results in what feels like a lot of wasted threads given all the non-blocking reactive options. Even the HTTP client I'm using to call my service offers back a Flowable, which I'm simply blocking on in each of the executor's 20 threads.

I'd love to understand what the reactive equivalent should be. Where I've struggled is that almost all the docs I find showcase static sources for the Observable (e.g., Observable.fromArray(1,2,3,4,5)). I know the solution likely involves Schedulers.io() and maybe groupBy, but I have yet to figure out how to merge the Flowables coming from my HTTP client into one complete chain that handles parallelization (up to a limit, such as 20) and rate limiting.

public class Example {
    private static final int THREADS = 20;

    // using https://docs.micronaut.io/latest/guide/index.html#httpClient
    @Client("http://stuff-processor.internal:8080")
    @Inject
    RxHttpClient httpClient;

    private ThreadPoolExecutor executor;
    private final RateLimiter rateLimiter;

    public Example() {
        // up to 20 threads to process the unbounded queue
        // incoming Stuff is very bursty...
        // ...we could go hours without anything and then hundreds could come in
        this.executor = new ThreadPoolExecutor(THREADS, THREADS,
                30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        this.executor.allowCoreThreadTimeOut(true);

        // using https://resilience4j.readme.io/docs/ratelimiter
        RateLimiterConfig config = RateLimiterConfig.custom()
                .limitRefreshPeriod(Duration.ofSeconds(60))
                .limitForPeriod(100)
                .timeoutDuration(Duration.ofSeconds(90))
                .build();
        RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);
        rateLimiter = rateLimiterRegistry.rateLimiter("stuff-processor", config);
    }

    /**
     * Called when the user takes an action that can cause 1 or 1000s of new
     * Stuff to be entered into the system. Each instance of Stuff results in
     * a separate call to this method. Ex: 100 Stuffs = 100 calls.
     */
    void onNewStuff(Stuff stuff) {
        final Runnable task = () -> {
            final Flowable<HttpResponse<Boolean>> flowable = httpClient.exchange(
                    HttpRequest.POST("/process", stuff),
                    Boolean.class);

            final HttpResponse<Boolean> response = flowable.blockingFirst();
            if (response.body()) {
                System.out.println("Success!");
            } else {
                System.out.println("Fail :(");
            }
        };

        final Runnable rateLimitedTask = 
                RateLimiter.decorateRunnable(rateLimiter, task);
        executor.submit(rateLimitedTask);
    }
}

Thank you!


1 Answer


First, to build this in a fully non-blocking way, you need a non-blocking, asynchronous HTTP client library such as one built on Netty. I'm not sure how RxHttpClient works under the hood.

Assuming you have a list of stuffs, I would do this:

Observable.fromIterable(stuffs)
        .flatMap(a -> client.nonBlockingPost(a).subscribeOn(Schedulers.io()))
        .subscribe();

flatMap merges the responses from all the inner streams into a single output stream.
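To make that concrete, here is a self-contained sketch. fakePost is a hypothetical stand-in for the real non-blocking HTTP call (your Micronaut client would return a Flowable the same way); everything else is plain RxJava 2:

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.Arrays;
import java.util.List;

public class FlatMapMergeDemo {
    // Hypothetical stand-in for a non-blocking HTTP POST that
    // returns its response as a reactive stream.
    static Flowable<Boolean> fakePost(int stuff) {
        return Flowable.fromCallable(() -> stuff % 2 == 0)
                .subscribeOn(Schedulers.io());
    }

    public static void main(String[] args) {
        List<Integer> stuffs = Arrays.asList(1, 2, 3, 4, 5);

        // flatMap subscribes to each inner Flowable and merges all
        // of their emissions into one output stream; no thread ever
        // blocks waiting on an individual response.
        List<Boolean> results = Flowable.fromIterable(stuffs)
                .flatMap(FlatMapMergeDemo::fakePost)
                .toList()
                .blockingGet();

        System.out.println(results.size()); // one merged result per request
    }
}
```

Note that the merged emissions can arrive in any order, since each inner stream completes on its own io() worker.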

To limit concurrency, flatMap takes a second argument that caps the number of inner streams it subscribes to in parallel. Say you want no more than 10 calls in flight at a time. Do this:

Observable.fromIterable(stuffs)
        .flatMap(a -> client.nonBlockingPost(a).subscribeOn(Schedulers.io()), 10)
        .subscribe();
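Putting the pieces together for the original bursty, push-based scenario, here is a minimal self-contained sketch. All names and timings are illustrative assumptions: fakePost again stands in for the Micronaut call, a PublishProcessor replaces the executor queue as the entry point for each new Stuff, onBackpressureBuffer absorbs bursts without dropping anything, zipWith(interval) paces the stream (600 ms spacing would approximate 100 requests/minute; the demo uses 20 ms so it finishes quickly), and flatMap's second argument caps in-flight calls at 20. In your real code you could also keep resilience4j and apply its RxJava 2 RateLimiterOperator to each inner Flowable via compose instead of the interval trick:

```java
import io.reactivex.Flowable;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ReactivePipelineSketch {
    // Hypothetical stand-in for httpClient.exchange(...): pretends
    // to POST one Stuff and emits whether it succeeded.
    static Flowable<Boolean> fakePost(int stuff) {
        return Flowable.fromCallable(() -> true)
                .subscribeOn(Schedulers.io());
    }

    public static void main(String[] args) throws InterruptedException {
        PublishProcessor<Integer> incoming = PublishProcessor.create();
        AtomicInteger successes = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(10);

        incoming
                .onBackpressureBuffer()  // survive bursts without dropping Stuff
                // Pace the stream: one request per interval tick.
                // 600 ms here would approximate 100 requests/minute.
                .zipWith(Flowable.interval(20, TimeUnit.MILLISECONDS), (s, tick) -> s)
                .flatMap(s -> fakePost(s), 20)  // at most 20 calls in flight
                .subscribe(ok -> {
                    if (ok) successes.incrementAndGet();
                    done.countDown();
                });

        // Simulate a burst: onNewStuff(...) would call onNext instead
        // of submitting to an executor.
        for (int i = 0; i < 10; i++) {
            incoming.onNext(i);
        }

        done.await(5, TimeUnit.SECONDS);
        System.out.println(successes.get());
    }
}
```

With this shape there is no per-request thread at all: the io() scheduler only lends a worker for the duration of each (simulated) call, and backpressure plus the pacing stage keep the downstream service within its limit.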
answered 2019-10-07T18:30:53