Send multipart/form-data via WebClient [Spring Boot]

Time:01-18

I'm trying to send a multipart/form-data request with WebClient that contains files and String fields, but BlockHound reports that FileInputStream is blocking the call.

public Mono<ResponseEntity> anotherCall() {
    MultipartBodyBuilder builder = new MultipartBodyBuilder();
    builder.part("description", "description");
    builder.part("email", "email");
    builder.part("priority", 1);
    builder.part("status", 2);
    builder.part("subject", "subject");
    builder.part("company_id", 150000240766L);
    return Mono.defer(() -> {
        return this.webClient.getWebClient().post().uri(URI)
                .headers(httpHeaders -> httpHeaders.setBasicAuth("API_KEY", "X"))
                // contentType(...) has to come before the body: bodyValue(...) returns a spec without it
                .contentType(MediaType.MULTIPART_FORM_DATA)
                .bodyValue(builder.build())
                .exchangeToMono(response -> {
                    final HttpStatus status = response.statusCode();
                    return response
                            .bodyToMono(Map.class)
                            .defaultIfEmpty(Map.of())
                            .map(body -> {
                                if (status.is2xxSuccessful()) {
                                    return ResponseEntity.ok().body(body);
                                } else {
                                    return Error(body);
                                }
                            });
                });
    });
}

I've tried using builder.asyncPart(), calling Flux.then(), and adding .subscribeOn(Schedulers.boundedElastic()).

Here is the stack trace from BlockHound. I read that WebClient blocks the thread while generating the boundary for the form parts, so I also tried generating the boundary myself and adding it to the header, but without success.

    reactor.blockhound.BlockingOperationError: Blocking call! java.io.FileInputStream#readBytes
        at java.base/java.io.FileInputStream.readBytes(FileInputStream.java)
        at java.base/java.io.FileInputStream.read(FileInputStream.java:276)
        at java.base/java.io.FilterInputStream.read(FilterInputStream.java:132)
        at java.base/sun.security.provider.NativePRNG$RandomIO.readFully(NativePRNG.java:425)
        at java.base/sun.security.provider.NativePRNG$RandomIO.ensureBufferValid(NativePRNG.java:528)
        at java.base/sun.security.provider.NativePRNG$RandomIO.implNextBytes(NativePRNG.java:547)
        at java.base/sun.security.provider.NativePRNG.engineNextBytes(NativePRNG.java:221)
        at java.base/java.security.SecureRandom.nextBytes(SecureRandom.java:758)
        at java.base/java.security.SecureRandom.next(SecureRandom.java:815)
        at java.base/java.util.Random.nextInt(Random.java:323)
        at org.springframework.util.MimeTypeUtils.generateMultipartBoundary(MimeTypeUtils.java:404)
        at org.springframework.http.codec.multipart.MultipartWriterSupport.generateMultipartBoundary(MultipartWriterSupport.java:105)
        at org.springframework.http.codec.multipart.MultipartHttpMessageWriter.writeMultipart(MultipartHttpMessageWriter.java:185)
        at org.springframework.http.codec.multipart.MultipartHttpMessageWriter.lambda$write$0(MultipartHttpMessageWriter.java:158)
        at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:152)
        at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
        at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:441)
        at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:677)
        at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.run(DefaultPooledConnectionProvider.java:274)
        at io.netty.util.concurrent.AbstractEventExecutor.runTask$$$capture(AbstractEventExecutor.java:174)
        at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:167)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:833)
    
    2023-01-17 09:02:07.040  WARN [,19587802144e28cfd5bee3752b442824,a602a51f0e98a86c]   [nio-8080-exec-6] c.i.p.e.api.GlobalExceptionHandler       : Blocking call! java.io.FileInputStream#readBytes; nested exception is reactor.blockhound.BlockingOperationError: Blocking call! java.io.FileInputStream#readBytes
    
    org.springframework.web.reactive.function.client.WebClientRequestException: Blocking call! java.io.FileInputStream#readBytes; nested exception is reactor.blockhound.BlockingOperationError: Blocking call! java.io.FileInputStream#readBytes
        at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
        Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
    Error has been observed at the following site(s):
        *__checkpoint ⇢ Request to POST URI[DefaultWebClient]
    Original Stack Trace:
            at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
            at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:55)
            at reactor.core.publisher.Mono.subscribe(Mono.java:4490)
            at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
            at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
            at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
            at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
            at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93)
            at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:96)
            at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:204)
            at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
            at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:225)
            at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:274)
            at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:96)
            at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121)
            at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:415)
            at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:251)
            at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:89)
            at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
            at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:537)
            at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:343)
            at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
            at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
            at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:190)
            at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:96)
            at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:201)
            at reactor.netty.http.client.HttpClientConnect$HttpObserver.onUncaughtException(HttpClientConnect.java:399)
            at reactor.netty.ReactorNetty$CompositeConnectionObserver.onUncaughtException(ReactorNetty.java:670)
            at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onUncaughtException(DefaultPooledConnectionProvider.java:205)
            at reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onUncaughtException(DefaultPooledConnectionProvider.java:454)
            at reactor.netty.http.client.HttpClientOperations.onOutboundError(HttpClientOperations.java:583)
            at reactor.netty.channel.ChannelOperations.onError(ChannelOperations.java:254)
            at reactor.core.publisher.Operators.error(Operators.java:198)
            at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:161)
            at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
            at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
            at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
            at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:441)
            at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:677)
            at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.run(DefaultPooledConnectionProvider.java:274)
            at io.netty.util.concurrent.AbstractEventExecutor.runTask$$$capture(AbstractEventExecutor.java:174)
            at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java)
            at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:167)
            at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
            at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
            at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
            at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
            at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.base/java.lang.Thread.run(Thread.java:833)
    Caused by: reactor.blockhound.BlockingOperationError: Blocking call! java.io.FileInputStream#readBytes
        at java.base/java.io.FileInputStream.readBytes(FileInputStream.java)
        at java.base/java.io.FileInputStream.read(FileInputStream.java:276)
        at java.base/java.io.FilterInputStream.read(FilterInputStream.java:132)
        at java.base/sun.security.provider.NativePRNG$RandomIO.readFully(NativePRNG.java:425)
        at java.base/sun.security.provider.NativePRNG$RandomIO.ensureBufferValid(NativePRNG.java:528)
        at java.base/sun.security.provider.NativePRNG$RandomIO.implNextBytes(NativePRNG.java:547)
        at java.base/sun.security.provider.NativePRNG.engineNextBytes(NativePRNG.java:221)
        at java.base/java.security.SecureRandom.nextBytes(SecureRandom.java:758)
        at java.base/java.security.SecureRandom.next(SecureRandom.java:815)
        at java.base/java.util.Random.nextInt(Random.java:323)
        at org.springframework.util.MimeTypeUtils.generateMultipartBoundary(MimeTypeUtils.java:404)
        at org.springframework.http.codec.multipart.MultipartWriterSupport.generateMultipartBoundary(MultipartWriterSupport.java:105)
        at org.springframework.http.codec.multipart.MultipartHttpMessageWriter.writeMultipart(MultipartHttpMessageWriter.java:185)
        at org.springframework.http.codec.multipart.MultipartHttpMessageWriter.lambda$write$0(MultipartHttpMessageWriter.java:158)
        at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:152)
        at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
        at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:441)
        at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:677)
        at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.run(DefaultPooledConnectionProvider.java:274)
        at io.netty.util.concurrent.AbstractEventExecutor.runTask$$$capture(AbstractEventExecutor.java:174)
        at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:167)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:833)

CodePudding user response:

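         // Use this in place of .bodyValue(builder.build()): the body is subscribed on
         // boundedElastic, so the boundary should be generated off the Netty event loop.
         .body(
             BodyInserters.fromProducer(
                 Mono.just(builder.build()).subscribeOn(Schedulers.boundedElastic()),
                 MultiValueMap.class))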
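
Why this helps, as far as the stack trace shows: the blocking read comes from SecureRandom, called while MimeTypeUtils.generateMultipartBoundary picks the boundary, and it runs on the Netty event loop because the body was a ready-made value. Subscribing the body producer on Schedulers.boundedElastic() moves that step to a worker thread. The plain-Java sketch below imitates the idea without Spring or Reactor. The class BoundarySketch, its method names, the character set and the 30 to 40 character length are assumptions made for illustration, not Spring's actual code.

```java
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration class; not part of Spring or of the original question.
final class BoundarySketch {

    // Assumed alphabet for boundary characters (illustrative only).
    private static final byte[] BOUNDARY_CHARS =
            "-_1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
                    .getBytes(StandardCharsets.US_ASCII);

    // Builds a random boundary of 30 to 40 characters from the given source.
    // With a SecureRandom, the JDK may read from the OS entropy source,
    // which is the kind of file read BlockHound flags in the trace.
    static String generateBoundary(Random random) {
        byte[] boundary = new byte[random.nextInt(11) + 30];
        for (int i = 0; i < boundary.length; i++) {
            boundary[i] = BOUNDARY_CHARS[random.nextInt(BOUNDARY_CHARS.length)];
        }
        return new String(boundary, StandardCharsets.US_ASCII);
    }

    // Runs the potentially blocking generation on the supplied worker executor
    // instead of on the calling thread.
    static CompletableFuture<String> generateBoundaryOffThread(ExecutorService worker) {
        return CompletableFuture.supplyAsync(
                () -> generateBoundary(new SecureRandom()), worker);
    }

    public static void main(String[] args) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            // join() is acceptable here because main is not an event-loop thread.
            String boundary = generateBoundaryOffThread(worker).join();
            System.out.println("boundary length: " + boundary.length());
        } finally {
            worker.shutdown();
        }
    }
}
```

Here CompletableFuture.supplyAsync plays the role that subscribeOn(Schedulers.boundedElastic()) plays in the fragment above: the work that touches SecureRandom is handed to a thread that is allowed to block, and the caller only receives the result asynchronously.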