I have been playing around for some time with reactor, but I still need to get something.
This piece of code
Flux.range(1, 1000)
.delayElements(Duration.ofNanos(1))
.map(integer -> integer 1)
.subscribe(System.out::println);
System.out.println("after");
Returns:
after
2
3
4
which is expected as the documentation of subscribe states: this will immediately return control to the calling thread.
Why, then, this piece of code:
Flux.range(1, 1000)
.map(integer -> integer 1)
.subscribe(System.out::println);
returns
1
2
...
1000
1001
after
I can never figure out when subscribe
will block or not, and that's very annoying when writing batches.
If anyone has the answer, that would be amazing
CodePudding user response:
There is no blocking code in your snippet.
In first example you use .delayElements()
and it switches the executing to another thread and releases your main thread. So you can see your System.out.println("after");
executing in Main thread immediately, whilst the reactive chain is being executed on parallel-n
threads.
Your first example:
18:49:29.195 [main] INFO com.example.demo.FluxTest - AFTER
18:49:29.199 [parallel-1] INFO com.example.demo.FluxTest - v: 2
18:49:29.201 [parallel-2] INFO com.example.demo.FluxTest - v: 3
18:49:29.202 [parallel-3] INFO com.example.demo.FluxTest - v: 4
18:49:29.203 [parallel-4] INFO com.example.demo.FluxTest - v: 5
18:49:29.205 [parallel-5] INFO com.example.demo.FluxTest - v: 6
But your second example does not switch the executing thread, so your reactive chain executes on Main thread. And after it completes it continues to execute your System.out.println("after");
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 995
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 996
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 997
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 998
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 999
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 1000
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 1001
18:51:28.491 [main] INFO com.example.demo.FluxTest - AFTER
EDIT: If you want to switch the thread in your second snippet, basically you have two options:
Add
subscribeOn(<Scheduler>)
in any place of your reactive chain. Then the whole subscription process will happen on a thread from scheduler you provided.Add
publishOn(<Scheduler>)
, for example, afterFlux.range()
, then the emitting itself will happen on your calling thread, but the downstream will be executed on a thread from the scheduler you provided