How to move to the functional programming model to publish to Kafka in Spring Cloud

Time:07-21

I am trying to move away from the now-deprecated annotations such as @EnableBinding and @Output, but I could not find a simple example of doing it the functional way. These are the current files:

KafkaConfig.java

@Configuration
@EnableBinding({
        CcRegistrationFailureChannel.class
})
public class KafkaConfig {
}

CcRegistrationFailureChannel.java

public interface CcRegistrationFailureChannel {
    String CC_REGISTRATION = "cc-registration";

    @Output(CC_REGISTRATION)
    MessageChannel ccFailureChannel();
}

CcRegistrationFailurePublisher.java

@Log4j2
@Component
public class CcRegistrationFailurePublisher {
    public void publish(MessageChannel outputChannel, EventPayload payLoad) {
        boolean success = outputChannel.send(MessageBuilder
                .withPayload(payLoad)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build()
        );
        if (!success) {
            log.error("CC Registration Failure: publish message failed");
        }
    }
}

The publish is invoked from the following code:

@Autowired
private final CcRegistrationFailurePublisher ccRegistrationFailurePublisher;

public void sendCCRegistrationFailure(String internalUserId) {
    Long unixEpochTime = Instant.now().getEpochSecond();
    CcRegistrationFailureEventPayload ccRegistrationFailureEventPayload =
            new CcRegistrationFailureEventPayload(internalUserId, unixEpochTime, CcFailureEventType.ADD_CC_FAILURE);
    ccRegistrationFailurePublisher.publish(ccRegistrationFailureChannel.ccFailureChannel(), ccRegistrationFailureEventPayload);
}

How can I migrate from the current state to the functional approach recommended by Spring?

CodePudding user response:

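Use a StreamBridge, as described in the reference documentation: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources

Inject the StreamBridge bean and call send with the binding name and the payload (or a full Message):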

bridge.send(bindingName, message);
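As a rough sketch of how the publisher from the question might look after the migration: the publisher no longer receives a MessageChannel, it sends to a binding name instead. To keep the example runnable without Spring on the classpath, the bridge is represented here by a plain BiPredicate<String, Object>, which has the same shape as StreamBridge.send(String, Object). The class name CcRegistrationFailureSketch, the simplified payload class, and the fake sender in main are assumptions made for illustration; the binding name is the "cc-registration" constant from the question.

```java
import java.time.Instant;
import java.util.function.BiPredicate;

public class CcRegistrationFailureSketch {

    // Binding name reused from the question's CcRegistrationFailureChannel.
    static final String CC_REGISTRATION = "cc-registration";

    // Hypothetical, simplified stand-in for the question's event payload class.
    static final class CcRegistrationFailureEventPayload {
        final String internalUserId;
        final long unixEpochTime;
        final String eventType;

        CcRegistrationFailureEventPayload(String internalUserId, long unixEpochTime, String eventType) {
            this.internalUserId = internalUserId;
            this.unixEpochTime = unixEpochTime;
            this.eventType = eventType;
        }

        @Override
        public String toString() {
            return internalUserId + "/" + unixEpochTime + "/" + eventType;
        }
    }

    // Publisher after the migration: it depends only on a "send(bindingName, data)"
    // function. In a Spring application this would be streamBridge::send
    // (assumption: an injected org.springframework.cloud.stream.function.StreamBridge).
    static final class CcRegistrationFailurePublisher {
        private final BiPredicate<String, Object> sender;

        CcRegistrationFailurePublisher(BiPredicate<String, Object> sender) {
            this.sender = sender;
        }

        boolean publish(Object payload) {
            boolean success = sender.test(CC_REGISTRATION, payload);
            if (!success) {
                System.err.println("CC Registration Failure: publish message failed");
            }
            return success;
        }
    }

    public static void main(String[] args) {
        // Fake sender that only prints; it stands in for the real bridge.
        BiPredicate<String, Object> fakeSender = (bindingName, payload) -> {
            System.out.println("send to " + bindingName + ": " + payload);
            return true;
        };

        CcRegistrationFailurePublisher publisher = new CcRegistrationFailurePublisher(fakeSender);
        long unixEpochTime = Instant.now().getEpochSecond();
        publisher.publish(new CcRegistrationFailureEventPayload("user-123", unixEpochTime, "ADD_CC_FAILURE"));
    }
}
```

In the real application, the sender would be streamBridge::send from an injected StreamBridge, and the @EnableBinding configuration and the CcRegistrationFailureChannel interface should no longer be needed. The explicit content-type header from the question is left out on the assumption that the binding's default content type is acceptable; if it is not, a Message built with MessageBuilder can be passed as the data argument instead.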