How to set a dynamic partition key in the Spring Cloud AWS Kinesis binder?

Time:01-25

I am trying to use a Supplier/Consumer pair to produce and consume messages from a Kinesis data stream. Is there a way to set the partition key dynamically, per message?

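    // Hand-off queue between the gRPC endpoint and the stream supplier.
    private final BlockingQueue<Message<String>> messages = new LinkedBlockingQueue<>();

    @Bean
    public Supplier<Message<String>> produceMessages() {
        // poll() returns null when the queue is empty.
        return () -> this.messages.poll();
    }

    @Override
    public void produce(main.Test request, StreamObserver<Test> response) {
        // Carry the partition key as a header so partitionKeyExpression can read it.
        Message<String> input = MessageBuilder.withPayload(request.getMessage())
                .setHeader("partitionKey", "los").build();
        this.messages.offer(input);
        response.onCompleted();
    }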

application.properties

spring.cloud.stream.bindings.produceMessages-out-0.producer.partitionKeyExpression=headers['partitionKey']

CodePudding user response:

We don't know what channel1 is, but according to the Spring Cloud Stream docs, the property has to look like this:

spring.cloud.stream.bindings.produceMessages-out-0.producer.partitionKeyExpression='some-key'

https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_producer_properties

You can find a sample here: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/kinesis-samples/kinesis-produce-consume
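
To make the difference between a constant expression such as 'some-key' and a per-message expression such as headers['partitionKey'] concrete, here is a minimal plain-Java sketch with no Spring dependency. It is only an illustration: `Msg`, `CONSTANT_KEY`, `HEADER_KEY` and `resolveKey` are hypothetical names, and each `Function` merely stands in for the SpEL expression the binder would evaluate against the outgoing message.

```java
import java.util.Map;
import java.util.function.Function;

public class PartitionKeySketch {

    // Hypothetical stand-in for a Spring Message<String>: a payload plus headers.
    record Msg(String payload, Map<String, Object> headers) {}

    // Models partitionKeyExpression='some-key': the same key for every message.
    static final Function<Msg, Object> CONSTANT_KEY = msg -> "some-key";

    // Models partitionKeyExpression=headers['partitionKey']: the key is read
    // from each message's headers, so it can differ from message to message.
    static final Function<Msg, Object> HEADER_KEY =
            msg -> msg.headers().get("partitionKey");

    // Applies the (simulated) expression to one message and returns the key as text.
    static String resolveKey(Function<Msg, Object> expression, Msg msg) {
        return String.valueOf(expression.apply(msg));
    }

    public static void main(String[] args) {
        Msg first = new Msg("first", Map.of("partitionKey", "los"));
        Msg second = new Msg("second", Map.of("partitionKey", "nyc"));

        // Constant expression: both messages resolve to the same key.
        System.out.println(resolveKey(CONSTANT_KEY, first));
        System.out.println(resolveKey(CONSTANT_KEY, second));

        // Header expression: each message resolves to its own header value.
        System.out.println(resolveKey(HEADER_KEY, first));
        System.out.println(resolveKey(HEADER_KEY, second));
    }
}
```

With a quoted literal every record resolves to the same partition key, whereas the header-based expression picks up whatever value the producer set with setHeader("partitionKey", ...) on each message.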
