Home > Blockchain >  Event sequencing in project-reactor
Event sequencing in project-reactor

Time:09-08

I'm pretty new in project-reactor and I need some clarification about events sequencing.

Example 1 (Flux with 2 elements and buffer size == 2) :

Flux.fromStream(IntStream.range(0, 2).boxed()) // Two element in the stream
   .doOnSubscribe(subscription -> out.println("OnSubscribe"))
   .doOnRequest(l -> out.println("OnRequest"))
   .doOnNext(subscription -> out.println("onNext"))
   .doOnEach(subscription -> out.println("onEach"))
   .doOnComplete(() -> out.println("OnComplete"))
   .doOnTerminate(() -> out.println("onTerminate"))
   .doAfterTerminate(() -> out.println("doAfterTerminate"))
   .doFinally(signalType -> out.println("doFinally"))
   .doOnCancel(() -> out.println("onCancel"))
   .doOnError(throwable -> out.println("onError"))
   .buffer(2) // Buffer size 2
   .subscribe(integers -> out.println("Numbers "   integers));

Output 1 :

OnSubscribe
OnRequest
onNext
onEach
onNext
onEach
Numbers [0, 1]
onEach
OnComplete
onTerminate
doFinally
doAfterTerminate

Example 2 (Flux with 3 elements and buffer size == 2):

Flux.fromStream(IntStream.range(0, 3).boxed()) // Three elements here !!!!!
   .doOnSubscribe(subscription -> out.println("OnSubscribe"))
   .doOnRequest(l -> out.println("OnRequest"))
   .doOnNext(subscription -> out.println("onNext"))
   .doOnEach(subscription -> out.println("onEach"))
   .doOnComplete(() -> out.println("OnComplete"))
   .doOnTerminate(() -> out.println("onTerminate"))
   .doAfterTerminate(() -> out.println("doAfterTerminate"))
   .doFinally(signalType -> out.println("doFinally"))
   .doOnCancel(() -> out.println("onCancel"))
   .doOnError(throwable -> out.println("onError"))
   .buffer(2) // Buffer size 2
   .subscribe(integers -> out.println("Numbers "   integers));

Output 2 :

OnSubscribe
OnRequest
onNext
onEach
onNext
onEach
Numbers [0, 1]
onNext
onEach
onEach
OnComplete
onTerminate
Numbers [2]
doFinally
doAfterTerminate

Why in the second example, onComplete was triggered before consumer ending ?

OnComplete
onTerminate
Numbers [2] -> this one ? 

I thought that onComplete was triggered when the Flux completes successfully but here, it's not.

I'm little bit confused.

CodePudding user response:

This is because the last buffer wasn't at full capacity yet. Flux.buffer will emit an item whenever it has a buffer at full capacity. When Flux.fromStream emits an onComplete signal, it first passes the .doOnComplete, then it is received at .buffer after which two things can happen:

  1. Either Flux.buffer has no items in it's current buffer, and it will relay the onComplete signal downstream.
  2. Or it has some leftover items, then it will emit the incomplete buffer first ([2] in your example), then relay the onComplete signal. Finally, the .subscribe receives the incomplete buffer.

If you place the .doOnComplete after the .buffer, you would find the onComplete signal always to be after the last incomplete buffer, as you are now listening to Flux.buffer instead of Flux.fromStream:

Flux.fromStream(IntStream.range(0, 3).boxed()) // Three elements here
        .doOnNext(subscription -> out.println("onNext"))
        .doOnComplete(() -> out.println("OnComplete fromStream"))
        .buffer(2) // Buffer size 2
        .doOnComplete(() -> out.println("OnComplete buffer"))
        .subscribe(integers -> out.println("Numbers "   integers));

Output:

onNext
onNext
Numbers [0, 1]
onNext
OnComplete fromStream
Numbers [2]
OnComplete buffer
  • Related