I'm trying to send an multipart/form in a WebClient request that contains files and String fields, but blockhound notify that FileInputStream is blocking the call.
public Mono<ResponseEntity> anotherCall(){
MultipartBodyBuilder builder = new MultipartBodyBuilder();
builder.part("description", "description");
builder.part("email", "email");
builder.part("priority", 1);
builder.part("status", 2);
builder.part("subject", "subject");
builder.part("company_id", 150000240766l);
return Mono.defer(() -> {
return this.webClient.getWebClient().post().uri(URI)
.headers(
httpHeaders -> {
httpHeaders.setBasicAuth("API_KEY", "X");
})
.bodyValue(builder.build())
.contentType(MediaType.MULTIPART_FORM_DATA)
.exchangeToMono(
response -> {
final HttpStatus status = response.statusCode();
return response
.bodyToMono(Map.class)
.defaultIfEmpty(Map.of())
.map(
body -> {
if (status.is2xxSuccessful()) {
return ResponseEntity.ok().body(body);
} else {
return Error(body);
}
});
});
});
I've tried to use builder.asyncPart(), tried to call Flux.then(), tried .subscribeOn(Schedulers.boundedElastic())
Here is the stacktrace from blockhound, I read that webClient block threads to generate boundary for parts of form, I've tried to generate boundary and add to header, but no success.
reactor.blockhound.BlockingOperationError: Blocking call! java.io.FileInputStream#readBytes
at java.base/java.io.FileInputStream.readBytes(FileInputStream.java)
at java.base/java.io.FileInputStream.read(FileInputStream.java:276)
at java.base/java.io.FilterInputStream.read(FilterInputStream.java:132)
at java.base/sun.security.provider.NativePRNG$RandomIO.readFully(NativePRNG.java:425)
at java.base/sun.security.provider.NativePRNG$RandomIO.ensureBufferValid(NativePRNG.java:528)
at java.base/sun.security.provider.NativePRNG$RandomIO.implNextBytes(NativePRNG.java:547)
at java.base/sun.security.provider.NativePRNG.engineNextBytes(NativePRNG.java:221)
at java.base/java.security.SecureRandom.nextBytes(SecureRandom.java:758)
at java.base/java.security.SecureRandom.next(SecureRandom.java:815)
at java.base/java.util.Random.nextInt(Random.java:323)
at org.springframework.util.MimeTypeUtils.generateMultipartBoundary(MimeTypeUtils.java:404)
at org.springframework.http.codec.multipart.MultipartWriterSupport.generateMultipartBoundary(MultipartWriterSupport.java:105)
at org.springframework.http.codec.multipart.MultipartHttpMessageWriter.writeMultipart(MultipartHttpMessageWriter.java:185)
at org.springframework.http.codec.multipart.MultipartHttpMessageWriter.lambda$write$0(MultipartHttpMessageWriter.java:158)
at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:152)
at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:441)
at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:677)
at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.run(DefaultPooledConnectionProvider.java:274)
at io.netty.util.concurrent.AbstractEventExecutor.runTask$$$capture(AbstractEventExecutor.java:174)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:167)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
2023-01-17 09:02:07.040 WARN [,19587802144e28cfd5bee3752b442824,a602a51f0e98a86c] [nio-8080-exec-6] c.i.p.e.api.GlobalExceptionHandler : Blocking call! java.io.FileInputStream#readBytes; nested exception is reactor.blockhound.BlockingOperationError: Blocking call! java.io.FileInputStream#readBytes
org.springframework.web.reactive.function.client.WebClientRequestException: Blocking call! java.io.FileInputStream#readBytes; nested exception is reactor.blockhound.BlockingOperationError: Blocking call! java.io.FileInputStream#readBytes
at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ Request to POST URI[DefaultWebClient]
Original Stack Trace:
at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:55)
at reactor.core.publisher.Mono.subscribe(Mono.java:4490)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93)
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:96)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:204)
at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:225)
at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:274)
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:96)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:415)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:251)
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:89)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:537)
at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:343)
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:190)
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:96)
at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:201)
at reactor.netty.http.client.HttpClientConnect$HttpObserver.onUncaughtException(HttpClientConnect.java:399)
at reactor.netty.ReactorNetty$CompositeConnectionObserver.onUncaughtException(ReactorNetty.java:670)
at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onUncaughtException(DefaultPooledConnectionProvider.java:205)
at reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onUncaughtException(DefaultPooledConnectionProvider.java:454)
at reactor.netty.http.client.HttpClientOperations.onOutboundError(HttpClientOperations.java:583)
at reactor.netty.channel.ChannelOperations.onError(ChannelOperations.java:254)
at reactor.core.publisher.Operators.error(Operators.java:198)
at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:161)
at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:441)
at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:677)
at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.run(DefaultPooledConnectionProvider.java:274)
at io.netty.util.concurrent.AbstractEventExecutor.runTask$$$capture(AbstractEventExecutor.java:174)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:167)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: reactor.blockhound.BlockingOperationError: Blocking call! java.io.FileInputStream#readBytes
at java.base/java.io.FileInputStream.readBytes(FileInputStream.java)
at java.base/java.io.FileInputStream.read(FileInputStream.java:276)
at java.base/java.io.FilterInputStream.read(FilterInputStream.java:132)
at java.base/sun.security.provider.NativePRNG$RandomIO.readFully(NativePRNG.java:425)
at java.base/sun.security.provider.NativePRNG$RandomIO.ensureBufferValid(NativePRNG.java:528)
at java.base/sun.security.provider.NativePRNG$RandomIO.implNextBytes(NativePRNG.java:547)
at java.base/sun.security.provider.NativePRNG.engineNextBytes(NativePRNG.java:221)
at java.base/java.security.SecureRandom.nextBytes(SecureRandom.java:758)
at java.base/java.security.SecureRandom.next(SecureRandom.java:815)
at java.base/java.util.Random.nextInt(Random.java:323)
at org.springframework.util.MimeTypeUtils.generateMultipartBoundary(MimeTypeUtils.java:404)
at org.springframework.http.codec.multipart.MultipartWriterSupport.generateMultipartBoundary(MultipartWriterSupport.java:105)
at org.springframework.http.codec.multipart.MultipartHttpMessageWriter.writeMultipart(MultipartHttpMessageWriter.java:185)
at org.springframework.http.codec.multipart.MultipartHttpMessageWriter.lambda$write$0(MultipartHttpMessageWriter.java:158)
at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:152)
at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:441)
at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:677)
at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.run(DefaultPooledConnectionProvider.java:274)
at io.netty.util.concurrent.AbstractEventExecutor.runTask$$$capture(AbstractEventExecutor.java:174)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:167)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
CodePudding user response:
.body(
BodyInserters.fromProducer(
Mono.just(builder.build()).subscribeOn(Schedulers.boundedElastic()),
MultiValueMap.class));