I ran into a problem testing a Kafka producer after switching from a custom producer to KafkaTemplate.
For testing purposes I wrote the following class:
public class KafkaTestingTools {

    private static Map<String, Consumer<Long, GenericData.Record>> consumers = new HashMap<>();

    public static void sendMessage(String topic, String key, Object message, Schema schema) throws InterruptedException {
        Properties properties = new Properties();
        properties.put("schema.registry.url", "http://localhost:8081");
        properties.put("bootstrap.servers", "http://localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("linger.ms", 1);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "com.logistics.mock.CustomKafkaAvroSerializer");

        KafkaProducer<String, Object> producer = new KafkaProducer<>(properties);
        CustomKafkaAvroDeserializer.setTopicScheme(topic, schema);

        ProducerRecord<String, Object> record = new ProducerRecord<>(topic, key, message);
        producer.send(record);
        producer.close();
    }
    public static void registerConsumerContainer(EmbeddedKafkaBroker embeddedKafka, String topic, Schema schema) throws InterruptedException {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup" + UUID.randomUUID().toString(), "true", embeddedKafka);
        consumerProps.put("schema.registry.url", "http://localhost:8081");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "com.logistics.mock.CustomKafkaAvroDeserializer");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        ConsumerFactory<Long, GenericData.Record> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<Long, GenericData.Record> consumer = cf.createConsumer();
        consumers.put(topic, consumer);
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic);
        CustomKafkaAvroDeserializer.setTopicScheme(topic, schema);
    }
    public static Object getSingleRecordFromRegisteredContainer(EmbeddedKafkaBroker embeddedKafka, String topic) {
        return SpecificData.get().deepCopy(
                CustomKafkaAvroDeserializer.getTopicScheme(topic),
                KafkaTestUtils.getSingleRecord(consumers.get(topic), topic).value()
        );
    }
}
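The intended usage in a test is roughly the following (only a sketch: the topic name and the assumption that ExportMessage is an Avro-generated class exposing SCHEMA$ are illustrative):
// Rough usage sketch (illustrative topic and types, not the real test).
void exampleUsage(EmbeddedKafkaBroker embeddedKafka) throws InterruptedException {
    // Register a consumer on the topic before the code under test publishes to it.
    KafkaTestingTools.registerConsumerContainer(embeddedKafka, "common.point", ExportMessage.SCHEMA$);

    // ... trigger the code under test that sends a message to "common.point" ...

    // Read back the single published record and cast it to the expected Avro type.
    ExportMessage received = (ExportMessage) KafkaTestingTools
            .getSingleRecordFromRegisteredContainer(embeddedKafka, "common.point");
    // assert on fields of the received message, e.g. received.getId()
}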
Producer example:
@Service
@CommonsLog
public class PointProducer {

    private final KafkaTemplate<String, ExportMessage> kafkaTemplate;
    private final String topic;

    @Autowired
    public PointProducer(@Value("${kafka.producer.points}") String topic,
                         KafkaTemplate<String, ExportMessage> kafkaTemplate) {
        this.topic = topic;
        this.kafkaTemplate = kafkaTemplate;
    }

    public void produce(Point point) {
        var message = new ExportMessage();
        message.setId(point.getId());
        log.warn("produce point: " + message.toString());
        kafkaTemplate.send(topic, point.getId().toString(), message);
        kafkaTemplate.flush();
    }
}
Kafka config:
spring:
  kafka:
    bootstrap-servers: ${spring.embedded.kafka.brokers}
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      point-deserializer: com.logistics.mock.CustomKafkaAvroDeserializer
      auto-offset-reset: latest
      group-id: credit_file_test
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.logistics.mock.CustomKafkaAvroSerializer
    schema-registry-url: http://localhost:8081

kafka.consumer.points: points_export
kafka.producer.files: common.file
kafka.producer.orders: common.order
kafka.producer.points: common.point
And the test looks like this:
@SpringBootTest
@TestMethodOrder(OrderAnnotation.class)
@EmbeddedKafka(partitions = 1, topics = { "topic1", "topic2" },
        brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class ApplicationLogisticOrderTest {

    @Test
    @Order(1)
    @WithMockUser(roles = "ADMIN")
    void checkSendToKafka() throws Exception {
        KafkaTestingTools.registerConsumerContainer(this.embeddedKafka, TOPIC1, Message.SCHEMA$);
        Thread.sleep(3000);

        prepareCustomizedLogisticOrder(t -> {
        });

        var mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).build();
        mockMvc.perform(MockMvcRequestBuilders.put("/orders/7000000/sendToKafka"));
    }
And on the line with mockMvc.perform I caught:
Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:478)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:468)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.<init>(AbstractKafkaSchemaSerDeConfig.java:177)
at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:50)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:376)
I tried putting it in application.yml and in the KafkaTestingTools properties, but nothing changed; it looks like Spring expects this property somewhere else.
Has anyone run into this situation and found a solution?
Thanks in advance.
Answer:
The problem is here:

spring:
  kafka:
    schema-registry-url: http://localhost:8081

There is no such property managed by Spring Boot. Moreover, schema-registry-url does not map to the expected schema.registry.url anyway.
You should change it to this:
spring:
  kafka:
    producer:
      properties:
        "schema.registry.url": http://localhost:8081
See docs for more info: https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.messaging.kafka.additional-properties
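If a Spring-managed consumer also needs the registry URL for its value deserializer, the same pattern applies on the consumer side (a sketch, assuming your custom deserializer reads schema.registry.url from its configuration):

spring:
  kafka:
    consumer:
      properties:
        "schema.registry.url": http://localhost:8081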