I am trying to use Supplier/Consumer to produce and consume messages from a Kinesis data stream. Is there a way to set the partition key dynamically?
// Queue of outgoing messages; typed to match the Supplier below
private BlockingQueue<Message<String>> messages = new LinkedBlockingQueue<>();

@Bean
public Supplier<Message<String>> produceMessages() {
    // poll() returns null when the queue is empty
    return () -> this.messages.poll();
}

@Override
public void produce(main.Test request, StreamObserver<Test> response) {
    // Attach the partition key as a header so it can vary per message
    Message<String> input = MessageBuilder.withPayload(request.getMessage())
            .setHeader("partitionKey", "los").build();
    this.messages.offer(input);
    response.onCompleted();
}
application.properties
spring.cloud.stream.bindings.produceMessages-out-0.producer.partitionKeyExpression=headers['partitionKey']
CodePudding user response:
We don't know what channel1 is, but according to the Spring Cloud Stream docs it has to be like this:
spring.cloud.stream.bindings.produceMessages-out-0.producer.partitionKeyExpression='some-key'
You can find a sample here: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/kinesis-samples/kinesis-produce-consume
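For what it's worth, the two expressions in this thread behave differently: headers['partitionKey'] is evaluated against each message, so the key can change per message, while a quoted literal such as 'some-key' gives every message the same key. Here is a minimal sketch of that difference in plain Java. Note that Msg, HEADER_KEY, CONSTANT_KEY and withKey are hypothetical stand-ins made up for illustration; they are not Spring's Message or SpEL classes, and this does not show how the Kinesis binder works internally.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Stand-in types for illustration only; NOT Spring's Message or SpEL classes.
public class PartitionKeySketch {

    // Hypothetical minimal message: a payload plus a header map.
    static final class Msg {
        final String payload;
        final Map<String, Object> headers;

        Msg(String payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Models headers['partitionKey']: the key is looked up on each message.
    static final Function<Msg, Object> HEADER_KEY = m -> m.headers.get("partitionKey");

    // Models the literal 'some-key': every message yields the same key.
    static final Function<Msg, Object> CONSTANT_KEY = m -> "some-key";

    // Builds a message carrying its partition key as a header.
    static Msg withKey(String payload, String key) {
        return new Msg(payload, Map.of("partitionKey", key));
    }

    public static void main(String[] args) {
        List<Msg> batch = List.of(withKey("first", "los"), withKey("second", "nyc"));
        for (Msg m : batch) {
            System.out.println(m.payload
                    + " header=" + HEADER_KEY.apply(m)
                    + " constant=" + CONSTANT_KEY.apply(m));
        }
    }
}
```

So if the key has to differ per message, the header-based expression from the question is the form that allows it; the literal only fits when one fixed key is acceptable.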