How can I pass input data through AWS step functions with nested step functions

Time:05-19

I have a step function with three steps:

  1. setup
  2. start & wait for finish of another step function
  3. finish

I pass some input from the first step into the second step. I don't use all of the fields as inputs to the child step function, but I do want to pass all of that data on to step 3. I thought I could do this by specifying OutputPath, but it doesn't work: step 3 still receives the output of the child step function.

  stateMachine: {
    StartAt: 'setup',
    States: {
      setup: {
        Type: 'Task',
        TimeoutSeconds: 3600,
        Retry: [
          {
            ErrorEquals: ['States.ALL'],
            MaxAttempts: 2,
          },
        ],
        Resource: setupFn,
        Next: 'child_sfn',
      },
      child_sfn: {
        Type: 'Task',
        TimeoutSeconds: 3600,
        Retry: [
          {
            ErrorEquals: ['States.ALL'],
            MaxAttempts: 2,
          },
        ],
        Catch: [
          {
            ErrorEquals: ['States.ALL'],
            Next: 'sfn_catch',
          },
        ],
        Parameters: {
          StateMachineArn: 'mySfnArn',
          Input: {
            'myInput1.$': '$.myInput1',
          },
        },
        OutputPath: '$.',
        Resource: 'arn:aws:states:::states:startExecution.sync',
        Next: 'finish',
      },
      finish: {
        Type: 'Task',
        TimeoutSeconds: 3600,
        Retry: [
          {
            ErrorEquals: ['States.ALL'],
            MaxAttempts: 2,
          },
        ],
        Resource: finishFn,
        End: true,
      },
      sfn_catch: {
        Type: 'Task',
        TimeoutSeconds: 3600,
        Resource: stepFunctionCatch,
        End: true,
      },
    },
  }

CodePudding user response:

If you only need the input that was provided to step 1 [setup] to be passed to step 3 [finish], you should be able to access the Execution Input via the Context Object, as described in my answer here: https://stackoverflow.com/a/71230038/4526019
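
As a rough sketch of that first option, the `finish` state could read the execution input through the Context Object (`$$`) in its Parameters. The ARN value and the key names `executionInput` and `previousOutput` are assumptions made up for illustration; they are not from the question.

```javascript
// Hypothetical ARN; it stands in for the question's `finishFn` variable.
const finishFn = 'arn:aws:lambda:us-east-1:123456789012:function:finish';

const finish = {
  Type: 'Task',
  TimeoutSeconds: 3600,
  Resource: finishFn,
  Parameters: {
    // `$$` addresses the Context Object instead of the state input, so this
    // is the input the whole execution was started with.
    'executionInput.$': '$$.Execution.Input',
    // Assumed key: also forward whatever the previous state produced.
    'previousOutput.$': '$',
  },
  End: true,
};

console.log(JSON.stringify(finish.Parameters, null, 2));
```

Note that this only recovers the input the execution was started with, not anything that `setup` added afterwards.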


If step 3 [finish] needs the input to step 1 [setup] AND the output from step 2 [child_sfn] you can use ResultPath to avoid clobbering the payload.
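
Applied to the `child_sfn` state from the question, that might look roughly like the sketch below. The key name `childResult` is an assumption chosen for illustration, and Retry/Catch are left out for brevity.

```javascript
const childSfn = {
  Type: 'Task',
  TimeoutSeconds: 3600,
  Resource: 'arn:aws:states:::states:startExecution.sync',
  Parameters: {
    StateMachineArn: 'mySfnArn',
    Input: {
      // Only this field is sent to the child state machine.
      'myInput1.$': '$.myInput1',
    },
  },
  // Instead of OutputPath: store the child's result under an assumed key,
  // next to everything that `setup` produced.
  ResultPath: '$.childResult',
  Next: 'finish',
};

console.log(JSON.stringify(childSfn, null, 2));
```

With this shape, `finish` should receive the output of `setup` plus a `childResult` key, rather than the child's output alone.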

I'll try to show what I mean with an SFN that has a single state:

⚠️ Note: a Pass state is obviously different from a Task state, but the premise is the same.

Parent SFN Input:

{
  "a": {
    "value": 1
  },
  "b": {
    "value": 2
  },
  "c": {
    "value": 3
  }
}

SFN definition where only the a and b keys from the input should be passed, but NOT c. The "built-up" state can be preserved by using ResultPath to create a new key that can be safely and predictably appended to, and that can be consumed by some downstream task.

{
  "StartAt": "Step1",
  "States": {
    "Step1": {
      "Type": "Pass",
      "Parameters": {
        "params.$": "$['a', 'b']"
      },
      "ResultPath": "$.stateOutput.Step1",
      "End": true
    }
  }
}

Output (with preserved input and "transformed" output created by Step1):

{
  "a": {
    "value": 1
  },
  "b": {
    "value": 2
  },
  "c": {
    "value": 3
  },
  "stateOutput": {
    "Step1": {
      "params": {
        "a": {
          "value": 1
        },
        "b": {
          "value": 2
        }
      }
    }
  }
}

Basically, using OutputPath in StepX is going to wipe out all the input that may have been built up by the previous steps, because only the node selected by OutputPath is passed on to the next state. There is an example of this in the docs.
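
To make the difference concrete, here is a hand-written illustration of what the two fields would leave behind for the same path. It does not call Step Functions; `taskResult` and the `StepX` key are hypothetical.

```javascript
const stateInput = { a: { value: 1 }, b: { value: 2 }, c: { value: 3 } };
const taskResult = { ok: true }; // hypothetical result of StepX

// ResultPath: "$.stateOutput.StepX"
// The result is added next to the existing keys, so a, b and c survive.
const afterResultPath = { ...stateInput, stateOutput: { StepX: taskResult } };

// OutputPath: "$.stateOutput.StepX"
// Only the selected node is handed to the next state; a, b and c are gone.
const afterOutputPath = afterResultPath.stateOutput.StepX;

console.log(Object.keys(afterResultPath));
console.log(Object.keys(afterOutputPath));
```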


The workflow that I've found most useful, although extremely tedious, is to manually write out the JSON before and after each "phase" when defining the state, making notes along the way:

  • phase_0: output from the previous state (the raw input to this state)

    {
       "a": {
         "value": 1
       },
       "b": {
         "value": 2
       },
       "c": {
         "value": 3
       }
    }

  • phase_1: using InputPath?

    If InputPath is defined, only the selected part of phase_0 is passed on. Everything else is hidden from the task.

    {
      "//": "json object after InputPath"
    }

  • phase_2: redefine the Payload sent to the task via Parameters

    The object definition for what is actually sent to your lambda

    {
      "//": "json object sent to the task after defining Parameters.Payload"
    }

  • phase_3: raw output from task

    i.e. the response from lambda

    {
      "//": "json object returned from lambda"
    }

  • phase_4: optionally customize output from phase_3 with ResultSelector

    {
      "//": "transform the result returned by lambda (similar to Parameters.Payload)"
    }

  • phase_5: optionally set ResultPath?

    inserts the object of phase_4 (or phase_3 if there is no ResultSelector) into the raw state input of phase_0 at the specified path

    {
      "//": "roughly object in phase_0 updated with object in phase_4"
    }

  • phase_6: optionally set OutputPath

    throw away everything except what OutputPath selects and feed it to the next state

    {
      "//": "this is what will be the input to the next state"
    }

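
The phases can also be traced locally with a small simulation. This is a simplified sketch, not how Step Functions is implemented: it only understands `$` and dotted paths such as `$.a.b`, it does not handle `ResultPath: null`, and the helper names (`select`, `applyTemplate`, `insertAt`, `runTaskState`) and the `add` task are made up for illustration. It applies ResultPath to the raw state input, which is how the Amazon States Language describes it.

```javascript
// Resolve "$" or a dotted path such as "$.a.b" against a JSON value.
function select(doc, path) {
  if (path === '$') return doc;
  return path.slice(2).split('.').reduce((node, key) => node[key], doc);
}

// Build an object from a template; keys ending in ".$" are resolved as paths.
function applyTemplate(template, doc) {
  const out = {};
  for (const [key, value] of Object.entries(template)) {
    if (key.endsWith('.$')) {
      out[key.slice(0, -2)] = select(doc, value);
    } else if (value !== null && typeof value === 'object' && !Array.isArray(value)) {
      out[key] = applyTemplate(value, doc);
    } else {
      out[key] = value;
    }
  }
  return out;
}

// Place `result` into a deep copy of `rawInput` at a dotted path.
function insertAt(rawInput, path, result) {
  if (path === '$') return result; // "$" replaces the input entirely
  const out = JSON.parse(JSON.stringify(rawInput));
  const keys = path.slice(2).split('.');
  const last = keys.pop();
  let node = out;
  for (const key of keys) {
    if (node[key] === null || typeof node[key] !== 'object') node[key] = {};
    node = node[key];
  }
  node[last] = result;
  return out;
}

function runTaskState(state, rawInput, task) {
  const phase1 = select(rawInput, state.InputPath ?? '$');
  const phase2 = state.Parameters ? applyTemplate(state.Parameters, phase1) : phase1;
  const phase3 = task(phase2);
  const phase4 = state.ResultSelector ? applyTemplate(state.ResultSelector, phase3) : phase3;
  const phase5 = insertAt(rawInput, state.ResultPath ?? '$', phase4);
  return select(phase5, state.OutputPath ?? '$'); // phase_6
}

const rawInput = { a: { value: 1 }, b: { value: 2 }, c: { value: 3 } };
const state = {
  Parameters: { 'x.$': '$.a.value', 'y.$': '$.b.value' },
  ResultSelector: { 'sum.$': '$.total' },
  ResultPath: '$.stateOutput.add',
};
// Hypothetical task standing in for a lambda.
const add = ({ x, y }) => ({ total: x + y, debug: 'dropped by ResultSelector' });

const output = runTaskState(state, rawInput, add);
console.log(JSON.stringify(output));
// {"a":{"value":1},"b":{"value":2},"c":{"value":3},"stateOutput":{"add":{"sum":3}}}
```

Adding `OutputPath: '$.stateOutput.add'` to `state` would reduce the final output to `{"sum":3}`, which is the "wipe out" effect described earlier.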

Hopefully that helps and doesn't cause more confusion. I know I've spent too many hours trying to decipher the docs and map them to my own use cases.
