Backpressure is driven by how slowly the consumer works, so one way to trigger it is to convert certain exception types into a delay. You can use the onErrorResume
operator for this purpose, as demonstrated below:
// Requires reactor.core.publisher.Flux, reactor.core.publisher.Mono and java.time.Duration.
long start = System.currentTimeMillis();
Flux.range(1, 1000)
    .doOnNext(item -> System.out.println("Elapsed " + (System.currentTimeMillis() - start) + " millis for item: " + item))
    .flatMap(item -> process(item).onErrorResume(this::slowDown), 5) // concurrency limit for demo
    .blockLast();
System.out.println("Flow took " + (System.currentTimeMillis() - start) + " milliseconds.");
private Mono<Integer> process(Integer item) {
    // simulate error for some items
    if (item >= 50 && item <= 100) {
        return Mono.error(new RuntimeException("Downstream failed."));
    }
    // normal processing
    return Mono.delay(Duration.ofMillis(10))
        .thenReturn(item);
}

private Mono<Integer> slowDown(Throwable e) {
    if (e instanceof RuntimeException) { // you could check for circuit breaker exception
        return Mono.delay(Duration.ofMillis(1000)).then(Mono.empty()); // delay to slow down
    }
    return Mono.empty(); // no delay for other errors
}
If you check the output of this code, you can see that processing slows down for items 50 to 100 but runs at regular speed before and after.
Note that my example does not use Kafka. Since you are using the reactor-kafka library, which honors backpressure, it should work the same way as this dummy example.
Also, because the Flux may process items concurrently, the slowdown is not immediate: it will try to process some additional items before properly slowing down.
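To get a feel for how much the delay costs in the demo, here is a rough back-of-the-envelope estimate in plain Java (no Reactor needed). It assumes an idealized model in which each item occupies one of the 5 flatMap slots for exactly its delay (10 ms normally, 1000 ms when slowDown kicks in) and ignores scheduling overhead, so the real run will take somewhat longer. The class and method names are made up for illustration.

```java
class SlowdownEstimate {

    // Idealized total time in ms: sum of all per-item delays,
    // spread evenly over `concurrency` parallel slots.
    static long estimateMillis(int totalItems, int failingItems,
                               long normalMs, long penaltyMs, int concurrency) {
        long work = (long) (totalItems - failingItems) * normalMs
                  + (long) failingItems * penaltyMs;
        return work / concurrency;
    }

    public static void main(String[] args) {
        int failing = 100 - 50 + 1; // items 50..100 inclusive, as in process()
        // Numbers taken from the demo: 1000 items, 10 ms normal, 1000 ms penalty, concurrency 5
        System.out.println("With slowDown:  " + estimateMillis(1000, failing, 10, 1000, 5) + " ms");
        System.out.println("Without errors: " + estimateMillis(1000, 0, 10, 1000, 5) + " ms");
    }
}
```

Under these assumptions the failing range accounts for most of the total run time, which matches the visible pause in the output. Raising the concurrency limit shortens the pause, but it also means more items are attempted against the failing downstream before the flow settles into the slow pace.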