Model class "is not in the trusted packages"


Hi all, I have a topic to which a service writes this message:

{"version":"1.0.0","action":"CREATE_BACKUP","actionId":"58048","backupManagerId":"config_data","status":"STARTED"} 

From the producer's log I see:

{"version":"1.1.0","timestamp":"2021-10-28T18:30:16.039 00:00","severity":"info","service_id":"bro",
 "message":"Sent notification: Notification [version=1.0.0, action=RESTORE, actionId=31247, backupManagerId=test_data, status=COMPLETED]",
 "extra_data":{"location":{"class":"com.test.mgmt.backupandrestore.notification.KafkaNotifier"}}}

I have a consumer service that receives this error when a message is produced:

{"version": "1.0.0", "timestamp": "2021-10-28T20:19:06.265 0000", "severity": "error", "service_id": "eric-cnels-license-front-end", "message": 
"[email protected]::error:149@Consumer exception@ java.lang.IllegalStateException: 
This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' 
in the value and/or key deserializer//  at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194)
 ~[spring-kafka-2.7.6.jar:2.7.6]//  at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) 
 ~[spring-kafka-2.7.6.jar:2.7.6]//  
 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1602) 
 [spring-kafka-2.7.6.jar:2.7.6]//   at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210)
 [spring-kafka-2.7.6.jar:2.7.6]//   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_302]//  
 at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_302]//    
 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]//
 Caused by: org.apache.kafka.common.errors.SerializationException:
 Error deserializing key/value for partition bro-notification-0 at offset 6. If needed, please seek past the record to continue consumption.
 //Caused by: java.lang.IllegalArgumentException: 
 The class 'com.test.mgmt.backupandrestore.notification.Notification' is not in the trusted packages: 
 [java.util, java.lang, com.test.server.configuration.messagebus, com.test.server.configuration.messagebus.*]. 
 If you believe this class is safe to deserialize, please provide its name. 
 If the serialization is only done by a trusted source, you can also enable trust all (*).//    
 at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:129) 
 ~[spring-kafka-2.7.6.jar:2.7.6]//  at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103) 
 ~[spring-kafka-2.7.6.jar:2.7.6]//  at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:521) 
 ~[spring-kafka-2.7.6.jar:2.7.6]//  at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1387) ~[kafka-clients-2.7.1.jar:?]//    
 at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:133) ~[kafka-clients-2.7.1.jar:?]//    
 at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1618) ~[kafka-clients-2.7.1.jar:?]//   
 at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1454) ~[kafka-clients-2.7.1.jar:?]//    
 at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:687) ~[kafka-clients-2.7.1.jar:?]//   
 at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:638) ~[kafka-clients-2.7.1.jar:?]// 
 at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272) ~[kafka-clients-2.7.1.jar:?]//  
 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233) ~[kafka-clients-2.7.1.jar:?]//    
 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) ~[kafka-clients-2.7.1.jar:?]//    
 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1412) 
 ~[spring-kafka-2.7.6.jar:2.7.6]//  
 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1249) 
 ~[spring-kafka-2.7.6.jar:2.7.6]//  
 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) 
 ~[spring-kafka-2.7.6.jar:2.7.6]//  ... 3 more//@"}

The consumer config is:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

      private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerConfig.class);


    @Bean
    public ConsumerFactory<String, BroMessageEvent> broConsumerFactory(){
        String bootstrapAddress = SystemConfiguration.MESSAGE_BUS_BOOTSTRAP_SERVERS;
        String groupId = SystemConfiguration.MESSAGE_BUS_GROUP; // this must be random
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 300000);

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES,"*");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(BroMessageEvent.class));
    }

    @Bean(name = "broListnerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, BroMessageEvent> broListnerContainerFactory(){
      ConcurrentKafkaListenerContainerFactory<String, BroMessageEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
      factory.setConsumerFactory(broConsumerFactory());
      return factory;
  }

    @Bean
    public KafkaEventListener kafkaEventListener() {
        //This is handled by Spring and will allocate it along with the springcontext in the main
        return new KafkaEventListener();
    }

}

and the BroMessageEvent is:

package com.ericsson.licensing.nels.server.configuration.messagebus;

public class BroMessageEvent{
    private String version;
    private String action;
    private String actionId;
    private String backupManagerId;
    private String status;

    public BroMessageEvent(){
        this.version = null;
        this.action = null;
        this.actionId = null;
        this.backupManagerId = null;
        this.status = null;
    }

    public BroMessageEvent(String versionIn, String actionIn, String actionIdIn, String backupManagerIdIn, String statusIn){
        this.version = versionIn;
        this.action = actionIn;
        this.actionId = actionIdIn;
        this.backupManagerId = backupManagerIdIn;
        this.status = statusIn;
    }

    public String getVersion(){return this.version;}
    public void setVersion(String versionIn){this.version=versionIn;}
    public String getAction(){return this.action;}
    public void setAction(String actionIn){this.action=actionIn;}
    public String getActionId(){return this.actionId;}
    public void setActionId(String actionIdIn){this.actionId=actionIdIn;}
    public String getBackupManagerId(){return this.backupManagerId;}
    public void setBackupManagerId(String backupManagerIdIn){this.backupManagerId=backupManagerIdIn;}
    public String getStatus(){return this.status;}
    public void setStatus(String statusIn){this.status=statusIn;}
    public String toString(){
        return "Version: " + version + ", Action: " + action + ", BackupManagerId: " + backupManagerId + ", Status: " + status;
    }
}

Is it possible that some configuration is wrong, maybe the JsonDeserializer class mapping? Could the "extra_data" field cause problems? Connecting to Kafka and listing the messages on the topic, I only see messages formatted like the one above.
What could be causing the error? Any suggestions?

CodePudding user response:

You can solve this issue by replacing the TRUSTED_PACKAGES value * with your own package name. Alternatively, use a StringJsonMessageConverter with a StringDeserializer or ByteArrayDeserializer instead of a JsonDeserializer (a sketch of that approach follows below).

props.put(JsonDeserializer.TRUSTED_PACKAGES,"<path-to-your-package>");

See this answer 1 and answer 2.
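For the message-converter alternative, here is a minimal sketch (not tested against the question's exact setup): it assumes the same @Configuration class, imports, and SystemConfiguration constants as the KafkaConsumerConfig above, and the topic name bro-notification is taken from the partition shown in the error log. The record value stays a plain String on the Kafka side; Spring converts the JSON to BroMessageEvent based on the @KafkaListener method parameter type, so the producer's type headers (and the trusted-packages check) are never consulted.

@Bean
public ConsumerFactory<String, String> broStringConsumerFactory(){
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SystemConfiguration.MESSAGE_BUS_BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, SystemConfiguration.MESSAGE_BUS_GROUP);
    // Plain string deserialization only; no type mapping happens at this layer.
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean(name = "broListnerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> broListnerContainerFactory(){
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(broStringConsumerFactory());
    // JSON-to-POJO conversion happens in the converter, driven by the listener method signature.
    factory.setMessageConverter(new StringJsonMessageConverter());
    return factory;
}

The listener then declares the target type directly, for example:

@KafkaListener(topics = "bro-notification", containerFactory = "broListnerContainerFactory")
public void onBroNotification(BroMessageEvent event) {
    // handle the event
}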

CodePudding user response:

I did as follows for my key and value deserialization: I set ErrorHandlingDeserializer as both the key and value deserializer for the Kafka consumer factory, then configured ErrorHandlingDeserializer's delegate value deserializer as JsonDeserializer and its delegate key deserializer as StringDeserializer. A sketch of how to plug this factory into a listener container follows the snippet.

@Bean
ConsumerFactory<String, BroMessageEvent> customConsumerFactory() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    configs.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
    configs.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
    // Fully qualified name of the BroMessageEvent class
    configs.put(JsonDeserializer.VALUE_DEFAULT_TYPE, BroMessageEvent.class.getName());
    configs.put(JsonDeserializer.TRUSTED_PACKAGES, PACKAGE_NAME);
    return new DefaultKafkaConsumerFactory<>(configs);
}
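To wire this into a listener container, a factory similar to the one in the question can simply point at this consumer factory (a sketch; bootstrapServers and PACKAGE_NAME are placeholders from the snippet above):

@Bean(name = "broListnerContainerFactory")
ConcurrentKafkaListenerContainerFactory<String, BroMessageEvent> broListnerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, BroMessageEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // Records that fail deserialization are now surfaced to the container's error handler
    // instead of being thrown from poll(), which is what the IllegalStateException complained about.
    factory.setConsumerFactory(customConsumerFactory());
    return factory;
}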
