How to test EmbeddedKafka with SpringBoot

Time:11-18

I ran into a problem testing a Kafka producer after switching from a custom Producer to KafkaTemplate.

For testing purposes I wrote the following class:

public class KafkaTestingTools {

    static private Map<String, Consumer<Long, GenericData.Record>> consumers = new HashMap<>();

    static public void sendMessage (String topic, String key, Object message, Schema schema) throws InterruptedException{
        Properties properties = new Properties();
        properties.put("schema.registry.url", "http://localhost:8081");
        properties.put("bootstrap.servers", "http://localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("linger.ms", 1);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "com.logistics.mock.CustomKafkaAvroSerializer");
        KafkaProducer<String, Object> producer = new KafkaProducer<>(properties);

        CustomKafkaAvroDeserializer.setTopicScheme(topic, schema);

        ProducerRecord<String, Object> record = new ProducerRecord<>(topic, key, message);
        producer.send(record);
        producer.close();
    }

    static public void registerConsumerContainer(EmbeddedKafkaBroker embeddedKafka, String topic, Schema schema) throws InterruptedException{
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup" + UUID.randomUUID().toString(), "true", embeddedKafka);
        consumerProps.put("schema.registry.url", "http://localhost:8081");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "com.logistics.mock.CustomKafkaAvroDeserializer");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        ConsumerFactory<Long, GenericData.Record> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<Long, GenericData.Record> consumer = cf.createConsumer();
        consumers.put(topic, consumer);

        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic);

        CustomKafkaAvroDeserializer.setTopicScheme(topic, schema);
    }

    static public Object getSingleRecordFromRegisteredContainer(EmbeddedKafkaBroker embeddedKafka, String topic){
        return SpecificData.get().deepCopy(
            CustomKafkaAvroDeserializer.getTopicScheme(topic),
            KafkaTestUtils.getSingleRecord(consumers.get(topic), topic).value()
        );
    }
    
}

Producer example:

@Service
@CommonsLog
public class PointProducer {

    private final KafkaTemplate<String, ExportMessage> kafkaTemplate;
    private final String topic;

    @Autowired
    public PointProducer(@Value("${kafka.producer.points}") String topic,
            KafkaTemplate<String, ExportMessage> kafkaTemplate) {
        this.topic = topic;
        this.kafkaTemplate = kafkaTemplate;
    }

    public void produce(Point point) {
        var message = new ExportMessage();
        message.setId(point.getId());
        log.warn("produce point: " + message.toString());
        kafkaTemplate.send(topic, point.getId().toString(), message);
        kafkaTemplate.flush();
    }
}

Kafka config:

spring:
  kafka:
    bootstrap-servers: ${spring.embedded.kafka.brokers}
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      point-deserializer: com.logistics.mock.CustomKafkaAvroDeserializer
      auto-offset-reset: latest
      group-id: credit_file_test
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.logistics.mock.CustomKafkaAvroSerializer
    schema-registry-url: http://localhost:8081

kafka.consumer.points: points_export
kafka.producer.files: common.file
kafka.producer.orders: common.order
kafka.producer.points: common.point

And the test looks like this:

@SpringBootTest
@TestMethodOrder(OrderAnnotation.class)
@EmbeddedKafka(partitions = 1, topics = { "topic1", "topic2" }, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class ApplicationLogisticOrderTest {
    @Test
    @Order(1)
    @WithMockUser(roles = "ADMIN")
    void checkSendToKafka() throws Exception {
        KafkaTestingTools.registerConsumerContainer(this.embeddedKafka, TOPIC1, Message.SCHEMA$);
        Thread.sleep(3000);
        prepareCustomizedLogisticOrder(t -> {
        });
        var mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).build();
        mockMvc.perform(MockMvcRequestBuilders.put("/orders/7000000/sendToKafka"));
    }
}

On the line with perform I get:

Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
    at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:478)
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:468)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.<init>(AbstractKafkaSchemaSerDeConfig.java:177)
    at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:50)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:376)

I tried putting it in application.yml and in the KafkaTestingTools properties, but nothing changed. It looks like Spring looks for this property somewhere else.

Has anyone run into this and knows a solution?

Thanks in advance.

Answer:

The problem is here:

spring:
  kafka:
    schema-registry-url: http://localhost:8081

Spring Boot does not manage any such property. Moreover, schema-registry-url does not match the schema.registry.url key that the serializer requires.

Consider changing it to this:
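
To illustrate why the hyphenated key has no effect, here is a minimal plain-Java sketch of an exact-name lookup. RequiredConfigDemo and requireConfig are hypothetical names that only mimic the behaviour seen in the stack trace; they are not the Kafka or Confluent API.

```java
import java.util.HashMap;
import java.util.Map;

public class RequiredConfigDemo {

    // Hypothetical stand-in for a required-config check: only an exact key match counts.
    static Object requireConfig(Map<String, Object> configs, String name) {
        if (!configs.containsKey(name)) {
            throw new IllegalStateException(
                "Missing required configuration \"" + name + "\" which has no default value.");
        }
        return configs.get(name);
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();

        // The hyphenated key is a different string, so the lookup below fails.
        configs.put("schema-registry-url", "http://localhost:8081");
        try {
            requireConfig(configs, "schema.registry.url");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        // With the dotted key present, the lookup succeeds.
        configs.put("schema.registry.url", "http://localhost:8081");
        System.out.println(requireConfig(configs, "schema.registry.url"));
    }
}
```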

spring:
  kafka:
    producer:
      properties:
        "schema.registry.url": http://localhost:8081

See docs for more info: https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.messaging.kafka.additional-properties
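
If you build the producer factory yourself instead of relying on Boot's auto-configuration, the same setting goes into the config map handed to the factory. A minimal sketch in plain Java, assuming the URL and serializer class from the question; ProducerConfigSketch and producerConfigs are hypothetical names, and the resulting map is what you would pass to something like Spring Kafka's DefaultKafkaProducerFactory.

```java
import java.util.HashMap;
import java.util.Map;

public class ProducerConfigSketch {

    // Builds the producer config map; keys are the literal Kafka client property names.
    static Map<String, Object> producerConfigs(String bootstrapServers, String schemaRegistryUrl) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Serializer class name taken from the question's configuration.
        props.put("value.serializer", "com.logistics.mock.CustomKafkaAvroSerializer");
        // The dotted key is the one the Avro serializer asks for in the stack trace.
        props.put("schema.registry.url", schemaRegistryUrl);
        return props;
    }

    public static void main(String[] args) {
        // Assumed values: broker address and registry URL as used in the question.
        Map<String, Object> props = producerConfigs("localhost:9092", "http://localhost:8081");
        System.out.println(props.get("schema.registry.url"));
    }
}
```

If the consumer's deserializer also requires the registry URL, the same key would presumably be needed under spring.kafka.consumer.properties as well.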
