I am trying to publish just one message to Kafka using Spring Cloud Stream, without any deprecated classes, methods, or annotations. I would also like to be able to change the payload easily.
To be clear, I am trying to avoid both the deprecated @Output annotation and KafkaTemplate.
My configuration:
spring:
  cloud:
    stream:
      bindings:
        message-out-0:
          destination: ${spring.application.name}
          producer:
            key:
              serializer:
                type: string
                format: utf-8
                charset: utf-8
            value:
              serializer:
                type: string
                format: utf-8
                charset: utf-8
My code - what I have tried so far:
@Component
@RequiredArgsConstructor
public class ApplicationAnnouncer implements CommandLineRunner {
    private final MessageService messageService;

    @Override
    public void run(String... args) throws Exception {
        messageService.value = "Application started...";
        messageService.message();
    }
}
One attempt:
@Configuration
public class MessageService {
    public Object value;

    @Bean
    public Supplier<Message<?>> message() {
        return () -> MessageBuilder.withPayload(value).build();
    }
}
Another attempt:
@Configuration
public class MessageService {
    public Object value;

    @Bean
    public Supplier<Flux<?>> message() {
        return () -> Flux.fromStream(Stream.generate(() -> {
            try {
                Thread.sleep(1000);
                return value;
            } catch (Exception e) {
                // ignore
            }
            return null;
        })).subscribeOn(Schedulers.elastic()).share();
    }
}
Output in console consumer for both attempts:
Hello World!
Hello World!
Hello World!
Hello World! // ... Repeated every second
The documentation states:
The framework provides a default polling mechanism (answering the question of "Who?") that will trigger the invocation of the supplier and by default it will do so every second (answering the question of "How often?").
But what if I don't want it to poll every second?
It also feels odd to hand the message to MessageService through a public field. Is that class configuration, or is it a service?
I have yet to find the most basic example of just pushing ONE CUSTOMIZABLE MESSAGE to Kafka.
Answer:
You can tap into the Spring Cloud Stream bindings by using a StreamBridge:
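import lombok.RequiredArgsConstructor;
import org.springframework.boot.CommandLineRunner;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class ApplicationAnnouncer implements CommandLineRunner {
    private final StreamBridge streamBridge;

    @Override
    public void run(String... args) throws Exception {
        // Sends exactly one message to the given output binding; nothing is polled.
        streamBridge.send("message-out-0", "Application started...");
    }
}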
The first argument is the binding name. It is normally derived from the bean that provides the function, and it is the name used under spring.cloud.stream.bindings in the application settings.
You don't even need an actual bean to derive the binding name from. In that case, any name will do.
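To keep the payload easy to change (and to settle the "configuration or service?" question), one option is a small service that takes the payload as a method argument instead of storing it in a public field. The following is only a sketch in plain Java with no Spring dependency, so it can run on its own. BindingSender, MessagePublisher, and AnnouncerSketch are hypothetical names invented for this illustration, not Spring API; BindingSender merely mirrors the shape of StreamBridge.send(String, Object), which also returns a boolean.

```java
import java.util.ArrayList;
import java.util.List;

public class AnnouncerSketch {

    // Hypothetical abstraction with the same shape as StreamBridge.send(String, Object).
    @FunctionalInterface
    interface BindingSender {
        boolean send(String bindingName, Object payload);
    }

    // Hypothetical service: the payload is a method argument, not mutable shared state.
    static final class MessagePublisher {
        private final BindingSender sender;
        private final String bindingName;

        MessagePublisher(BindingSender sender, String bindingName) {
            this.sender = sender;
            this.bindingName = bindingName;
        }

        // Sends one message per call; nothing here is polled or repeated.
        boolean publish(Object payload) {
            return sender.send(bindingName, payload);
        }
    }

    public static void main(String[] args) {
        // Stand-in sender that records what would have been sent.
        List<String> sent = new ArrayList<>();
        BindingSender recording = (binding, payload) -> sent.add(binding + " <- " + payload);

        MessagePublisher publisher = new MessagePublisher(recording, "message-out-0");
        publisher.publish("Application started...");
        publisher.publish("Something else");

        sent.forEach(System.out::println);
    }
}
```

In a real application the recording lambda would presumably be replaced by the method reference streamBridge::send, and MessagePublisher would be registered as a regular @Service. That wiring is an assumption of this sketch and is not shown in the answer above.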
You can find some samples here.