Problem
I have defined a CustomHttpMessageReader (which implements HttpMessageReader<CustomClass>) that reads a multipart response from a server and converts the received parts into an object of a specific class. Internally, the CustomHttpMessageReader uses the DefaultPartHttpMessageReader to actually read/parse the multipart responses.
The CustomHttpMessageReader accumulates the parts read by the DefaultPartHttpMessageReader and converts them into the desired class CustomClass.
I've created a CustomHttpMessageConverter that does the same thing for a RestTemplate, but I struggle to do the same for a WebClient.
I always get the following Exception:
block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83)
at reactor.core.publisher.Flux.blockFirst(Flux.java:2600)
at com.company.project.deserializer.multipart.CustomHttpMessageReader.readMultipartData(CustomHttpMessageReader.java:116)
at com.company.project.deserializer.multipart.CustomHttpMessageReader.readMono(CustomHttpMessageReader.java:101)
at org.springframework.web.reactive.function.BodyExtractors.lambda$readToMono$14(BodyExtractors.java:211)
at java.base/java.util.Optional.orElseGet(Optional.java:369)
...
Mind you, I'm not interested in running WebClient asynchronously. I'm only future-proofing my application, because RestTemplate is apparently in maintenance mode and the folks at Pivotal/Spring suggest using WebClient instead.
What I Tried
As I understand it, there are threads that are not allowed to be blocked, namely the reactor-http-nio one in the exception. I tried removing Netty from my dependencies so that I can rely solely on Tomcat. That however doesn't seem to help, as I get another exception telling me that no suitable ClientHttpConnector exists (thrown by the WebClient.Builder):
No suitable default ClientHttpConnector found
java.lang.IllegalStateException: No suitable default ClientHttpConnector found
at org.springframework.web.reactive.function.client.DefaultWebClientBuilder.initConnector(DefaultWebClientBuilder.java:297)
at org.springframework.web.reactive.function.client.DefaultWebClientBuilder.build(DefaultWebClientBuilder.java:266)
at com.company.project.RestClientUsingWebClient.getWebclient(RestClientUsingWebClient.java:160)
I've tried running my code both in a unit test and with a whole Spring context started. The result is unfortunately the same.
Setup
To provide a bit more detail, the following are snippets from the classes mentioned earlier. The classes are not shown in full, to make it easier to understand what is going on. All necessary methods are implemented (e.g. canRead() in the Reader).
CustomHttpMessageReader
I also included the usage of CustomPart (in addition to CustomClass) in the class, just to show that the content of the Part is also read, i.e. blocked.
public class CustomHttpMessageReader implements HttpMessageReader<CustomClass> {

    private final DefaultPartHttpMessageReader defaultPartHttpMessageReader = new DefaultPartHttpMessageReader();

    @Override
    public Flux<CustomClass> read(final ResolvableType elementType, final ReactiveHttpInputMessage message,
                                  final Map<String, Object> hints) {
        return Flux.merge(readMono(elementType, message, hints));
    }

    @Override
    public Mono<CustomClass> readMono(final ResolvableType elementType, final ReactiveHttpInputMessage message,
                                      final Map<String, Object> hints) {
        final List<CustomPart> customParts = readMultipartData(message);
        return convertToCustomClass(customParts);
    }

    private List<CustomPart> readMultipartData(final ReactiveHttpInputMessage message) {
        final ResolvableType resolvableType = ResolvableType.forClass(byte[].class);
        return Optional.ofNullable(
                        defaultPartHttpMessageReader.read(resolvableType, message, Map.of())
                                .buffer()
                                .blockFirst()) // <- EXCEPTION IS THROWN HERE!
                .orElse(new ArrayList<>())
                .stream()
                .map(part -> {
                    final byte[] content = Optional.ofNullable(part.content().blockFirst()) //<- HERE IS ANOTHER BLOCK
                            .map(DataBuffer::asByteBuffer)
                            .map(ByteBuffer::array)
                            .orElse(new byte[]{});
                    // Here we cherry pick some header fields
                    return new CustomPart(content, someHeaderFields);
                }).collect(Collectors.toList());
    }
}
Usage of WebClient
class RestClientUsingWebClient {

    /**
     * The "Main" Method for our purposes
     */
    public Optional<CustomClass> getResource(final String baseUrl, final String id) {
        final WebClient webclient = getWebclient(baseUrl);
        //curl -X GET "http://BASE_URL/id" -H "accept: multipart/form-data"
        return webclient.get()
                .uri(uriBuilder -> uriBuilder.path(id).build()).retrieve()
                .toEntity(CustomClass.class)
                .onErrorResume(NotFound.class, e -> Mono.empty())
                .blockOptional() // <- HERE IS ANOTHER BLOCK
                .map(ResponseEntity::getBody);
    }

    //This exists also as a Bean definition
    private WebClient getWebclient(final String baseUrl) {
        final ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                .codecs(codecs -> {
                    codecs.defaultCodecs().maxInMemorySize(16 * 1024 * 1024);
                    codecs.customCodecs().register(new CustomHttpMessageReader()); // <- Our custom reader
                })
                .build();
        return WebClient.builder()
                .baseUrl(baseUrl)
                .exchangeStrategies(exchangeStrategies)
                .build();
    }
}
Usage of build.gradle
For the sake of completeness, here is what I think is the relevant part of my build.gradle:
plugins {
    id 'org.springframework.boot' version '2.7.2'
    id 'io.spring.dependency-management' version '1.0.13.RELEASE'
    ...
}
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.boot:spring-boot-starter-web' // <- This
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    // What I tried:
    // implementation ('org.springframework.boot:spring-boot-starter-webflux'){
    //     exclude group: 'org.springframework.boot', module: 'spring-boot-starter-reactor-netty'
    // }
    ...
}
CodePudding user response:
If we look at the stack trace that you provided, we see these three lines:
at reactor.core.publisher.Flux.blockFirst(Flux.java:2600)
at com.company.project.deserializer.multipart.CustomHttpMessageReader.readMultipartData(CustomHttpMessageReader.java:116)
at com.company.project.deserializer.multipart.CustomHttpMessageReader.readMono(CustomHttpMessageReader.java:101)
They should be read from bottom to top. So what do they tell us?
The bottom line tells us that the function readMono, on line 101 of the class CustomHttpMessageReader, was called first.
That function then called the function readMultipartData, on line 116 of CustomHttpMessageReader (same class as above).
Then the function blockFirst was called, on line 2600 of the class Flux.
That's your blocking call.
So we can tell that there is a blocking call in the function readMultipartData.
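To make the bottom-to-top reading concrete, here is a small sketch that uses only the JDK. The class StackTraceOrder and its topFrames helper are made up for illustration; the three methods merely borrow the names from the trace above and do nothing reactive.

```java
public class StackTraceOrder {

    // Innermost call: plays the role of Flux.blockFirst in the trace above.
    static void blockFirst() {
        throw new IllegalStateException("blocking is not supported here");
    }

    // Middle call: plays the role of readMultipartData.
    static void readMultipartData() {
        blockFirst();
    }

    // Outermost call: plays the role of readMono.
    static void readMono() {
        readMultipartData();
    }

    /** Returns the method names of the first three frames, in the order a stack trace prints them. */
    public static String[] topFrames() {
        try {
            readMono();
            return new String[0];
        } catch (IllegalStateException e) {
            StackTraceElement[] trace = e.getStackTrace();
            return new String[] {
                trace[0].getMethodName(),
                trace[1].getMethodName(),
                trace[2].getMethodName()
            };
        }
    }

    public static void main(String[] args) {
        // The method that threw is listed first; each following frame is its caller.
        System.out.println(String.join(" <- ", topFrames()));
        // prints "blockFirst <- readMultipartData <- readMono"
    }
}
```

The first frame is where the exception was created, and each frame below it is the caller of the one above, which is why the call order is recovered by reading upwards.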
So why can't we block in that function? Well, if we look at the API of the interface it implements, HttpMessageReader, we can see that readMono returns a Mono<T>, which means that the function is an async function.
And if it is async and we block, we stall the thread it runs on, and in the best case we get very, very bad performance. Here that thread is reactor-http-nio-2, one of Reactor Netty's event-loop threads, and Reactor refuses to block on such threads at all, which is exactly the IllegalStateException you are seeing.
This interface is used within the Spring WebClient, which is a fully async client.
You can use it in a non-async application, which means you can block outside of the WebClient (as you do with blockOptional() in getResource), but internally it needs to operate completely async if you want it to be as efficient as possible.
So the bottom line is that you should not block in any function that returns a Mono or a Flux.
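As a rough sketch of what "not blocking" could look like here, the two blockFirst() calls might be replaced by operators that compose the result instead. This assumes the Spring Framework 5.3 line that Spring Boot 2.7.2 pulls in. NonBlockingPartCollector and collectParts are hypothetical names, and the nested CustomPart is only a stand-in for your class, whose real constructor and header selection are not shown in the question.

```java
import java.util.List;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.codec.multipart.Part;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class NonBlockingPartCollector {

    /** Hypothetical stand-in for the question's CustomPart. */
    public static final class CustomPart {
        public final byte[] content;
        public final HttpHeaders headers;

        public CustomPart(byte[] content, HttpHeaders headers) {
            this.content = content;
            this.headers = headers;
        }
    }

    /** Describes how to collect all parts; nothing is read until someone subscribes. */
    public static Mono<List<CustomPart>> collectParts(Flux<? extends Part> parts) {
        return parts
                // concatMap handles one part at a time and keeps the original order.
                .concatMap(part -> DataBufferUtils.join(part.content())
                        // join() merges all buffers of the part, not just the first one.
                        .map(buffer -> new CustomPart(toBytes(buffer), part.headers()))
                        // Fall back to an empty body, like the orElse(new byte[]{}) in the question.
                        .defaultIfEmpty(new CustomPart(new byte[0], part.headers())))
                .collectList();
    }

    // Copies the readable bytes out and releases the buffer afterwards.
    private static byte[] toBytes(DataBuffer buffer) {
        try {
            byte[] bytes = new byte[buffer.readableByteCount()];
            buffer.read(bytes);
            return bytes;
        } finally {
            DataBufferUtils.release(buffer);
        }
    }
}
```

With a helper like this, readMono could return collectParts(defaultPartHttpMessageReader.read(ResolvableType.forClass(Part.class), message, hints)).flatMap(this::convertToCustomClass), assuming convertToCustomClass takes the list and returns a Mono<CustomClass> as in your snippet. The only block left is then the blockOptional() in getResource, which runs on the caller's thread rather than on the event loop.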