I am trying to publish a message to Google Cloud Pub/Sub using this library:
implementation "com.google.cloud:spring-cloud-gcp-starter-pubsub:2.0.0"
However, I am getting the exception below:
com.google.cloud.spring.pubsub.support.converter.PubSubMessageConversionException: Unable to convert payload of type com.logistics.domain.events.items.FOItemsReturnedTrackingUpdate to byte[] for sending to Pub/Sub.
Here is the full stack trace:
com.google.cloud.spring.pubsub.support.converter.PubSubMessageConversionException: Unable to convert payload of type com.logistics.domain.events.items.FOItemsReturnedTrackingUpdate to byte[] for sending to Pub/Sub.
at com.google.cloud.spring.pubsub.support.converter.SimplePubSubMessageConverter.toPubSubMessage(SimplePubSubMessageConverter.java:63)
at com.google.cloud.spring.pubsub.core.publisher.PubSubPublisherTemplate.publish(PubSubPublisherTemplate.java:86)
at com.google.cloud.spring.pubsub.core.PubSubTemplate.publish(PubSubTemplate.java:128)
at com.logistics.adapters.publishers.FOItemsReturnedTrackingUpdateEventPublisherPubSubAdapter.publishItemsReturnedTrackingUpdate(FOItemsReturnedTrackingUpdateEventPublisherPubSubAdapter.java:46)
at com.logistics.application.services.FOPackagesService.lambda$processFoPackageAnnulled$2(FOPackagesService.java:100)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at com.logistics.application.services.FOPackagesService.processFoPackageAnnulled(FOPackagesService.java:71)
at com.logistics.adapters.controllers.DummyController.foPackagesAnnulled(DummyController.java:20)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:564)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1067)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:681)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:764)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.AbstractRequestLoggingFilter.doFilterInternal(AbstractRequestLoggingFilter.java:289)
at com.logistic.common.model.CustomRequestLoggingFilter.doFilterInternal(CustomRequestLoggingFilter.java:48)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:96)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:540)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
at org.apache.catalina.valves.RemoteIpValve.invoke(RemoteIpValve.java:769)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:359)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:399)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:889)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1735)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191)
at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.base/java.lang.Thread.run(Thread.java:832)
Below is the code for publishing the event:
@Slf4j
@Component
public class ItemsReturnedTrackingUpdateEventPublisherPubSubAdapter implements
        ItemsReturnedTrackingUpdateEventPublisher {

    private final String topicName;
    private final PubSubTemplate pubSubTemplate;
    private final ObjectWriter objectWriter;

    public ItemsReturnedTrackingUpdateEventPublisherPubSubAdapter(String topicName,
                                                                  PubSubTemplate pubSubTemplate,
                                                                  ObjectWriter objectWriter) {
        this.topicName = topicName;
        this.pubSubTemplate = pubSubTemplate;
        this.objectWriter = objectWriter;
    }

    @Override
    public void publishItemsReturnedTrackingUpdate(ItemsReturnedTrackingUpdate data) {
        try {
            Map<String, String> attributes = MDC.getCopyOfContextMap();
            attributes.put("eventType", EventType.ITEMS_RETURNED_TRACKING_UPDATE.name());
            var message = objectWriter.writeValueAsString(data);
            log.debug("ItemsReturnedTrackingUpdate event is being published on topic : {} with attributes : {} and data : {}",
                    topicName, attributes, message);
            ListenableFuture<String> result = pubSubTemplate.publish(topicName, data);
            log.debug("ItemsReturnedTrackingUpdate published: {}, Message id: {}", data.getOrderId(), result.get());
        } catch (Exception e) {
            log.error("Error publishing ItemsReturnedTrackingUpdate with id: {}", data.getOrderId());
            e.printStackTrace();
            throw new PublisherException("Error publishing ItemsReturnedTrackingUpdate with id: " + data.getOrderId(), e);
        }
    }
}
CodePudding user response:
You need to register a Pub/Sub message converter bean that can serialize your object, like the one below:
@Bean
@Primary
public PubSubMessageConverter pubSubMessageConverter(ObjectMapper objectMapper) {
    return new JacksonPubSubMessageConverter(objectMapper);
}
Here is a sample ObjectMapper bean as well:
@Bean
@Primary
public ObjectMapper objectMapper() {
    ObjectMapper mapper = new ObjectMapper();
    mapper.registerModule(new JavaTimeModule());
    mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
    mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
    mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
    return mapper;
}
CodePudding user response:
Looking at the stack trace, SimplePubSubMessageConverter is choking on the object being published. The documentation for that class says:
... maps payloads of type byte[], ByteString, ByteBuffer, and String to Pub/Sub messages
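Change:
    pubSubTemplate.publish(topicName, data);
To:
    pubSubTemplate.publish(topicName, message);
Here message is the JSON String your code already builds with objectWriter.writeValueAsString(data), which is one of the payload types the converter accepts.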
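To make the difference concrete, here is a minimal sketch of the kind of payload check described in the doc quote above. It is not the library's source: ConverterSketch, toBytes and TrackingUpdate are hypothetical names invented for illustration, and ByteString is left out so the example runs on the plain JDK. It only shows why a String goes through while an arbitrary domain object is rejected.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class ConverterSketch {

    // Hypothetical stand-in for a "simple" converter: it only understands
    // a few raw payload types and rejects everything else.
    static byte[] toBytes(Object payload) {
        if (payload instanceof byte[]) {
            return (byte[]) payload;
        }
        if (payload instanceof String) {
            return ((String) payload).getBytes(StandardCharsets.UTF_8);
        }
        if (payload instanceof ByteBuffer) {
            // duplicate() so the caller's buffer position is left untouched
            ByteBuffer buffer = ((ByteBuffer) payload).duplicate();
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            return bytes;
        }
        throw new IllegalArgumentException("Unable to convert payload of type "
                + payload.getClass().getName() + " to byte[]");
    }

    // Hypothetical event class standing in for the domain object in the question.
    static class TrackingUpdate {
        final String orderId;

        TrackingUpdate(String orderId) {
            this.orderId = orderId;
        }
    }

    public static void main(String[] args) {
        // A JSON String, like the message variable in the question, converts fine.
        byte[] ok = toBytes("{\"orderId\":\"42\"}");
        System.out.println("String payload converted to " + ok.length + " bytes");

        // An arbitrary object is rejected, mirroring the failure in the stack trace.
        try {
            toBytes(new TrackingUpdate("42"));
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

So you can either serialize the object yourself and publish the resulting String, as in this answer, or configure a converter that knows how to serialize objects, as in the other answer.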