I am running a state machine running a dynamodb query (called using CallAwsService). The format returned looks like this:
{
Items: [
{
"string" : {
"B": blob,
"BOOL": boolean,
"BS": [ blob ],
"L": [
"AttributeValue"
],
"M": {
"string" : "AttributeValue"
},
"N": "string",
"NS": [ "string" ],
"NULL": boolean,
"S": "string",
"SS": [ "string" ]
}
}
]
}
I would like to unmarshall this data efficiently and would like to avoid using a lambda call for this
The CDK code we're currently using for the query is below
interface FindItemsStepFunctionProps {
table: Table
id: string
}
export const FindItemsStepFunction = (scope: Construct, props: FindItemStepFunctionProps): StateMachine => {
const { table, id } = props
const definition = new CallAwsService(scope, 'Query', {
service: 'dynamoDb',
action: 'query',
parameters: {
TableName: table.tableName,
IndexName: 'exampleIndexName',
KeyConditionExpression: 'id = :id',
ExpressionAttributeValues: {
':id': {
'S.$': '$.path.id',
},
},
},
iamResources: ['*'],
})
return new StateMachine(scope, id, {
logs: {
destination: new LogGroup(scope, `${id}LogGroup`, {
logGroupName: `${id}LogGroup`,
removalPolicy: RemovalPolicy.DESTROY,
retention: RetentionDays.ONE_WEEK,
}),
level: LogLevel.ALL,
},
definition,
stateMachineType: StateMachineType.EXPRESS,
stateMachineName: id,
timeout: Duration.minutes(5),
})
}
CodePudding user response:
Can you unmarshall the data downstream? I'm not too well versed on StepFunctions, do you have the ability to import utilities?
Unmarshalling DDB JSON is as simple as calling the unmarshall function from DynamoDB utility:
The first step is used to provide parameters to the query. This step can be omitted and define the parameters in the query step:
"Set Query Parameters": {
"Type": "Pass",
"Next": "DynamoDB Query ...",
"Result": {
"tableName": "<TABLE_NAME>",
"key_value": "<QUERY_KEY>",
"attribute_value": "<ATTRIBUTE_VALUE>"
}
}
The next step is the actual query to DynamoDB. You can also use GetItem
instead of Query
if you have the record keys.
"Type": "Task",
"Parameters": {
"TableName": "$.tableName",
"IndexName": "<INDEX_NAME_IF_NEEDED>",
"KeyConditionExpression": "#n1 = :v1",
"FilterExpression": "#n2.#n3 = :v2",
"ExpressionAttributeNames": {
"#n1": "<KEY_NAME>",
"#n2": "<ATTRIBUTE_NAME>",
"#n3": "<NESTED_ATTRIBUTE_NAME>"
},
"ExpressionAttributeValues": {
":v1": {
"S.$": "$.key_value"
},
":v2": {
"S.$": "$.attribute_value"
}
},
"ScanIndexForward": false
},
"Resource": "arn:aws:states:::aws-sdk:dynamodb:query",
"ResultPath": "$.ddb_record",
"ResultSelector": {
"result.$": "$.Items[0]"
},
"Next": "Check for DDB Object"
}
The above example seems a bit complicated, using both ExpressionAttributeNames
and ExpressionAttributeValues
. However, it makes it possible to query on nested attributes such as item.id
.
In this example, we only take the first item response with $.Items[0]
. However, you can take all the results if you need more than one.
The next step is to check if the query returned a record or not.
"Check for DDB Object": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.ddb_record.result",
"IsNull": false,
"Comment": "Found Context Object",
"Next": "Parse DDB Object"
}
],
"Default": "Do Nothing"
}
And lastly, to answer your original question, we can parse the query result, in case that we have one:
"Parse DDB Object": {
"Type": "Pass",
"Parameters": {
"string_object.$": "$.ddb_record.result.string_object.S",
"bool_object.$": "$.ddb_record.result.bool_object.Bool",
"dict_object": {
"nested_dict_object.$": "$.ddb_record.result.item.M.name.S",
},
"dict_object_full.$": "States.StringToJson($.ddb_record.result.JSON_object.S)"
},
"ResultPath": "$.parsed_ddb_record",
"End": true
}
Please note that:
Simple strings are easily converted by
"string_object.$": "$.ddb_record.result.string_object.S"
The same for numbers or booleans by
"bool_object.$": "$.ddb_record.result.bool_object.Bool")
Nested objects are parsing the map object (
"item.name.$": "$.ddb_record.result.item.M.name.S"
, for example)Creation of a JSON object can be achieved by using
States.StringToJson
The parsed object is added as a new entry on the flow using "ResultPath": "$.parsed_ddb_record"