Here is my code. I have recently started learning reactive Java.
import java.util.List;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import reactor.core.publisher.Flux;

@SpringBootApplication
public class ReactiveApplication {
    Flux<String> flux() {
        return Flux.fromIterable(List.of("James", "Harry", "Spike")).log();
    }

    public static void main(String[] args) {
        SpringApplication.run(ReactiveApplication.class, args);
        ReactiveApplication r = new ReactiveApplication();
        r.flux().subscribe(names -> {
            System.out.println("Subscriber called " + names);
        });
    }
}
Output:
2022-06-18 23:59:59.319 INFO 35446 --- [ main] reactor.Flux.Iterable.1 : | request(unbounded)
2022-06-18 23:59:59.320 INFO 35446 --- [ main] reactor.Flux.Iterable.1 : | onNext(James)
Subscriber called James
2022-06-18 23:59:59.327 INFO 35446 --- [ main] reactor.Flux.Iterable.1 : | onNext(Harry)
Subscriber called Harry
2022-06-18 23:59:59.327 INFO 35446 --- [ main] reactor.Flux.Iterable.1 : | onNext(Spike)
Subscriber called Spike
2022-06-18 23:59:59.327 INFO 35446 --- [ main] reactor.Flux.Iterable.1 : | onComplete()
The code works as expected, but when I inserted .log() I got confused. As far as I know, in reactive Java, when a Flux is subscribed to, all of its elements (here, the list of names) are stored in the JVM until onComplete is triggered, i.e. until all the expected elements have been collected. But here, as soon as the first element arrived, the subscriber was called and "James" was printed. Then, when "Harry" arrived, "Harry" was printed. Shouldn't it capture all the names first and then print them one by one? I hope my question is clear. Please help me understand the internal workings.
CodePudding user response:
Reactive programming is push-based rather than pull-based. This means that you don't retrieve data from a producer yourself; you subscribe and request it, and the producer sends you each item as soon as it is ready.
What you see when using log() on a Flux are the events that occur while the data stream is being processed.
Look at this code: when an item is created, "Produced" is printed; when it is consumed, "Received" is printed.
public void onStartUp() {
    Flux<Integer> flux = feedData();
    flux.subscribe(data -> log.info("Received: {}", data));
}

private Flux<Integer> feedData() {
    return Flux.range(0, 5).map(i -> {
        log.info("Produced: {}", i);
        return i;
    });
}
The console output is:
Produced: 0
Received: 0
Produced: 1
Received: 1
Produced: 2
Received: 2
Produced: 3
Received: 3
Produced: 4
Received: 4
It is also possible to buffer all the produced data and send it to the consumer in one go. This requires only a minor change:
flux.buffer().subscribe(data -> log.info("Received: {}", data));
The console output is then:
Produced: 0
Produced: 1
Produced: 2
Produced: 3
Produced: 4
Received: [0, 1, 2, 3, 4]
If the reactive concept still doesn't click, I highly recommend watching Josh Long's video about reactive programming: https://www.youtube.com/watch?v=Z5q-CXbvM1E&list=FLDsj0lUfUxzODGteJeT0EeQ&index=15
Upd: There is the interface org.reactivestreams.Subscriber, which describes all four possible events (onSubscribe, onNext, onError, onComplete) that a subscriber can receive from a publisher. Because reactive is push-based, every event originates on the publisher side and is then consumed by the subscriber.
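To make the event order concrete, here is a minimal sketch that does not use Reactor at all. It assumes the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the org.reactivestreams ones, and a hand-written, deliberately simplified publisher. The names PushOrderSketch and fromList are made up for illustration and this is not how Reactor implements Flux.fromIterable internally; it only shows that each element is handed to the subscriber as soon as it is emitted rather than being collected first.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical illustration only; a real publisher must also handle
// invalid request counts, re-entrant requests and thread safety.
class PushOrderSketch {

    // A tiny synchronous publisher over a list.
    static Flow.Publisher<String> fromList(List<String> source) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private final Iterator<String> it = source.iterator();
            private boolean done;

            @Override
            public void request(long n) {
                // Hand over up to n items, one onNext call per item.
                for (long i = 0; i < n && !done && it.hasNext(); i++) {
                    subscriber.onNext(it.next());
                }
                // Signal completion once the source is exhausted.
                if (!done && !it.hasNext()) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes and records every event in the order it arrives.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        fromList(List.of("James", "Harry", "Spike")).subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                events.add("onSubscribe");
                subscription.request(Long.MAX_VALUE); // comparable to request(unbounded) in the log
            }

            @Override
            public void onNext(String name) {
                events.add("onNext(" + name + ")");
            }

            @Override
            public void onError(Throwable error) {
                events.add("onError");
            }

            @Override
            public void onComplete() {
                events.add("onComplete");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The recorded order is onSubscribe, onNext(James), onNext(Harry), onNext(Spike), onComplete, which matches the sequence the log() operator printed in the question.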
Which streaming process did you mention?
Essentially, Mono and Flux are both reactive streams; you can apply operations (map, flatMap, reduce, etc.) to them in the same way as to non-reactive streams.
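For comparison, here is a small sketch of those three operators on an ordinary java.util.stream.Stream. The class and method names are made up for illustration. Reactor's Flux offers operators with the same names (map, flatMap, reduce), as noted in the comments, but a Flux does nothing until it is subscribed to, and its reduce returns a Mono rather than a plain value.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper class showing the non-reactive versions of the operators.
class StreamOperatorsSketch {

    // map: transform each element (Reactor counterpart: Flux.map).
    static List<String> upperCased(List<String> names) {
        return names.stream()
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    // flatMap: expand each element into several and flatten the result
    // (Reactor counterpart: Flux.flatMap, whose function returns a Publisher).
    static List<String> letters(List<String> names) {
        return names.stream()
                .flatMap(name -> Stream.of(name.split("")))
                .collect(Collectors.toList());
    }

    // reduce: fold all elements into a single value
    // (Reactor counterpart: Flux.reduce, which yields a Mono).
    static int totalLength(List<String> names) {
        return names.stream()
                .map(String::length)
                .reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        List<String> names = List.of("James", "Harry", "Spike");
        System.out.println(upperCased(names));
        System.out.println(letters(names));
        System.out.println(totalLength(names));
    }
}
```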
When the producer has data ready and pushes it to the subscriber at the same moment, isn't that a blocking activity?
When data is ready and pushed to the consumer, a thread is assigned to consume the incoming data. So technically the consumer thread is busy/blocked while it processes the data. But consider a slightly more complex example:
private WebClient client = WebClient.builder()
        .baseUrl("http://localhost:8080/test")
        .build();

public void onStartUp() {
    Flux<Integer> flux = feedData();
    flux.subscribe(data -> {
        log.info("Point A, Received: {}, Thread: {}", data, Thread.currentThread().getName());
        processData();
    });
}

private void processData() {
    Flux<String> response = client
            .get()
            .retrieve()
            .bodyToFlux(String.class);
    response.subscribe(el -> log.info("Point B, Web response: {}, Thread: {}", el, Thread.currentThread().getName()));
}
At point A, thread T1 is assigned to process the data: it logs the item and sends a request to another application (this could be any time-consuming operation). T1 does not wait for that request to complete; it is returned to the common thread pool. At point B, when data starts arriving from the other application, an arbitrary thread is assigned to each chunk we consume, and there is no guarantee that it will be the same thread T1.
Here's a great article about the event loop: https://dzone.com/articles/spring-webflux-eventloop-vs-thread-per-request-mod
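The same hand-off can be sketched without a web server. The example below is only an analogy: it assumes a JDK CompletableFuture as a stand-in for the WebClient call, and the class name NonBlockingCallSketch and the "pong" reply are invented for illustration. The calling thread registers a callback and moves on, and the response is later handled on whichever thread completed the call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical analogy for points A and B; it does not use WebClient or Reactor.
class NonBlockingCallSketch {

    // Returns { name of the calling thread, name of the thread that handled the response }.
    static String[] run() {
        String callerThread = Thread.currentThread().getName();
        CountDownLatch callbackRegistered = new CountDownLatch(1);

        // Stand-in for the remote call: it runs on another thread.
        CompletableFuture<String> response = CompletableFuture.supplyAsync(() -> {
            try {
                // Simulate a slow reply: answer only after the caller has registered its callback.
                callbackRegistered.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "pong";
        });

        // "Point B": say what to do with the response. This call returns immediately.
        CompletableFuture<String> handlerThread =
                response.thenApply(body -> Thread.currentThread().getName());

        // The caller was never blocked by the pending call and carries on here.
        callbackRegistered.countDown();

        // join() is used only so this demo can collect and return the result.
        return new String[] { callerThread, handlerThread.join() };
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("Request issued on:   " + threads[0]);
        System.out.println("Response handled on: " + threads[1]);
    }
}
```

The two printed thread names differ, and the exact name of the second thread depends on the JVM's common pool, which is in line with the point above that no particular thread is guaranteed.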