I'm pretty new in project-reactor and I need some clarification about events sequencing.
Example 1 (Flux with 2 elements and buffer size == 2) :
Flux.fromStream(IntStream.range(0, 2).boxed()) // Two element in the stream
.doOnSubscribe(subscription -> out.println("OnSubscribe"))
.doOnRequest(l -> out.println("OnRequest"))
.doOnNext(subscription -> out.println("onNext"))
.doOnEach(subscription -> out.println("onEach"))
.doOnComplete(() -> out.println("OnComplete"))
.doOnTerminate(() -> out.println("onTerminate"))
.doAfterTerminate(() -> out.println("doAfterTerminate"))
.doFinally(signalType -> out.println("doFinally"))
.doOnCancel(() -> out.println("onCancel"))
.doOnError(throwable -> out.println("onError"))
.buffer(2) // Buffer size 2
.subscribe(integers -> out.println("Numbers " integers));
Output 1 :
OnSubscribe
OnRequest
onNext
onEach
onNext
onEach
Numbers [0, 1]
onEach
OnComplete
onTerminate
doFinally
doAfterTerminate
Example 2 (Flux with 3 elements and buffer size == 2):
Flux.fromStream(IntStream.range(0, 3).boxed()) // Three elements here !!!!!
.doOnSubscribe(subscription -> out.println("OnSubscribe"))
.doOnRequest(l -> out.println("OnRequest"))
.doOnNext(subscription -> out.println("onNext"))
.doOnEach(subscription -> out.println("onEach"))
.doOnComplete(() -> out.println("OnComplete"))
.doOnTerminate(() -> out.println("onTerminate"))
.doAfterTerminate(() -> out.println("doAfterTerminate"))
.doFinally(signalType -> out.println("doFinally"))
.doOnCancel(() -> out.println("onCancel"))
.doOnError(throwable -> out.println("onError"))
.buffer(2) // Buffer size 2
.subscribe(integers -> out.println("Numbers " integers));
Output 2 :
OnSubscribe
OnRequest
onNext
onEach
onNext
onEach
Numbers [0, 1]
onNext
onEach
onEach
OnComplete
onTerminate
Numbers [2]
doFinally
doAfterTerminate
Why in the second example, onComplete was triggered before consumer ending ?
OnComplete
onTerminate
Numbers [2] -> this one ?
I thought that onComplete was triggered when the Flux completes successfully but here, it's not.
I'm little bit confused.
CodePudding user response:
This is because the last buffer wasn't at full capacity yet. Flux.buffer
will emit an item whenever it has a buffer at full capacity. When Flux.fromStream
emits an onComplete signal, it first passes the .doOnComplete
, then it is received at .buffer
after which two things can happen:
- Either Flux.buffer has no items in it's current buffer, and it will relay the onComplete signal downstream.
- Or it has some leftover items, then it will emit the incomplete buffer first (
[2]
in your example), then relay the onComplete signal. Finally, the.subscribe
receives the incomplete buffer.
If you place the .doOnComplete
after the .buffer
, you would find the onComplete signal always to be after the last incomplete buffer, as you are now listening to Flux.buffer
instead of Flux.fromStream
:
Flux.fromStream(IntStream.range(0, 3).boxed()) // Three elements here
.doOnNext(subscription -> out.println("onNext"))
.doOnComplete(() -> out.println("OnComplete fromStream"))
.buffer(2) // Buffer size 2
.doOnComplete(() -> out.println("OnComplete buffer"))
.subscribe(integers -> out.println("Numbers " integers));
Output:
onNext
onNext
Numbers [0, 1]
onNext
OnComplete fromStream
Numbers [2]
OnComplete buffer