I all, I have a topic in which a service write this message :
{"version":"1.0.0","action":"CREATE_BACKUP","actionId":"58048","backupManagerId":"config_data","status":"STARTED"}
from the log of producer I see:
{"version":"1.1.0","timestamp":"2021-10-28T18:30:16.039 00:00","severity":"info","service_id":"bro",
"message":"Sent notification: Notification [version=1.0.0, action=RESTORE, actionId=31247, backupManagerId=test_data, status=COMPLETED]",
"extra_data":{"location":{"class":"com.test.mgmt.backupandrestore.notification.KafkaNotifier"}}}
I have a consumer service that receives this error when a message is produced:
{"version": "1.0.0", "timestamp": "2021-10-28T20:19:06.265 0000", "severity": "error", "service_id": "eric-cnels-license-front-end", "message":
"[email protected]::error:149@Consumer exception@ java.lang.IllegalStateException:
This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer'
in the value and/or key deserializer// at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194)
~[spring-kafka-2.7.6.jar:2.7.6]// at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112)
~[spring-kafka-2.7.6.jar:2.7.6]//
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1602)
[spring-kafka-2.7.6.jar:2.7.6]// at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210)
[spring-kafka-2.7.6.jar:2.7.6]// at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_302]//
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_302]//
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]//
Caused by: org.apache.kafka.common.errors.SerializationException:
Error deserializing key/value for partition bro-notification-0 at offset 6. If needed, please seek past the record to continue consumption.
//Caused by: java.lang.IllegalArgumentException:
The class 'com.test.mgmt.backupandrestore.notification.Notification' is not in the trusted packages:
[java.util, java.lang, com.test.server.configuration.messagebus, com.test.server.configuration.messagebus.*].
If you believe this class is safe to deserialize, please provide its name.
If the serialization is only done by a trusted source, you can also enable trust all (*).//
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:129)
~[spring-kafka-2.7.6.jar:2.7.6]// at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103)
~[spring-kafka-2.7.6.jar:2.7.6]// at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:521)
~[spring-kafka-2.7.6.jar:2.7.6]// at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1387) ~[kafka-clients-2.7.1.jar:?]//
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:133) ~[kafka-clients-2.7.1.jar:?]//
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1618) ~[kafka-clients-2.7.1.jar:?]//
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1454) ~[kafka-clients-2.7.1.jar:?]//
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:687) ~[kafka-clients-2.7.1.jar:?]//
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:638) ~[kafka-clients-2.7.1.jar:?]//
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272) ~[kafka-clients-2.7.1.jar:?]//
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233) ~[kafka-clients-2.7.1.jar:?]//
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) ~[kafka-clients-2.7.1.jar:?]//
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1412)
~[spring-kafka-2.7.6.jar:2.7.6]//
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1249)
~[spring-kafka-2.7.6.jar:2.7.6]//
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161)
~[spring-kafka-2.7.6.jar:2.7.6]// ... 3 more//@"}
The consumer config is:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerConfig.class);
@Bean
public ConsumerFactory<String, BroMessageEvent> broConsumerFactory(){
String bootstrapAddress = SystemConfiguration.MESSAGE_BUS_BOOTSTRAP_SERVERS;
String groupId = SystemConfiguration.MESSAGE_BUS_GROUP; // this must be random
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 300000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES,"*");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(BroMessageEvent.class));
}
@Bean(name = "broListnerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, BroMessageEvent> broListnerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, BroMessageEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(broConsumerFactory());
return factory;
}
@Bean
public KafkaEventListener kafkaEventListener() {
//This is handled by Spring and will allocate it along with the springcontext in the main
return new KafkaEventListener();
}
}
and the BroMessageEvent is:
package com.ericsson.licensing.nels.server.configuration.messagebus;
public class BroMessageEvent{
private String version;
private String action;
private String actionId;
private String backupManagerId;
private String status;
public BroMessageEvent(){
this.version = null;
this.action = null;
this.actionId = null;
this.backupManagerId = null;
this.status = null;
}
public BroMessageEvent(String versionIn, String actionIn, String actionIdIn, String backupManagerIdIn, String statusIn){
this.version = versionIn;
this.action = actionIn;
this.actionId = actionIdIn;
this.backupManagerId = backupManagerIdIn;
this.status = statusIn;
}
public String getVersion(){return this.version;}
public void setVersion(String verisionIn){this.version=verisionIn;}
public String getAction(){return this.action;}
public void setAction(String actionIn){this.action=actionIn;}
public String getActionId(){return this.actionId;}
public void setActionId(String actionIdIn){this.actionId=actionIdIn;}
public String getBackupManagerId(){return this.backupManagerId;}
public void setBackupManagerId(String backupManagerIdIn){this.backupManagerId=backupManagerIdIn;}
public String getStatus(){return this.status;}
public void setStatus(String statusIn){this.status=statusIn;}
public String toString(){
return "Version: " version ", Action: " action ", BackupManagerId: " backupManagerId ", Status: " status;
}
}
Is it possible that some configuration is wrong, maybe JsonDeserializer class mapping? is it possible that "extra_data" could bring me problems? linking to kafka and listing the messages on the topic I only see messages formatted like the one above.
what could be causing the error? suggestions?
CodePudding user response:
you can solve this issue by replacing the value of TRUSTED_PACKEGES * with . Also, use a StringJsonMessageConverter with a StringDeserializer or ByteArrayDeserializer instead of a JsonDeserializer.
props.put(JsonDeserializer.TRUSTED_PACKAGES,"<path-to-your-package>");
CodePudding user response:
I did as follow for my key and value deserialization. I set the ErrorHandlingDeserializer as key and value serializer for Kafka consumer factory. Then I added ErrorHandlingDeserializer's value deserializer as JsonDeserializer. and key deserializer as String class
@Bean
ConsumerFactory<String, BroMessageEvent> customConsumerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
configs.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
configs.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
configs.put(JsonDeserializer.VALUE_DEFAULT_TYPE, //package name of BroMessageEvent class);
configs.put(JsonDeserializer.TRUSTED_PACKAGES, PACKAGE_NAME);
return new DefaultKafkaConsumerFactory<>(configs);
}