I'm trying to set things up so that when I deploy a new environment, the zipkin Kafka topic used by the Spring Cloud Sleuth Zipkin sender has a low retention period, since its messages will either get processed by something else or be ignored (on the developer machine).
I tried to add this
@Bean("zipkin")
public NewTopic zipkin() {
    return TopicBuilder.name("zipkin")
        .config(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(Duration.ofMinutes(2).toMillis()))
        .compact()
        .replicas(1)
        .build();
}
but that didn't work because the topic has already been created and configured. I know I can do
kafka-configs --bootstrap-server $_KAFKA_BOOTSTRAP_SERVER --alter --topic zipkin --add-config retention.ms=2000
But I'd like this to be set up from the Spring application.
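A side note on units: retention.ms is expressed in milliseconds, so two minutes is 120000, whereas the 2000 in the command above would be two seconds. A minimal sketch of the conversion using only java.time; the class and method names are illustrative and not part of Kafka or Spring:

```java
import java.time.Duration;

class RetentionUnits {
    /** Converts a retention period to the string form used for retention.ms. */
    static String retentionMs(Duration retention) {
        return String.valueOf(retention.toMillis());
    }

    public static void main(String[] args) {
        // Two minutes expressed in milliseconds
        System.out.println(retentionMs(Duration.ofMinutes(2)));   // prints 120000
        // 2000 ms is only two seconds
        System.out.println(retentionMs(Duration.ofSeconds(2)));   // prints 2000
    }
}
```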
CodePudding user response:
You'll need to use an AdminClient instance to modify the configuration of an existing topic.
CodePudding user response:
Spring-Kafka does not provide an AdminClient in the application context, so you need to build one yourself rather than having it autowired. The following code shows how to do that by creating the AdminClient from the bootstrap servers provided in the environment and setting several topic properties in one call.
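// Imports assumed for this snippet (use jakarta.annotation.PostConstruct on Spring Boot 3 and later)
import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.springframework.beans.factory.annotation.Value;

/**
 * Retention time for zipkin messages. Defaults to 2 minutes.
 */
@Value("${zipkin.retention.ms:120000}")
private long zipkinRetentionMs;

/**
 * Segment size in bytes for zipkin. Defaults to 10 MiB (10485760 bytes).
 */
@Value("${zipkin.segment.bytes:10485760}")
private long zipkinSegmentBytes;

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@PostConstruct
public void fixZipkin() {
    // try-with-resources closes the AdminClient when the block exits
    try (final var adminClient = AdminClient.create(
            Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers))) {
        // The returned AlterConfigsResult is not inspected here;
        // call .all().get() on it if failures should be reported.
        adminClient.incrementalAlterConfigs(
            Map.of(
                new ConfigResource(ConfigResource.Type.TOPIC, "zipkin"),
                List.of(
                    new AlterConfigOp(
                        new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(zipkinRetentionMs)),
                        AlterConfigOp.OpType.SET
                    ),
                    new AlterConfigOp(
                        new ConfigEntry(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(zipkinSegmentBytes)),
                        AlterConfigOp.OpType.SET
                    )
                )));
    }
}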
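The two AlterConfigOp entries are plain strings keyed by the topic config names retention.ms and segment.bytes (the values of TopicConfig.RETENTION_MS_CONFIG and TopicConfig.SEGMENT_BYTES_CONFIG). As a rough sketch, assuming you would rather validate the overrides before contacting the broker, a small helper like the one below could build them. ZipkinTopicOverrides and its build method are hypothetical names, and the code uses only the JDK so it can be tried without a Kafka dependency.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ZipkinTopicOverrides {
    /**
     * Builds the topic config overrides as name/value strings.
     * Assumption of this sketch: non-positive values are treated as mistakes.
     * Kafka itself accepts -1 for retention.ms (no time limit), which is
     * deliberately rejected here because the topic is meant to be short-lived.
     */
    static Map<String, String> build(long retentionMs, long segmentBytes) {
        if (retentionMs <= 0) {
            throw new IllegalArgumentException("retention.ms must be positive: " + retentionMs);
        }
        if (segmentBytes <= 0) {
            throw new IllegalArgumentException("segment.bytes must be positive: " + segmentBytes);
        }
        // LinkedHashMap keeps insertion order, so the output is predictable
        Map<String, String> overrides = new LinkedHashMap<>();
        overrides.put("retention.ms", String.valueOf(retentionMs));
        overrides.put("segment.bytes", String.valueOf(segmentBytes));
        return overrides;
    }

    public static void main(String[] args) {
        // Same defaults as the @Value placeholders: 2 minutes and 10 MiB
        System.out.println(build(120_000L, 10L * 1024 * 1024));
        // prints {retention.ms=120000, segment.bytes=10485760}
    }
}
```

Each entry of the returned map could then be turned into new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET) inside fixZipkin().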