A small question: how can I add a delay inside a method in a non-blocking way?
A very popular way to simulate a long-running process is to call Thread.sleep().
However, in Project Reactor this is a blocking operation.
And it is well known that in a reactive project we should not block.
I would like to experiment and simulate long processes: some sort of method that takes a lot of time, but in a NON-blocking way and WITHOUT swapping threads. The goal is to simulate a method that is just very lengthy, but proven NON-blocking by BlockHound etc.
This construct is very popular:
@Test
public void simulateLengthyProcessingOperationReactor() {
    Flux.range(1, 5000)
        .map(a -> simulateLengthyProcessingOperation(a))
        .subscribe(System.out::println);
}

public String simulateLengthyProcessingOperation(Integer input) {
    simulateDelayBLOCKING();
    return String.format("[%d] on thread [%s] at time [%s]", input, Thread.currentThread().getName(), new Date());
}

public void simulateDelayBLOCKING() {
    try {
        Thread.sleep(4000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
But it is blocking.
(I know about Mono.fromCallable(() -> ...), but that is not what I am asking.)
Is it possible to do the same, i.e. simulate a delay, but NON-blocking?
Also, .delay will not achieve the expected result (simulating a NON-blocking lengthy method on the same reactive pipeline). What I am after looks like this:
@Test
public void simulateLengthyProcessingOperationReactor() {
    Flux.range(1, 5000)
        .map(a -> simulateLengthyProcessingOperation(a))
        .subscribe(System.out::println);
}

public String simulateLengthyProcessingOperation(Integer input) {
    simulateDelay_NON_blocking();
    return String.format("[%d] on thread [%s] at time [%s]", input, Thread.currentThread().getName(), new Date());
}

public void simulateDelay_NON_blocking() {
    // simulate lengthy process, but WITHOUT blocking
}
Thank you
CodePudding user response:
Of course you can: there is a family of .delay...() methods.
For example, you can read about the delayElements() method here:
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#delayElements-java.time.Duration-
Be aware that it switches execution to another Scheduler: signals are delayed and continue on the default parallel Scheduler.
In the simplest case it would look like this:
public void simulateLengthyProcessingOperationReactor() {
    Flux.range(1, 5000)
        .delayElements(Duration.ofMillis(1000L)) // delay each element by 1000 ms
        .subscribe(System.out::println);
}
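The thread switch happens because a non-blocking delay is a timer rather than a sleeping thread: the continuation is scheduled to run later, and the calling thread is released right away. Below is a minimal JDK-only sketch of that idea. It is not Reactor's actual implementation, and the class TimerDelaySketch and the helper delayedResult are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TimerDelaySketch {

    // Hypothetical helper: completes the returned future after delayMillis.
    // The calling thread is never parked; the timer thread runs the continuation.
    static CompletableFuture<String> delayedResult(ScheduledExecutorService timer,
                                                   int input,
                                                   long delayMillis) {
        CompletableFuture<String> result = new CompletableFuture<>();
        timer.schedule(() -> {
            result.complete("[" + input + "] on thread ["
                    + Thread.currentThread().getName() + "]");
        }, delayMillis, TimeUnit.MILLISECONDS);
        return result; // returned immediately, before the delay has elapsed
    }

    public static void main(String[] args) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<String> future = delayedResult(timer, 1, 300);
            System.out.println("caller returned, done yet? " + future.isDone());
            // join() is used only so this demo waits for the result before exiting
            System.out.println(future.join());
        } finally {
            timer.shutdown();
        }
    }
}
```

The result is produced on the timer's thread, not on the caller's, which is the same kind of hand-off that delayElements() performs with the parallel Scheduler.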
For your case, you could write the code like this:
@Test
public void simulateLengthyProcessingOperationReactor() {
    Flux.range(1, 5000)
        .concatMap(this::simulateDelay_NON_blocking)
        .subscribe(System.out::println);
}

public Mono<String> simulateDelay_NON_blocking(Integer input) {
    // simulate lengthy process, but WITHOUT blocking
    return Mono.delay(Duration.ofMillis(1000L))
        .map(__ -> String.format("[%d] on thread [%s] at time [%s]",
            input, Thread.currentThread().getName(), new Date()));
}
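One thing to watch for: because the delay runs on another Scheduler, subscribe() returns immediately, so a test method will typically finish before any delayed element is printed. Here is a minimal self-contained sketch of the same pipeline that waits for completion, assuming reactor-core is on the classpath. The class name NonBlockingDelayDemo, the range of 3 and the 100 ms delay are illustrative choices, and block() is called only at the outer edge of the demo, on the main thread, to keep the JVM alive.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class NonBlockingDelayDemo {

    // Simulates a lengthy step without blocking: the Mono emits after the delay,
    // and the map runs on whichever thread delivers that signal.
    static Mono<String> simulateDelayNonBlocking(Integer input) {
        return Mono.delay(Duration.ofMillis(100))
                .map(tick -> String.format("[%d] on thread [%s]",
                        input, Thread.currentThread().getName()));
    }

    // Runs the pipeline and collects the results; concatMap keeps the input order
    // and subscribes to one delayed Mono at a time.
    static List<String> run() {
        return Flux.range(1, 3)
                .concatMap(NonBlockingDelayDemo::simulateDelayNonBlocking)
                .collectList()
                .block(); // blocking here is only for the demo's main thread
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In a real test, StepVerifier from reactor-test is another way to wait for the pipeline instead of calling block().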