CXF - Retrieve SOAP response XML from generated client

Time:09-17

I am currently generating async client code using wsdl-to-java which is being used to query a SOAP web service. Here is a snippet of the generated async method:

@WebMethod(operationName = "GetSession")
public Future<?> getSessionAsync(
  @WebParam(partName = "parameters", name = "GetSessionRequest")
  mynamespace.GetSessionRequest parameters,
  @WebParam(partName = "ResponseHeader", mode = WebParam.Mode.OUT, name = "ResponseHeader", header = true)
  javax.xml.ws.Holder<mydatacontract.ResponseHeader> responseHeader,
  @WebParam(name = "asyncHandler", targetNamespace = "")
  AsyncHandler<myservice.GetSessionResponse> asyncHandler
);

I am calling the above generated code in a wrapper class:

getSession(GetSessionRequest request) {
  Future<?> response = generatedClient.getSessionAsync(request, responseHeader, handler);
}

handler(Response<GetSessionResponse> response) {
  // no access to SOAP XML at this point?
}

As per my understanding, the generated code takes care of serialization/de-serialization and I do not have access to the raw SOAP response. There are ways to log the SOAP XML response as described here but I need to access this in code as the response needs to be dumped into the database.

Is there any way to access this in the handler without touching the generated client code?

UPDATE:

I am able to read the ResponseContext which is of type java.util.Map<String, Object>. But this does not return the raw SOAP XML that I am looking for.

Also, using an inbound Interceptor would mean that I lose the context of the calling function. This is required to store the XML response associated with each call in the database.

UPDATE 2:

The Future returns an object of type Response (see Response.java). Its Javadoc states the following:

The interface provides methods used to obtain the
payload and context of a message sent in response to an operation
invocation.

However, I am only able to retrieve the context; I cannot find any property that gives access to the payload.

I found an SO answer that has a solution for Axis here. Is it possible to do something similar in CXF?

CodePudding user response:

Take a look at this code snippet (source: https://github.com/apache/cxf/blob/master/core/src/main/java/org/apache/cxf/interceptor/LoggingInInterceptor.java)

import org.apache.cxf.message.Message;

...

protected void logging(Logger logger, Message message) {
    if (message.containsKey(LoggingMessage.ID_KEY)) {
        return;
    }
    String id = (String)message.getExchange().get(LoggingMessage.ID_KEY);
    if (id == null) {
        id = LoggingMessage.nextId();
        message.getExchange().put(LoggingMessage.ID_KEY, id);
    }
    message.put(LoggingMessage.ID_KEY, id);
    final LoggingMessage buffer
        = new LoggingMessage("Inbound Message\n----------------------------", id);

    if (!Boolean.TRUE.equals(message.get(Message.DECOUPLED_CHANNEL_MESSAGE))) {
        // avoid logging the default responseCode 200 for the decoupled responses
        Integer responseCode = (Integer)message.get(Message.RESPONSE_CODE);
        if (responseCode != null) {
            buffer.getResponseCode().append(responseCode);
        }
    }

    String encoding = (String)message.get(Message.ENCODING);

    if (encoding != null) {
        buffer.getEncoding().append(encoding);
    }
    String httpMethod = (String)message.get(Message.HTTP_REQUEST_METHOD);
    if (httpMethod != null) {
        buffer.getHttpMethod().append(httpMethod);
    }
    String ct = (String)message.get(Message.CONTENT_TYPE);
    if (ct != null) {
        buffer.getContentType().append(ct);
    }
    Object headers = message.get(Message.PROTOCOL_HEADERS);

    if (headers != null) {
        buffer.getHeader().append(headers);
    }
    String uri = (String)message.get(Message.REQUEST_URL);
    if (uri == null) {
        String address = (String)message.get(Message.ENDPOINT_ADDRESS);
        uri = (String)message.get(Message.REQUEST_URI);
        if (uri != null && uri.startsWith("/")) {
            if (address != null && !address.startsWith(uri)) {
                if (address.endsWith("/") && address.length() > 1) {
                    address = address.substring(0, address.length() - 1);
                }
                uri = address + uri;
            }
        } else {
            uri = address;
        }
    }
    if (uri != null) {
        buffer.getAddress().append(uri);
        String query = (String)message.get(Message.QUERY_STRING);
        if (query != null) {
            buffer.getAddress().append('?').append(query);
        }
    }

    if (!isShowBinaryContent() && isBinaryContent(ct)) {
        buffer.getMessage().append(BINARY_CONTENT_MESSAGE).append('\n');
        log(logger, buffer.toString());
        return;
    }
    if (!isShowMultipartContent() && isMultipartContent(ct)) {
        buffer.getMessage().append(MULTIPART_CONTENT_MESSAGE).append('\n');
        log(logger, buffer.toString());
        return;
    }

//// IMPORTANT LINE

    InputStream is = message.getContent(InputStream.class);

////

    if (is != null) {
        logInputStream(message, is, buffer, encoding, ct);
    } else {
        Reader reader = message.getContent(Reader.class);
        if (reader != null) {
            logReader(message, reader, buffer);
        }
    }
    log(logger, formatLoggingMessage(buffer));
}

So it follows that you can create your own interceptor and read the content of the message through org.apache.cxf.message.Message, for example with message.getContent(InputStream.class).
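
The line marked IMPORTANT hands you the payload as a one-shot InputStream, so an interceptor that reads it has to put an unread copy back, otherwise whatever reads the message next finds the stream already consumed. Here is a minimal, JDK-only sketch of that read-and-replace step. FakeMessage is a hypothetical stand-in for org.apache.cxf.message.Message (it is not CXF code); in a real interceptor the corresponding calls would be message.getContent(InputStream.class) and message.setContent(InputStream.class, ...).

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class StreamCaptureSketch {

  /** Hypothetical stand-in for a message that carries its payload as a one-shot stream. */
  static final class FakeMessage {
    private InputStream content;

    FakeMessage(String payload) {
      this.content = new ByteArrayInputStream(payload.getBytes(StandardCharsets.UTF_8));
    }

    InputStream getContent() {
      return content;
    }

    void setContent(InputStream content) {
      this.content = content;
    }
  }

  /** Reads the whole payload, then puts a fresh stream back so later readers still see it. */
  static String capture(FakeMessage message) {
    try {
      byte[] raw = message.getContent().readAllBytes();   // drains the original stream
      message.setContent(new ByteArrayInputStream(raw));  // re-supply an unread copy
      return new String(raw, StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) throws IOException {
    FakeMessage message = new FakeMessage("<soap:Envelope/>");
    String captured = capture(message);
    // A downstream reader still gets the full payload after the capture.
    String downstream = new String(message.getContent().readAllBytes(), StandardCharsets.UTF_8);
    System.out.println(captured.equals(downstream));
  }
}
```

This only shows the capture itself; getting the captured string back to the code that made the call is a separate problem.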

CodePudding user response:

The problem: SOAP messages can be intercepted, but at a very separate location to the original call site. It's difficult to pass the SOAP message back to the original call site, particularly in multithreaded or asynchronous environments.

The only solution I can see is to make sure each request has exclusive use of one JAX-WS Proxy, which has exactly one Handler. A single Proxy for the whole application would be a bottleneck, so this needs multithreading tools to allow parallel and asynchronous execution.

Here's my idea, in code. First I go through it step-by-step, and at the end there's a dump of all code.

UPDATE: I've replaced the LinkedBlockingQueue with a static ThreadLocal<SoapApiWrapper> instance, and the executor with a newWorkStealingPool(). See the edit history for the changes!

I've set it up to use http://www.dneonline.com/calculator.asmx. It compiles and runs, but I haven't spent a lot of time ensuring it works correctly or is optimal. I'm sure there are issues (my CPU fan is working hard, even though I'm not running code). Be warned!

(Does anyone know of a better public SOAP API that I can either run locally or flood with requests?) If you'd like to test, here are some public SOAP APIs: https://documenter.getpostman.com/view/8854915/Szf26WHn

Step by step

  1. Implement SOAPHandler class that will capture messages, called SoapMessageHandler.

    public class SoapMessageHandler implements SOAPHandler<SOAPMessageContext> {
    
      // capture messages in a list
      private final List<SOAPMessageContext> messages = new ArrayList<>();
    
      // get & clear messages
      public List<SOAPMessageContext> collectMessages() {
        var m = new ArrayList<>(messages);
        messages.clear();
        return m;
      }
    
      @Override
      public boolean handleMessage(SOAPMessageContext context) {
        messages.add(context); // collect message
        return true;
      }
    
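      @Override
      public boolean handleFault(SOAPMessageContext context) {
        messages.add(context); // collect error
        return true;
      }

      // required by SOAPHandler: no specific headers are processed, nothing to clean up
      @Override
      public Set<QName> getHeaders() {
        return Collections.emptySet();
      }

      @Override
      public void close(MessageContext context) {
      }
    }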
    
  2. Define a SoapApiWrapper class that

    1. creates one SoapMessageHandler,
    2. creates one JAX-WS Proxy,
    3. and adds the Handler to the Proxy.

    class SoapApiWrapper {
        // 1. create a handler    
        private final SoapMessageHandler soapMessageHandler = new SoapMessageHandler();
        private final CalculatorSoap connection;
    
        public SoapApiWrapper() {
          // 2. create one connection
          var factoryBean = new JaxWsProxyFactoryBean();
          factoryBean.setAddress("http://www.dneonline.com/calculator.asmx");
          factoryBean.setServiceClass(CalculatorSoap.class);
          // 3. add the Handler
          factoryBean.setHandlers(Collections.singletonList(soapMessageHandler));
    
          connection = factoryBean.create(CalculatorSoap.class);
        }
    }
    
  3. Define a SoapApiManager that has

    1. an ExecutorService, which will manage the SOAP requests and responses
    2. a ThreadLocal<SoapApiWrapper>, so each thread has its own JAX-WS Proxy (idea from https://stackoverflow.com/a/16680215/4161471)

    public class SoapApiManager {
      // 1. request executor
      private static final ExecutorService executorService = Executors.newWorkStealingPool(THREAD_LIMIT);
      // 2. one SoapApiWrapper (and so one JAX-WS Proxy) per thread
      private static final ThreadLocal<SoapApiWrapper> soapApiWrapper = ThreadLocal.withInitial(SoapApiWrapper::new);
    }
    
  4. The SoapApiManager has a method, submitRequest(...). It will return the SOAP API response and the SOAP messages.

    public <ResponseT> CompletableFuture<SoapResponseHolder<ResponseT>> submitRequest(
        SoapRequestRunner<ResponseT> requestRunner
    ) {
     //...
    }
    

    The parameter is a SoapRequestRunner, a lambda that accepts a JAX-WS Proxy and returns a SOAP Response.

    @FunctionalInterface
    interface SoapRequestRunner<ResponseT> {
      ResponseT sendRequest(CalculatorSoap calculatorSoap);
    }
    

    When invoked, submitRequest(...) performs the following:

    1. wraps the SoapRequestRunner with CompletableFuture.supplyAsync(...), using our ExecutorService,
    2. fetches a SoapApiWrapper from the ThreadLocal,
    3. calls the SOAP API (by applying the SoapRequestRunner to the SoapApiWrapper's JAX-WS Proxy),
    4. awaits the SOAP result,
    5. extracts the SOAP messages from the SoapApiWrapper's SOAPHandler,
    6. and finally, bundles the SOAP result and SOAP messages in a DTO, SoapResponseHolder.

      public <ResponseT> CompletableFuture<SoapResponseHolder<ResponseT>> submitRequest(
          SoapRequestRunner<ResponseT> requestRunner
      ) { // 1. use CompletableFuture & executorService
        return CompletableFuture.supplyAsync(createRequestCall(requestRunner), executorService);
      }
    
      private <ResponseT> Supplier<SoapResponseHolder<ResponseT>> createRequestCall(
          SoapRequestRunner<ResponseT> requestRunner
      ) {
        return () -> {
          SoapApiWrapper api = soapApiWrapper.get(); // 2. fetch this thread's API wrapper

          var response = requestRunner.sendRequest(api.connection); // 3&4. request & response
          var messages = api.soapMessageHandler.collectMessages(); // 5. extract raw SOAP messages

          return new SoapResponseHolder<>(response, messages); // 6. bundle into DTO
        };
      }
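
Stripped of the SOAP parts, the steps above boil down to the following pattern. This is a minimal sketch using only the JDK: FakeApi and Holder are hypothetical stand-ins for SoapApiWrapper and SoapResponseHolder, and the "messages" are plain strings rather than SOAPMessageContext objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class SubmitRequestSketch {

  /** Hypothetical stand-in for the proxy plus its handler: records what "goes over the wire". */
  static final class FakeApi {
    private final List<String> captured = new ArrayList<>();

    int add(int a, int b) {
      captured.add("<Add a=\"" + a + "\" b=\"" + b + "\"/>"); // pretend outbound message
      int sum = a + b;
      captured.add("<AddResult>" + sum + "</AddResult>");      // pretend inbound message
      return sum;
    }

    /** Returns a copy of the captured messages and clears the buffer. */
    List<String> collectMessages() {
      List<String> copy = new ArrayList<>(captured);
      captured.clear();
      return copy;
    }
  }

  /** Result and captured messages, bundled together. */
  static final class Holder<T> {
    final T response;
    final List<String> messages;

    Holder(T response, List<String> messages) {
      this.response = response;
      this.messages = messages;
    }
  }

  // One FakeApi per thread, created lazily on first use by that thread.
  private static final ThreadLocal<FakeApi> API = ThreadLocal.withInitial(FakeApi::new);

  static <T> CompletableFuture<Holder<T>> submit(Function<FakeApi, T> call, ExecutorService pool) {
    return CompletableFuture.supplyAsync(() -> {
      FakeApi api = API.get();                               // this thread's own instance
      T response = call.apply(api);                          // blocking call on the pool thread
      return new Holder<>(response, api.collectMessages());  // collected on the same thread
    }, pool);
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      Holder<Integer> holder = submit(api -> api.add(5, 4), pool).join();
      System.out.println(holder.response + " " + holder.messages);
    } finally {
      pool.shutdown();
    }
  }
}
```

Because the call and collectMessages() run on the same pool thread, and each thread has its own FakeApi, a holder should only ever contain the messages of its own call. That reasoning assumes the call is blocking; I would not expect it to hold for the generated ...Async methods, whose responses may be handled on a different thread.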
    

Example Usage

public class Main {
  
  public static void main(String[] args) {

    SoapApiManager apiManager = new SoapApiManager();

    apiManager
        .submitRequest((soapApi) -> soapApi.add(5, 4))
        .thenAccept(response -> {

          // we can get the SOAP API response
          var sum = response.getResponse();
          // and also the intercepted messages!
          var messages = response.getMessages();

          var allXml = messages.stream().map(Main::getRawXml).collect(Collectors.joining("\n---\n"));

          System.out.println("sum: " + sum + ",\n" + allXml);
        });
  }

  public static String getRawXml(SOAPMessageContext context) {
    try {
      ByteArrayOutputStream byteOS = new ByteArrayOutputStream();
      context.getMessage().writeTo(byteOS);
      return byteOS.toString(StandardCharsets.UTF_8);
    } catch (SOAPException | IOException e) {
      throw new RuntimeException(e);
    }
  }
}

Output (this sample is from a run that called add(73, 32), as the request XML shows):

sum: 105,
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
  <soap:Body>
    <Add xmlns="http://tempuri.org/">
      <intA>73</intA>
      <intB>32</intB>
    </Add>
  </soap:Body>
</soap:Envelope>
---
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <soap:Header/>
  <soap:Body>
    <AddResponse xmlns="http://tempuri.org/">
      <AddResult>105</AddResult>
    </AddResponse>
  </soap:Body>
</soap:Envelope>

All Code

Here's a working example, complete with validation of the responses.

It creates a lot (REQUESTS_COUNT) of requests and submits them all to SoapApiManager.

Each request prints out the thread's name, and the hashcode of the JAX-WS Proxy (I wanted to check they were being reused), and the basic input/output (e.g. -9 - 99 = -108).

There's validation to make sure each SoapResponseHolder has the correct result and raw SOAP messages, and that the correct number of requests were sent.
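
One caveat about that validation: an exception thrown inside thenAcceptAsync(...) is only stored in the future it returns, and the listing below discards that future, so a failed check may go unnoticed. A possible alternative (not what the listing does) is to keep the futures and join them with CompletableFuture.allOf. Here is a minimal JDK-only sketch; the arithmetic is just a placeholder for the SOAP call and its validation.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class AwaitAllSketch {

  /** Starts count async tasks, waits for all of them, and returns how many completed. */
  static int runAndCount(int count) {
    AtomicInteger completed = new AtomicInteger();

    List<CompletableFuture<Void>> futures = IntStream.range(0, count)
        .mapToObj(n -> CompletableFuture
            .supplyAsync(() -> n + n)                        // placeholder for the SOAP call
            .thenAccept(sum -> completed.incrementAndGet())) // placeholder for the validation
        .collect(Collectors.toList());

    // join() blocks until every future is done and throws a CompletionException
    // if any of them completed exceptionally, so failed checks become visible.
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    return completed.get();
  }

  public static void main(String[] args) {
    System.out.println(runAndCount(500));
  }
}
```

With this shape the scheduled "waiter" and the Thread.sleep(...) would no longer be needed, since join() returns only once every request has finished.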

Main.java


import com.github.underscore.lodash.Xml;
import com.github.underscore.lodash.Xml.XmlStringBuilder.Step;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.xml.soap.SOAPException;
import javax.xml.ws.handler.soap.SOAPMessageContext;

public class Main implements AutoCloseable {

  private final SoapApiManager apiManager = new SoapApiManager();

  private static final int THREAD_COUNT = 4;
  private static final int REQUESTS_COUNT = 500;

  private final AtomicInteger i = new AtomicInteger();

  public static void main(String[] args)
      throws InterruptedException {
    try (var m = new Main()) {
      m.run();
    }
  }

  private void run() throws InterruptedException {

    var executor = Executors.newFixedThreadPool(THREAD_COUNT);

    var tasks = Stream.generate(() -> Map.entry(randomInt(), randomInt()))
        .limit(REQUESTS_COUNT)
        .map(intA -> (Callable<Boolean>) () -> {
          sendAndValidateRequest(intA.getKey(), intA.getValue());
          i.incrementAndGet();
          return true;
        })
        .collect(Collectors.toList());

    executor.invokeAll(tasks);

    var waiter = Executors.newSingleThreadScheduledExecutor();
    waiter.scheduleWithFixedDelay(
        () -> {
          var size = i.get();
          System.out.println(">waiting... (size " + size + ")");
          if (size >= REQUESTS_COUNT) {
            System.out.println(">finished waiting! " + size);
            waiter.shutdownNow();
          }
        },
        3, 3, TimeUnit.SECONDS
    );

    System.out.println("Finished sending tasks " + waiter.awaitTermination(10, TimeUnit.SECONDS));
    waiter.shutdownNow();
    Thread.sleep(TimeUnit.SECONDS.toMillis(5));
    executor.shutdown();
    System.out.println(
        "executor.awaitTermination " + executor.awaitTermination(10, TimeUnit.SECONDS));
    if (!executor.isTerminated()) {
      System.out.println("executor.shutdownNow " + executor.shutdownNow());
    }

    if (i.get() != REQUESTS_COUNT) {
      throw new RuntimeException(
          "Test did not execute " + REQUESTS_COUNT + " times, actual: " + i.get()
      );
    }
  }

  private int randomInt() {
    return ThreadLocalRandom.current().nextInt(-100, 100);
  }

  private void sendAndValidateRequest(int a, int b) {

    apiManager
        .submitRequest((soapApi) -> {
          var response = soapApi.add(a, b);
          System.out.printf(
              "[%-12s / %-18s] %4d %s %3d = %4d\n",
              soapApi.hashCode(),
              Thread.currentThread().getName(),
              a,
              (b >= 0 ? "+" : "-"),
              Math.abs(b),
              response
          );
          return response;
        })
        .thenAcceptAsync(response -> {

          var sum = response.getResponse();
          var messages = response.getMessages();

          var allXml = messages.stream().map(Main::getRawXml)
              .collect(Collectors.joining("\n---\n"));

          if (sum != a + b) {
            throw new RuntimeException(
                "Bad sum, sent " + a + " + " + b + ", result: " + sum + ", xml: " + allXml
            );
          }

          if (messages.size() != 2) {
            throw new RuntimeException(
                "Bad messages, expected 1 request and 1 response, but got " + messages.size()
                    + ", xml: " + allXml
            );
          }

          if (!allXml.contains("<AddResult>" + (a + b) + "</AddResult>")) {
            throw new RuntimeException(
                "Bad result, did not contain AddResult=" + (a + b) + ", actual: " + allXml
            );
          }

        });
  }

  public static String getRawXml(SOAPMessageContext context) {
    try (var byteOS = new ByteArrayOutputStream()) {
      context.getMessage().writeTo(byteOS);
      var rawSoap = byteOS.toString(StandardCharsets.UTF_8);

      return Xml.formatXml(rawSoap, Step.TWO_SPACES);

    } catch (SOAPException | IOException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public void close() {
    apiManager.close();
  }
}

SoapApiManager.java


import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import javax.xml.ws.handler.soap.SOAPMessageContext;
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
import org.tempuri.CalculatorSoap;

public class SoapApiManager implements AutoCloseable {

  private static final int THREAD_LIMIT = Math.min(Runtime.getRuntime().availableProcessors(), 5);

  private static final ExecutorService executorService = Executors.newWorkStealingPool(THREAD_LIMIT);

  private static final ThreadLocal<SoapApiWrapper> soapApiWrapper = ThreadLocal.withInitial(SoapApiWrapper::new);

  @Override
  public void close() {
    executorService.shutdown();
  }

  private static class SoapApiWrapper {

    private final CalculatorSoap connection;
    private final SoapMessageHandler soapMessageHandler = new SoapMessageHandler();

    public SoapApiWrapper() {
      var factoryBean = new JaxWsProxyFactoryBean();
      factoryBean.setAddress("http://www.dneonline.com/calculator.asmx");
      factoryBean.setServiceClass(CalculatorSoap.class);
      factoryBean.setHandlers(Collections.singletonList(soapMessageHandler));

      connection = factoryBean.create(CalculatorSoap.class);
    }
  }

  public <ResponseT> CompletableFuture<SoapResponseHolder<ResponseT>> submitRequest(
      SoapRequestRunner<ResponseT> requestRunner
  ) {
    return CompletableFuture.supplyAsync(createRequestCall(requestRunner), executorService);
  }

  private <ResponseT> Supplier<SoapResponseHolder<ResponseT>> createRequestCall(
      SoapRequestRunner<ResponseT> requestRunner
  ) {
    return () -> {
      SoapApiWrapper api = soapApiWrapper.get();

      var response = requestRunner.sendRequest(api.connection);
      var messages = api.soapMessageHandler.collectMessages();

      return new SoapResponseHolder<>(response, messages);
    };
  }

  @FunctionalInterface
  interface SoapRequestRunner<ResponseT> {

    ResponseT sendRequest(CalculatorSoap calculatorSoap);
  }

  public static class SoapResponseHolder<ResponseT> {

    private final List<SOAPMessageContext> messages;
    private final ResponseT response;

    SoapResponseHolder(
        ResponseT response,
        List<SOAPMessageContext> messages
    ) {
      this.response = response;
      this.messages = messages;
    }

    public ResponseT getResponse() {
      return response;
    }

    public List<SOAPMessageContext> getMessages() {
      return messages;
    }
  }

}

SoapMessageHandler.java

package org.example;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import javax.xml.namespace.QName;
import javax.xml.ws.handler.MessageContext;
import javax.xml.ws.handler.soap.SOAPHandler;
import javax.xml.ws.handler.soap.SOAPMessageContext;

public class SoapMessageHandler implements SOAPHandler<SOAPMessageContext> {

  private final List<SOAPMessageContext> messages = new ArrayList<>();

  public List<SOAPMessageContext> collectMessages() {
    var m = new ArrayList<>(messages);
    messages.clear();
    return m;
  }

  @Override
  public Set<QName> getHeaders() {
    return Collections.emptySet();
  }

  @Override
  public boolean handleMessage(SOAPMessageContext context) {
    messages.add(context);
    return true;
  }

  @Override
  public boolean handleFault(SOAPMessageContext context) {
    messages.add(context);
    return true;
  }

  @Override
  public void close(MessageContext context) {
  }
}

build.gradle.kts

plugins {
  java
  id("com.github.bjornvester.wsdl2java") version "1.2"
}

group = "org.example"
version = "1.0-SNAPSHOT"

repositories {
  mavenCentral()
}

dependencies {

  implementation(enforcedPlatform("org.apache.cxf:cxf-bom:3.4.4"))
  implementation("org.apache.cxf:cxf-core")
  implementation("org.apache.cxf:cxf-rt-frontend-jaxws")
  implementation("org.apache.cxf:cxf-rt-transports-http")
  implementation("org.apache.cxf:cxf-rt-databinding-jaxb")
//  implementation("org.apache.cxf:cxf-rt-transports-http-jetty")

  implementation("org.apache.cxf:cxf-rt-transports-http-hc")
  implementation("com.sun.activation:javax.activation:1.2.0")
  implementation("javax.annotation:javax.annotation-api:1.3.2")
  implementation("com.sun.xml.messaging.saaj:saaj-impl:1.5.1")

  implementation("com.github.javadev:underscore:1.68")

  //<editor-fold desc="JAXB">
  implementation("org.jvnet.jaxb2_commons:jaxb2-basics-runtime:1.11.1")
  xjcPlugins("org.jvnet.jaxb2_commons:jaxb2-basics:1.11.1")
  //</editor-fold>


  //<editor-fold desc="Test">
  testImplementation(enforcedPlatform("org.junit:junit-bom:5.7.2")) // JUnit 5 BOM
  testImplementation("org.junit.jupiter:junit-jupiter")

  testImplementation("org.junit.jupiter:junit-jupiter-api:5.6.0")
  testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine")
  //</editor-fold>
}

java {
  toolchain {
    languageVersion.set(JavaLanguageVersion.of(11))
  }
}

wsdl2java {
  cxfVersion.set("3.4.4")
  options.addAll("-xjc-Xequals", "-xjc-XhashCode")
}

tasks.test {
  useJUnitPlatform()
}
