Spring Cloud Stream Kafka routing expression

Time:11-18

I have a Spring Cloud Stream application that uses the functional approach. The application uses a routing expression that inspects the payload to decide which consumer function receives each message.

spring:
  cloud:
    function:
      routing-expression: "payload['type'] == 'Event' ? 'handleEvent' : 'commitIgnoredEvent'"
      definition: functionRouter;handleDlqMessage

This configuration throws the following exception:

org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@3aff91a];
nested exception is org.springframework.expression.spel.SpelEvaluationException: EL1027E: Indexing into type 'org.apache.avro.generic.GenericData$Record' is not supported, failedMessage=GenericMessage

However, this configuration, which inspects a header instead, does not throw an exception:

spring:
  cloud:
    function:
      routing-expression: "header['type'] == 'Event' ? 'handleEvent' : 'commitIgnoredEvent'"
      definition: functionRouter;handleDlqMessage

The producing system does not set headers, so I cannot use the second option. Does the routing expression work only with headers? Is there a configuration I am missing that would enable payload inspection?

CodePudding user response:

It looks like you are trying to access the payload as a map. Try payload.type instead (assuming the payload class has a getType() method).
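
As a sketch of that suggestion for this particular payload: the exception names org.apache.avro.generic.GenericData$Record, which has no getType() getter but does expose get(String). Assuming the record really has a field named type (taken from the original expression), something along these lines may work. The toString() call is there because Avro often returns string fields as Utf8 objects rather than java.lang.String. This variant has not been verified against the asker's setup.

spring:
  cloud:
    function:
      # Assumption: the Avro record has a field named 'type'.
      # get('type') calls GenericRecord.get(String); toString() normalizes a Utf8 or enum value before the comparison.
      routing-expression: "payload.get('type').toString() == 'Event' ? 'handleEvent' : 'commitIgnoredEvent'"
      definition: functionRouter;handleDlqMessage

If the field can be null, the safe-navigation form payload.get('type')?.toString() should avoid a NullPointerException and send such messages to commitIgnoredEvent.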
