Reactor backpressure - request multiple in hookOnNext

Time:10-05

I've been testing some of the Reactor backpressure stuff, and one common structure seems to be:

package com.example.backpressure;

import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscription;
import org.springframework.boot.test.context.SpringBootTest;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

@SpringBootTest
class BackPressureApplicationTests {

    @Test
    public void backPressureApplicationTest1() {
        Flux<Integer> request = Flux.range(1, 20).log();
        request.subscribe(new BackPressureSubscriber<>());
    }

}
class BackPressureSubscriber<T> extends BaseSubscriber<T> {

    public void hookOnSubscribe(Subscription subscription) {
        request(1);
    }

    public void hookOnNext(T value) {
        System.out.println("Value is: " + value);
        request(1);
    }
}

The idea here is to apply backpressure by just allowing a new value to be received after having processed the current one (request(1)). It seems to work fine - and I am seeing output like this:

2022-10-03 20:11:51.908  INFO 2380 --- [           main] reactor.Flux.Range.1                     : | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2022-10-03 20:11:51.909  INFO 2380 --- [           main] reactor.Flux.Range.1                     : | request(1)
2022-10-03 20:11:51.910  INFO 2380 --- [           main] reactor.Flux.Range.1                     : | onNext(1)
Value is: 1
2022-10-03 20:11:51.911  INFO 2380 --- [           main] reactor.Flux.Range.1                     : | request(1)
2022-10-03 20:11:51.911  INFO 2380 --- [           main] reactor.Flux.Range.1                     : | onNext(2)
Value is: 2
2022-10-03 20:11:51.911  INFO 2380 --- [           main] reactor.Flux.Range.1                     : | request(1)
2022-10-03 20:11:51.911  INFO 2380 --- [           main] reactor.Flux.Range.1                     : | onNext(3)
Value is: 3
2022-10-03 20:11:51.911  INFO 2380 --- [           main] reactor.Flux.Range.1                     : | request(1)
2022-10-03 20:11:51.911  INFO 2380 --- [           main] reactor.Flux.Range.1                     : | onNext(4)
Value is: 4

But what would it mean if I, for example, use request(3) in hookOnNext?

public void hookOnNext(T value) {
    System.out.println("Value is: " + value);
    request(3);
}

When examining the output, it seems that the structure is identical (except for 1=>3):

2022-10-03 20:10:37.701  INFO 19484 --- [           main] reactor.Flux.Range.1                     : | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2022-10-03 20:10:37.703  INFO 19484 --- [           main] reactor.Flux.Range.1                     : | request(1)
2022-10-03 20:10:37.704  INFO 19484 --- [           main] reactor.Flux.Range.1                     : | onNext(1)
Value is: 1
2022-10-03 20:10:37.704  INFO 19484 --- [           main] reactor.Flux.Range.1                     : | request(3)
2022-10-03 20:10:37.704  INFO 19484 --- [           main] reactor.Flux.Range.1                     : | onNext(2)
Value is: 2
2022-10-03 20:10:37.704  INFO 19484 --- [           main] reactor.Flux.Range.1                     : | request(3)
2022-10-03 20:10:37.704  INFO 19484 --- [           main] reactor.Flux.Range.1                     : | onNext(3)
Value is: 3
2022-10-03 20:10:37.704  INFO 19484 --- [           main] reactor.Flux.Range.1                     : | request(3)
2022-10-03 20:10:37.704  INFO 19484 --- [           main] reactor.Flux.Range.1                     : | onNext(4)
Value is: 4

Question 1: If I understand the code correctly, it will - for each value received - tell the producer that it can send 3 more values. But will then this "quota" be aggregated somehow for the producer - since I request more values than I process? Does it really make sense at all to request more than 1 value in the hookOnNext handler?

I have seen people do things like this:

class BackPressureSubscriber<T> extends BaseSubscriber<T> {

    int cnt = 0;
    int numEachTime=3;

    public void hookOnSubscribe(Subscription subscription) {
        request(3);
    }

    public void hookOnNext(T value) {
        System.out.println("Value is: " + value);

        cnt++;
        if(cnt%numEachTime == 0) {
            request(3);
        }
    }
}

Which then gives output:

2022-10-03 20:21:59.681  INFO 11836 --- [           main] reactor.Flux.Range.1                     : | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2022-10-03 20:21:59.683  INFO 11836 --- [           main] reactor.Flux.Range.1                     : | request(3)
2022-10-03 20:21:59.683  INFO 11836 --- [           main] reactor.Flux.Range.1                     : | onNext(1)
Value is: 1
2022-10-03 20:21:59.683  INFO 11836 --- [           main] reactor.Flux.Range.1                     : | onNext(2)
Value is: 2
2022-10-03 20:21:59.683  INFO 11836 --- [           main] reactor.Flux.Range.1                     : | onNext(3)
Value is: 3
2022-10-03 20:21:59.683  INFO 11836 --- [           main] reactor.Flux.Range.1                     : | request(3)
2022-10-03 20:21:59.683  INFO 11836 --- [           main] reactor.Flux.Range.1                     : | onNext(4)
Value is: 4
2022-10-03 20:21:59.683  INFO 11836 --- [           main] reactor.Flux.Range.1                     : | onNext(5)
Value is: 5
2022-10-03 20:21:59.683  INFO 11836 --- [           main] reactor.Flux.Range.1                     : | onNext(6)
Value is: 6

Question 2: This processes 3 values before it requests 3 new values, but does this really give an advantage over requesting 1-by-1? Maybe somewhat lower latency due to fewer request calls?

CodePudding user response:

Calling request(n) from within a Subscriber S tells a Publisher P that it is allowed to call the onNext method of S at most n times. Calling request(n) again does not override a previous request: it tells P that it is allowed to produce another n elements (i.e. 2*n in total).
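To make the additive behaviour concrete, here is a minimal sketch. It uses the JDK's java.util.concurrent.Flow interfaces (which mirror the Reactive Streams contract) instead of Reactor so that it runs without dependencies. RangeSubscription and DemandDemo are hypothetical names, and the source is only a simplified stand-in for a synchronous publisher such as Flux.range, not Reactor's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class DemandDemo {

    // Hypothetical synchronous source emitting 1..count. Simplified on purpose:
    // no overflow protection and no validation of n <= 0.
    static final class RangeSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Integer> subscriber;
        private final int count;
        private int emitted;
        private long demand;      // requested but not yet delivered
        private boolean emitting; // true while the delivery loop is running
        private boolean done;

        RangeSubscription(Flow.Subscriber<? super Integer> subscriber, int count) {
            this.subscriber = subscriber;
            this.count = count;
        }

        @Override
        public void request(long n) {
            if (done) return;
            demand += n;          // demand is added, never overwritten
            if (emitting) return; // re-entrant call from onNext: the outer loop continues
            emitting = true;
            while (!done && demand > 0 && emitted < count) {
                demand--;
                emitted++;
                subscriber.onNext(Integer.valueOf(emitted));
            }
            emitting = false;
            if (!done && emitted == count) {
                done = true;
                subscriber.onComplete();
            }
        }

        @Override
        public void cancel() { done = true; }

        long outstanding() { return demand; }
    }

    // Requests `initial` on subscribe and `perOnNext` in every onNext;
    // returns the outstanding demand observed right after each of those requests.
    static List<Long> outstandingAfterEachOnNext(int initial, int perOnNext, int count) {
        List<Long> seen = new ArrayList<>();
        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {
            private RangeSubscription subscription;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = (RangeSubscription) s;
                s.request(initial);
            }
            @Override public void onNext(Integer item) {
                subscription.request(perOnNext);
                seen.add(subscription.outstanding());
            }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        };
        subscriber.onSubscribe(new RangeSubscription(subscriber, count));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(outstandingAfterEachOnNext(1, 1, 5)); // [1, 1, 1, 1, 1]
        System.out.println(outstandingAfterEachOnNext(1, 3, 5)); // [3, 5, 7, 9, 11]
    }
}
```

With request(1) the outstanding demand stays at 1, while with request(3) it grows by 2 per element. Under these assumptions this also suggests why the two logs in the question look alike: a synchronous source still delivers one element at a time, and the surplus demand simply piles up unused.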

If n is equal to 1, P will call onNext at most once and then wait until S requests more elements. This results in stop-and-wait, which is usually not what you want. Subscribers should, therefore, request the maximum number of elements they are able to process. In fact, if S can process elements faster than P can produce them, S might request an unbounded number of elements by calling request(Long.MAX_VALUE) (ideally from its onSubscribe method).

So, to answer your questions:

Yes, it makes sense to request multiple elements in order to reduce delays between S and P. While it may not make sense to call request(3) unconditionally from within your onNext method, imagine you have a buffer of, let's say, 100 elements. If you request 100 elements at the beginning (i.e. in onSubscribe) and S processes the elements more slowly than P can produce them, your buffer will fill up. Then, let's say, after some processing the number of elements remaining in the buffer drops to 10. At this point you might request another 90. With a strategy like this, S might never have to wait for P to generate elements.
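The refill strategy described above can be sketched as a small bookkeeping helper. WatermarkDemand and its method names are hypothetical and not part of Reactor or the Reactive Streams API. The sketch assumes you would call request(initialRequest()) from hookOnSubscribe and, after processing each element in hookOnNext, call request(n) whenever afterProcessed() returns n > 0.

```java
// Hypothetical helper: decides how many elements to request so that
// at most `capacity` elements are ever requested but not yet processed.
final class WatermarkDemand {
    private final int capacity;     // e.g. 100: size of the subscriber's buffer
    private final int lowWatermark; // e.g. 10: refill once this few remain
    private int outstanding;        // requested but not yet processed

    WatermarkDemand(int capacity, int lowWatermark) {
        if (capacity <= 0 || lowWatermark < 0 || lowWatermark >= capacity) {
            throw new IllegalArgumentException("need 0 <= lowWatermark < capacity");
        }
        this.capacity = capacity;
        this.lowWatermark = lowWatermark;
    }

    /** Amount to request when the subscription starts. */
    long initialRequest() {
        outstanding = capacity;
        return capacity;
    }

    /** Call once per processed element; returns how many more to request (0 = nothing yet). */
    long afterProcessed() {
        outstanding--;
        if (outstanding > lowWatermark) {
            return 0;
        }
        int refill = capacity - outstanding; // top the quota back up to capacity
        outstanding = capacity;
        return refill;
    }

    public static void main(String[] args) {
        WatermarkDemand demand = new WatermarkDemand(100, 10);
        System.out.println("initial request: " + demand.initialRequest());
        for (int i = 1; i <= 200; i++) {
            long n = demand.afterProcessed();
            if (n > 0) {
                System.out.println("after element " + i + ": request(" + n + ")");
            }
        }
    }
}
```

With a capacity of 100 and a low watermark of 10, this requests 100 up front and then 90 after every 90th processed element. With a capacity of 3 and a low watermark of 0 it reduces to the counting subscriber from the question (request 3, process 3, request 3 again).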

For further details, you might want to have a look at the Reactive Streams specification, from which much of this information is taken.
