AWS Step Functions State Machine Result Selector from SQS

Time:12-29

I am building my first simple AWS Step Function:

Step 1: Read a message from SQS
Step 2: Run a Redshift query

I have managed to set up my initial workflow & permissions to the resources. I am trying to format my

Here is my Result Selector code for the first step:

"ResultSelector": {
  "startTime.$": "$.Body.startTime",
  "endTime.$": "$.Body.endTime"
}

Here is the error output:

{
  "error": "States.Runtime",
  "cause": "An error occurred while executing the state 'ReceiveMessage from SQS' (entered at the event id #2). The JSONPath '$.Body.startTime' specified for the field 'startTime.$' could not be found in the input '{\"Messages\":[{\"Body\":\"{\\\"startTime\\\": 12828373, \\\"endTime\\\": 12828374}\",\"Md5OfBody\":\"1ca004b811be50a7579f3a7e6affaeb1\",\"MessageId\":\"5b6b72d6-3fb7-451a-9053-26b72de32768\",\"ReceiptHandle\":\"AQEBjgttgZBNSuGZb0LFjo16Xc2 5uc48k9QPh9af4vTGy1xzb/BZZ6mBFx0FQ4ALWUoLfppWHIJbb7Zax2 Jv2dqekvLoCWWrdjasyrJnGfduXwsY20cPuW86kXz4RJfP4qbnyvWcV5Cb63u26XUE3S 3AqREo2BNwi01mI3ceYWxguXzgSgDIMg/07Lt5kBcNvB6qCIexQDMDgH91ZmmINLuX0j5gd3spHCmzBvFoQKv4PbLBS18PsC6vL1YLGxplZ eVyzD3eoK/5AU0jqmI0l0vjn4qZCVf2iOupkwEmMe4V2AaEdxI1FJ9dSU8zPkqFhbB3na56eIXgIGwsTWs5WBvlHikTuyYjWNFn6r27qCVfwADGkENoXZG1  vRoRse9X3p5fNqqVNBL1NTBL3k3/hoxB7/A922PT0MTn7rm1I=\"}]}'"
}

The problem is obvious: I am still not creating a proper result selector.

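I have JSON-formatted my input in Python:

import json

import boto3

# Assumed setup (not shown in the original snippet)
sqs = boto3.client('sqs')
queue_url = '******'  # placeholder for the masked queue URL

message = {
    'startTime': 12828373,
    'endTime': 12828374
}

# Send message to SQS queue; the body is sent as a JSON string
response = sqs.send_message(
    QueueUrl=queue_url,
    DelaySeconds=10,
    MessageBody=json.dumps(message)
)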

However, it seems that the step function is grabbing multiple messages. I am also not extracting the data correctly.

Please help with how the result selector should look to extract startTime & endTime. Is there anything else I am missing or should be doing differently?

My entire step function is below:

{
  "Comment": "Run Redshift Queries",
  "StartAt": "ReceiveMessage from SQS",
  "States": {
    "ReceiveMessage from SQS": {
      "Type": "Task",
      "Parameters": {
        "QueueUrl": "******"
      },
      "Resource": "arn:aws:states:::aws-sdk:sqs:receiveMessage",
      "Next": "Run Analysis Queries",
      "ResultSelector": {
        "startTime.$": "$.Body.startTime",
        "endTime.$": "$.Body.endTime"
      }
    },
    "Run Analysis Queries": {
      "Type": "Task",
      "End": true,
      "Parameters": {
        "ClusterIdentifier": "******",
        "Database": "prod",
        "Sql": "select * from ******"
      },
      "Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement"
    }
  },
  "TimeoutSeconds": 3600
}

Answer:

From the error message, the output of your first state is:

{
  "Messages": [
    {
      "Body": "{\"startTime\": 12828373, \"endTime\": 12828374}",
      "Md5OfBody": "1ca004b811be50a7579f3a7e6affaeb1",
      "MessageId": "5b6b72d6-3fb7-451a-9053-26b72de32768",
      "ReceiptHandle": "AQEBjgtt...."
    }
  ]
}

So the data you want is at $.Messages[0].Body, not $.Body, and that value is a JSON string rather than an object. You can parse it with the built-in intrinsic function States.StringToJson:

{
  "Comment": "Run Redshift Queries",
  "StartAt": "ReceiveMessage from SQS",
  "States": {
    "ReceiveMessage from SQS": {
      ...
      "ResultSelector": {
        "body.$": "States.StringToJson($.Messages[0].Body)"
      },
      ...
    }
  },
  ...
}

The output of the first state would be:

{
  "body": {
    "startTime": 12828373,
    "endTime": 12828374
  }
}
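To see why the original selector failed and what the corrected one does, here is a minimal local sketch in Python. It does not call AWS; it only mimics the path lookup and the string parsing on the output copied from the error message. The helper name select_body is hypothetical, and json.loads stands in for States.StringToJson as an approximation.

```python
import json

# Output of the receiveMessage task, copied from the error message
# (ReceiptHandle shortened, as in the answer).
task_output = {
    "Messages": [
        {
            "Body": "{\"startTime\": 12828373, \"endTime\": 12828374}",
            "Md5OfBody": "1ca004b811be50a7579f3a7e6affaeb1",
            "MessageId": "5b6b72d6-3fb7-451a-9053-26b72de32768",
            "ReceiptHandle": "AQEBjgtt....",
        }
    ]
}


def select_body(output):
    """Hypothetical helper mimicking
    "body.$": "States.StringToJson($.Messages[0].Body)"."""
    raw_body = output["Messages"][0]["Body"]  # still a JSON string here
    return {"body": json.loads(raw_body)}     # parsed into an object


# The original selector used $.Body, but there is no top-level "Body" key.
has_top_level_body = "Body" in task_output

selected = select_body(task_output)
print(has_top_level_body)
print(selected)
```

With the output shaped this way, later states would read the values from $.body.startTime and $.body.endTime.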

If you develop apps with AWS Step Functions often, you can subscribe to the AWS What's New feed. New features and improvements are announced there, which might help.
