Home > OS >  Spring 6: Spring Cloud Stream Kafka - Replacement for @EnableBinding
Spring 6: Spring Cloud Stream Kafka - Replacement for @EnableBinding

Time:01-20

I was reading "Spring Microservices In Action (2021)" because I wanted to brush up on Microservices.

Now with Spring Boot 3 a few things changed. In the book, an easy example of how to push messages to a topic and how to consume messages to a topic were presented.

The Problem is: The examples presented do just not work with Spring Boot 3. Sending Messages from a Spring Boot 2 Project works. The underlying project can be found here:

https://github.com/ihuaylupo/manning-smia/tree/master/chapter10

Example 1 (organization-service):

Consider this Config:

spring.cloud.stream.bindings.output.destination=orgChangeTopic
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.kafka.binder.zkNodes=kafka #kafka is used as a network alias in docker-compose
spring.cloud.stream.kafka.binder.brokers=kafka

And this Component(Class) which can is injected in a service in this project

@Component
public class SimpleSourceBean {
    private Source source;

    private static final Logger logger = LoggerFactory.getLogger(SimpleSourceBean.class);

    @Autowired
    public SimpleSourceBean(Source source){
        this.source = source;
    }

    public void publishOrganizationChange(String action, String organizationId){
       logger.debug("Sending Kafka message {} for Organization Id: {}", action, organizationId);
        OrganizationChangeModel change =  new OrganizationChangeModel(
                OrganizationChangeModel.class.getTypeName(),
                action,
                organizationId,
                UserContext.getCorrelationId());

        source.output().send(MessageBuilder.withPayload(change).build());
    }
}

This code fires a message to the topic (destination) orgChangeTopic. The way I understand it, the firsttime a message is fired, the topic is created.

Question 1: How do I do this Spring Boot 3? Config-Wise and "Code-Wise"?


Example 2:

Consider this config:

spring.cloud.stream.bindings.input.destination=orgChangeTopic
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=licensingGroup
spring.cloud.stream.kafka.binder.zkNodes=kafka
spring.cloud.stream.kafka.binder.brokers=kafka

And this code:

@SpringBootApplication
@RefreshScope
@EnableDiscoveryClient
@EnableFeignClients
@EnableEurekaClient
@EnableBinding(Sink.class)
public class LicenseServiceApplication {


    public static void main(String[] args) {
        SpringApplication.run(LicenseServiceApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void loggerSink(OrganizationChangeModel orgChange) {
        log.info("Received an {} event for organization id {}",
                orgChange.getAction(), orgChange.getOrganizationId());
    }

What this method is supposed to do is to fire whenever a message is fired in orgChangeTopic, we want the method loggerSink to fire.

How do I do this in Spring Boot 3?

CodePudding user response:

In Spring Cloud Stream 4.0.0 (the version used if you are using Boot 3), a few things are removed - such as the EnableBinding, StreamListener, etc. We deprecated them before in 3.x and finally removed them in the 4.0.0 version. The annotation-based programming model is removed in favor of the functional programming style enabled through the Spring Cloud Function project. You essentially express your business logic as java.util.function.Funciton|Consumer|Supplier etc. for a processor, sink, and source, respectively. For ad-hoc source situations, as in your first example, Spring Cloud Stream provides a StreamBridge API for custom sends.

Your example #1 can be re-written like this:

@Component
public class SimpleSourceBean {
    
    @Autowired
    StreamBridge streamBridge

    public void publishOrganizationChange(String action, String organizationId){
       logger.debug("Sending Kafka message {} for Organization Id: {}", action, organizationId);
        OrganizationChangeModel change =  new OrganizationChangeModel(
                OrganizationChangeModel.class.getTypeName(),
                action,
                organizationId,
                UserContext.getCorrelationId());

        streamBridge.send("output-out-0", MessageBuilder.withPayload(change).build());
    }
}

Config

spring.cloud.stream.bindings.output-out-0.destination=orgChangeTopic
spring.cloud.stream.kafka.binder.brokers=kafka

Just so you know, you no longer need that zkNode property. Neither the content type since the framework auto-converts that for you.

StreamBridge send takes a binding name and the payload. The binding name can be anything - but for consistency reasons, we used output-out-0 here. Please read the reference docs for more context around the reasoning for this binding name.

If you have a simple source that runs on a timer, you can express this simply as a supplier as below (instead of using a StreamBrdige).

@Bean
public Supplier<OrganizationChangeModel> ouput() {
  return () -> {
    // return the payload
  };
}

spring.cloud.function.definition=output
spring.cloud.bindings.output-out-0.destination=...

Example #2

@Bean
public Consumer<OrganizationChangeModel> loggerSink() {
 return model -> {        
   log.info("Received an {} event for organization id {}",
                orgChange.getAction(), orgChange.getOrganizationId());
   };
}

Config:

spring.cloud.function.definition=loggerSink
spring.cloud.stream.bindings.loggerSink-in-0.destination=orgChangeTopic
spring.cloud.stream.bindings.loggerSinnk-in-0.group=licensingGroup
spring.cloud.stream.kafka.binder.brokers=kafka

If you want the input/output binding names to be specifically input or output rather than with in-0, out-0 etc., there are ways to make that happen. Details for this are in the reference docs.

  • Related