I am currently generating async client code using wsdl-to-java which is being used to query a SOAP web service. Here is a snippet of the generated async method:
@WebMethod(operationName = "GetSession")
public Future<?> getSessionAsync(
@WebParam(partName = "parameters", name = "GetSessionRequest")
mynamespace.GetSessionRequest parameters,
@WebParam(partName = "ResponseHeader", mode = WebParam.Mode.OUT, name = "ResponseHeader", header = true)
javax.xml.ws.Holder<mydatacontract.ResponseHeader> responseHeader,
@WebParam(name = "asyncHandler", targetNamespace = "")
AsyncHandler<myservice.GetSessionResponse> asyncHandler
);
I am calling the above generated code in a wrapper class:
getSession(GetSessionRequest request) {
Future<?> response = generatedClient.getSessionAsync(request, responseHeader, handler)
}
handler(Response<GetSessionResponse> response) {
// no access to SOAP XML at this point?
}
As per my understanding, the generated code takes care of serialization/de-serialization and I do not have access to the raw SOAP response. There are ways to log the SOAP XML response as described here but I need to access this in code as the response needs to be dumped into the database.
Is there any way to access this in the handler without touching the generated client code?
UPDATE:
I am able to read the ResponseContext
which is of type java.util.Map<String, Object>
. But this does not return the raw SOAP XML that I am looking for.
Also, using an inbound Interceptor
would mean that I lose the context of the calling function. This is required to store the XML response associated with each call in the database.
UPDATE 2:
The Future returns an object of type Response
which can be found at Response.java. The jdoc states the following:
The interface provides methods used to obtain the
payload and context of a message sent in response to an operation
invocation.
However, I am only able to retrieve the Context
and no property to access the payload.
I found an SO answer that has a solution for Axis here. Is it possible to have something similar in cxf?
CodePudding user response:
Take a look at this code snippet (source : https://github.com/apache/cxf/blob/master/core/src/main/java/org/apache/cxf/interceptor/LoggingInInterceptor.java)
import org.apache.cxf.message.Message;
...
protected void logging(Logger logger, Message message) {
if (message.containsKey(LoggingMessage.ID_KEY)) {
return;
}
String id = (String)message.getExchange().get(LoggingMessage.ID_KEY);
if (id == null) {
id = LoggingMessage.nextId();
message.getExchange().put(LoggingMessage.ID_KEY, id);
}
message.put(LoggingMessage.ID_KEY, id);
final LoggingMessage buffer
= new LoggingMessage("Inbound Message\n----------------------------", id);
if (!Boolean.TRUE.equals(message.get(Message.DECOUPLED_CHANNEL_MESSAGE))) {
// avoid logging the default responseCode 200 for the decoupled responses
Integer responseCode = (Integer)message.get(Message.RESPONSE_CODE);
if (responseCode != null) {
buffer.getResponseCode().append(responseCode);
}
}
String encoding = (String)message.get(Message.ENCODING);
if (encoding != null) {
buffer.getEncoding().append(encoding);
}
String httpMethod = (String)message.get(Message.HTTP_REQUEST_METHOD);
if (httpMethod != null) {
buffer.getHttpMethod().append(httpMethod);
}
String ct = (String)message.get(Message.CONTENT_TYPE);
if (ct != null) {
buffer.getContentType().append(ct);
}
Object headers = message.get(Message.PROTOCOL_HEADERS);
if (headers != null) {
buffer.getHeader().append(headers);
}
String uri = (String)message.get(Message.REQUEST_URL);
if (uri == null) {
String address = (String)message.get(Message.ENDPOINT_ADDRESS);
uri = (String)message.get(Message.REQUEST_URI);
if (uri != null && uri.startsWith("/")) {
if (address != null && !address.startsWith(uri)) {
if (address.endsWith("/") && address.length() > 1) {
address = address.substring(0, address.length() - 1);
}
uri = address uri;
}
} else {
uri = address;
}
}
if (uri != null) {
buffer.getAddress().append(uri);
String query = (String)message.get(Message.QUERY_STRING);
if (query != null) {
buffer.getAddress().append('?').append(query);
}
}
if (!isShowBinaryContent() && isBinaryContent(ct)) {
buffer.getMessage().append(BINARY_CONTENT_MESSAGE).append('\n');
log(logger, buffer.toString());
return;
}
if (!isShowMultipartContent() && isMultipartContent(ct)) {
buffer.getMessage().append(MULTIPART_CONTENT_MESSAGE).append('\n');
log(logger, buffer.toString());
return;
}
//// IMPORTANT LINE
InputStream is = message.getContent(InputStream.class);
////
if (is != null) {
logInputStream(message, is, buffer, encoding, ct);
} else {
Reader reader = message.getContent(Reader.class);
if (reader != null) {
logReader(message, reader, buffer);
}
}
log(logger, formatLoggingMessage(buffer));
}
So it follows that you can create an interceptor and get the content of the message using org.apache.cxf.message.Message
CodePudding user response:
The problem: SOAP messages can be intercepted, but at a very separate location to the original call site. It's difficult to pass the SOAP message back to the original call site, particularly in multithreaded or asynchronous environments.
The only solution I can see is to explicitly only have one JAX-WS Proxy, which has one Handler, per request. Having only one Proxy for an application would be a bottleneck, so it requires using multithreading tools to allow for parallel and async execution.
Here's my idea, in code. First I go through it step-by-step, and at the end there's a dump of all code.
UPDATE: I've replaced the LinkedBlockingQueue
with a static ThreadLocal<SoapApiWrapper>
instance, and the executor with a newWorkStealingPool()
. See the edit history for the changes!
I've set it up to use http://www.dneonline.com/calculator.asmx. It compiles and runs, but I haven't spent a lot of time ensuring it works correctly or is optimal. I'm sure there are issues (my CPU fan is working hard, even though I'm not running code). Be warned!
(Does anyone know of a better public SOAP API that I can either run locally or flood with
requests?)
If you'd like to test, here are some public SOAP APIs:
https://documenter.getpostman.com/view/8854915/Szf26WHn
Step by step
Implement
SOAPHandler
class that will capture messages, calledSoapMessageHandler
.public class SoapMessageHandler implements SOAPHandler<SOAPMessageContext> { // capture messages in a list private final List<SOAPMessageContext> messages = new ArrayList<>(); // get & clear messages public List<SOAPMessageContext> collectMessages() { var m = new ArrayList<>(messages); messages.clear(); return m; } @Override public boolean handleMessage(SOAPMessageContext context) { messages.add(context); // collect message return true; } @Override public boolean handleFault(SOAPMessageContext context) { messages.add(context); // collect error return true; } }
Define a
SoapApiWrapper
class that- creates one
SoapMessageHandler
, - creates one JAX-WS Proxy,
- and adds the Handler to the Proxy.
class SoapApiWrapper { // 1. create a handler private final SoapMessageHandler soapMessageHandler = new SoapMessageHandler(); private final CalculatorSoap connection; public SoapApiWrapper() { // 2. create one connection var factoryBean = new JaxWsProxyFactoryBean(); factoryBean.setAddress("http://www.dneonline.com/calculator.asmx"); factoryBean.setServiceClass(CalculatorSoap.class); // 3. add the Handler factoryBean.setHandlers(Collections.singletonList(soapMessageHandler)); connection = factoryBean.create(CalculatorSoap.class); } }
- creates one
Define a
SoapApiManager
that has- an
ExecutorService
, which will manage the SOAP requests and responses - a
ThreadLocal<SoapApiWrapper>
, so each thread has an JAX-WS Proxy (idea from https://stackoverflow.com/a/16680215/4161471)
public class SoapApiManager { // 1. request executor private static final ExecutorService executorService = Executors.newWorkStealingPool(THREAD_LIMIT); private static final ThreadLocal<SoapApiWrapper> soapApiWrapper = ThreadLocal.withInitial(SoapApiWrapper::new); }
- an
The
SoapApiManager
has a method,submitRequest(...)
. It will return the SOAP API response ** and** the SOAP messages.public <ResponseT> CompletableFuture<SoapResponseHolder<ResponseT>> submitRequest( SoapRequestRunner<ResponseT> requestRunner ) { //... }
The parameter is a
SoapRequestRunner
, a lambda that accepts a JAX-WS Proxy and returns a SOAP Response.@FunctionalInterface interface SoapRequestRunner<ResponseT> { ResponseT sendRequest(CalculatorSoap calculatorSoap); }
When invoked,
submitRequest(...)
performs the following:- Wrap the
SoapRequestRunner
withCompleteableFuture.supplyAsync(...)
, and using ourExectutorService
- Fetches a
SoapApiWrapper
from theThreadLocal
, - calls the SOAP API (by applying the
SoapRequestRunner
to theSoapApiWrapper
's JAX-WS Proxy) - awaits the SOAP result,
- extracts the SOAP messages from the
SoapApiWrapper
'sSOAPHandler
, - and finally, bundles the SOAP result and SOAP messages in a DTO,
SoapResponseHolder
public <ResponseT> CompletableFuture<SoapResponseHolder<ResponseT>> submitRequest( SoapRequestRunner<ResponseT> requestRunner ) { // 1. use CompletableFuture & executorService return CompletableFuture.supplyAsync(createRequestCall(requestRunner), executorService); } private <ResponseT> Supplier<SoapResponseHolder<ResponseT>> createRequestCall( SoapRequestRunner<ResponseT> requestRunner ) { return () -> { SoapApiWrapper api = null; try { api = soapApiWrapperQueue.get(); // 2. fetch an API Wrapper var response = requestRunner.sendRequest(api.connection); // 3&4. request & response var messages = api.soapMessageHandler.collectMessages(); // 5. extract raw SOAP messages return new SoapResponseHolder<>(response, messages); // 6. bundle into DTO } catch (InterruptedException e) { throw new RuntimeException(e); } finally { if (api != null) { soapApiWrapperQueue.offer(api); } } }; }
- Wrap the
Example Usage
public class Main {
public static void main(String[] args) {
SoapApiManager apiManager = new SoapApiManager();
apiManager
.submitRequest((soapApi) -> soapApi.add(5, 4))
.thenAccept(response -> {
// we can get the SOAP API response
var sum = response.getResponse();
// and also the intercepted messages!
var messages = response.getMessages();
var allXml = messages.stream().map(Main::getRawXml).collect(Collectors.joining("\n---\n"));
System.out.println("sum: " sum ",\n" allXml);
});
}
public static String getRawXml(SOAPMessageContext context) {
try {
ByteArrayOutputStream byteOS = new ByteArrayOutputStream();
context.getMessage().writeTo(byteOS);
return byteOS.toString(StandardCharsets.UTF_8);
} catch (SOAPException | IOException e) {
throw new RuntimeException(e);
}
}
}
output
sum: 105,
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
<soap:Body>
<Add xmlns="http://tempuri.org/">
<intA>73</intA>
<intB>32</intB>
</Add>
</soap:Body>
</soap:Envelope>
---
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body>
<AddResponse xmlns="http://tempuri.org/">
<AddResult>105</AddResult>
</AddResponse>
</soap:Body>
</soap:Envelope>
All Code
Here's a working example, complete with validation of the responses.
It creates a lot (REQUESTS_COUNT
) of requests and submits them all to SoapApiManager
.
Each request prints out the thread's name, and the hashcode of the JAX-WS Proxy (I wanted to check
they were being reused), and the basic input/output (e.g. -9 - 99 = -108
).
There's validation to make sure each SoapResponseHolder
has the correct result and raw SOAP
messages, and that the correct number of requests were sent.
Main.java
import com.github.underscore.lodash.Xml;
import com.github.underscore.lodash.Xml.XmlStringBuilder.Step;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.xml.soap.SOAPException;
import javax.xml.ws.handler.soap.SOAPMessageContext;
public class Main implements AutoCloseable {
private final SoapApiManager apiManager = new SoapApiManager();
private static final int THREAD_COUNT = 4;
private static final int REQUESTS_COUNT = 500;
private final AtomicInteger i = new AtomicInteger();
public static void main(String[] args)
throws InterruptedException {
try (var m = new Main()) {
m.run();
}
}
private void run() throws InterruptedException {
var executor = Executors.newFixedThreadPool(THREAD_COUNT);
var tasks = Stream.generate(() -> Map.entry(randomInt(), randomInt()))
.limit(REQUESTS_COUNT)
.map(intA -> (Callable<Boolean>) () -> {
sendAndValidateRequest(intA.getKey(), intA.getValue());
i.incrementAndGet();
return true;
})
.collect(Collectors.toList());
executor.invokeAll(tasks);
var waiter = Executors.newSingleThreadScheduledExecutor();
waiter.scheduleWithFixedDelay(
() -> {
var size = i.get();
System.out.println(">waiting... (size " size ")");
if (size >= REQUESTS_COUNT) {
System.out.println(">finished waiting! " size);
waiter.shutdownNow();
}
},
3, 3, TimeUnit.SECONDS
);
System.out.println("Finished sending tasks " waiter.awaitTermination(10, TimeUnit.SECONDS));
waiter.shutdownNow();
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
executor.shutdown();
System.out.println(
"executor.awaitTermination " executor.awaitTermination(10, TimeUnit.SECONDS));
if (!executor.isTerminated()) {
System.out.println("executor.shutdownNow " executor.shutdownNow());
}
if (i.get() != REQUESTS_COUNT) {
throw new RuntimeException(
"Test did not execute " REQUESTS_COUNT " times, actual: " i.get()
);
}
}
private int randomInt() {
return ThreadLocalRandom.current().nextInt(-100, 100);
}
private void sendAndValidateRequest(int a, int b) {
apiManager
.submitRequest((soapApi) -> {
var response = soapApi.add(a, b);
System.out.printf(
"[%-12s / %-18s] M %s = = M\n",
soapApi.hashCode(),
Thread.currentThread().getName(),
a,
(b >= 0 ? " " : "-"),
Math.abs(b),
response
);
return response;
})
.thenAcceptAsync(response -> {
var sum = response.getResponse();
var messages = response.getMessages();
var allXml = messages.stream().map(Main::getRawXml)
.collect(Collectors.joining("\n---\n"));
if (sum != a b) {
throw new RuntimeException(
"Bad sum, sent " a " " b ", result: " sum ", xml: " allXml
);
}
if (messages.size() != 2) {
throw new RuntimeException(
"Bad messages, expected 1 request and 1 response, but got " messages.size()
", xml: " allXml
);
}
if (!allXml.contains("<AddResult>" (a b) "</AddResult>")) {
throw new RuntimeException(
"Bad result, did not contain AddResult=" (a b) ", actual: " allXml
);
}
});
}
public static String getRawXml(SOAPMessageContext context) {
try (var byteOS = new ByteArrayOutputStream()) {
context.getMessage().writeTo(byteOS);
var rawSoap = byteOS.toString(StandardCharsets.UTF_8);
return Xml.formatXml(rawSoap, Step.TWO_SPACES);
} catch (SOAPException | IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void close() {
apiManager.close();
}
}
SoapApiManager.java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.function.Supplier;
import javax.xml.ws.handler.soap.SOAPMessageContext;
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
import org.tempuri.CalculatorSoap;
public class SoapApiManager implements AutoCloseable {
private static final int THREAD_LIMIT = Math.min(Runtime.getRuntime().availableProcessors(), 5);
private static final ExecutorService executorService = Executors.newWorkStealingPool(THREAD_LIMIT);
private static final ThreadLocal<SoapApiWrapper> soapApiWrapper = ThreadLocal.withInitial(SoapApiWrapper::new);
@Override
public void close() {
executorService.shutdown();
}
private static class SoapApiWrapper {
private final CalculatorSoap connection;
private final SoapMessageHandler soapMessageHandler = new SoapMessageHandler();
public SoapApiWrapper() {
var factoryBean = new JaxWsProxyFactoryBean();
factoryBean.setAddress("http://www.dneonline.com/calculator.asmx");
factoryBean.setServiceClass(CalculatorSoap.class);
factoryBean.setHandlers(Collections.singletonList(soapMessageHandler));
connection = factoryBean.create(CalculatorSoap.class);
}
}
public <ResponseT> CompletableFuture<SoapResponseHolder<ResponseT>> submitRequest(
SoapRequestRunner<ResponseT> requestRunner
) {
return CompletableFuture.supplyAsync(createRequestCall(requestRunner), executorService);
}
private <ResponseT> Supplier<SoapResponseHolder<ResponseT>> createRequestCall(
SoapRequestRunner<ResponseT> requestRunner
) {
return () -> {
SoapApiWrapper api = soapApiWrapper.get();
var response = requestRunner.sendRequest(api.connection);
var messages = api.soapMessageHandler.collectMessages();
return new SoapResponseHolder<>(response, messages);
};
}
@FunctionalInterface
interface SoapRequestRunner<ResponseT> {
ResponseT sendRequest(CalculatorSoap calculatorSoap);
}
public static class SoapResponseHolder<ResponseT> {
private final List<SOAPMessageContext> messages;
private final ResponseT response;
SoapResponseHolder(
ResponseT response,
List<SOAPMessageContext> messages
) {
this.response = response;
this.messages = messages;
}
public ResponseT getResponse() {
return response;
}
public List<SOAPMessageContext> getMessages() {
return messages;
}
}
}
SoapMessageHandler.java
package org.example;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import javax.xml.namespace.QName;
import javax.xml.ws.handler.MessageContext;
import javax.xml.ws.handler.soap.SOAPHandler;
import javax.xml.ws.handler.soap.SOAPMessageContext;
public class SoapMessageHandler implements SOAPHandler<SOAPMessageContext> {
private final List<SOAPMessageContext> messages = new ArrayList<>();
public List<SOAPMessageContext> collectMessages() {
var m = new ArrayList<>(messages);
messages.clear();
return m;
}
@Override
public Set<QName> getHeaders() {
return Collections.emptySet();
}
@Override
public boolean handleMessage(SOAPMessageContext context) {
messages.add(context);
return true;
}
@Override
public boolean handleFault(SOAPMessageContext context) {
messages.add(context);
return true;
}
@Override
public void close(MessageContext context) {
}
}
build.gradle.kts
plugins {
java
id("com.github.bjornvester.wsdl2java") version "1.2"
}
group = "org.example"
version = "1.0-SNAPSHOT"
repositories {
mavenCentral()
}
dependencies {
implementation(enforcedPlatform("org.apache.cxf:cxf-bom:3.4.4"))
implementation("org.apache.cxf:cxf-core")
implementation("org.apache.cxf:cxf-rt-frontend-jaxws")
implementation("org.apache.cxf:cxf-rt-transports-http")
implementation("org.apache.cxf:cxf-rt-databinding-jaxb")
// implementation("org.apache.cxf:cxf-rt-transports-http-jetty")
implementation("org.apache.cxf:cxf-rt-transports-http-hc")
implementation("com.sun.activation:javax.activation:1.2.0")
implementation("javax.annotation:javax.annotation-api:1.3.2")
implementation("com.sun.xml.messaging.saaj:saaj-impl:1.5.1")
implementation("com.github.javadev:underscore:1.68")
//<editor-fold desc="JAXB">
implementation("org.jvnet.jaxb2_commons:jaxb2-basics-runtime:1.11.1")
xjcPlugins("org.jvnet.jaxb2_commons:jaxb2-basics:1.11.1")
//</editor-fold>
//<editor-fold desc="Test">
testImplementation(enforcedPlatform("org.junit:junit-bom:5.7.2")) // JUnit 5 BOM
testImplementation("org.junit.jupiter:junit-jupiter")
testImplementation("org.junit.jupiter:junit-jupiter-api:5.6.0")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine")
//</editor-fold>
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(11))
}
}
wsdl2java {
cxfVersion.set("3.4.4")
options.addAll("-xjc-Xequals", "-xjc-XhashCode")
}
tasks.test {
useJUnitPlatform()
}