I have a DynamoDB stream that indexes and pushes data to OpenSearch via a Lambda Function that uses Boto3. The item in the stream looks like this JSON object here:
{
"d8346fda0c35418580c98209df378653": {
"M": {
"cloudPlatformAoi": {
"L": [
{
"S": "Google Cloud Platform"
}
]
},
"cloudPlatformStrength": {
"L": [
{
"S": "AWS"
}
]
},
"integratedDevelopmentEnvironmentAoi": {
"L": [
{
"S": "TextMate"
}
]
},
"webFrameworkStrength": {
"L": [
{
"S": "Drupal"
}
]
},
"lastEdited": {
"S": "13-Jun-2022 (16:34:09.233933)"
},
"title": {
"S": "This is my third post (edited)"
},
"body": {
"S": "asdf"
},
"programmingLanguageStrength": {
"L": [
{
"S": "Python"
}
]
},
"programmingLanguageAoi": {
"L": [
{
"S": "Elixir"
}
]
},
"dbAoi": {
"L": [
{
"S": "PostgreSQL"
}
]
},
"databaseStrength": {
"L": [
{
"S": "DynamoDB"
}
]
},
"webFrameworkAoi": {
"L": [
{
"S": "Symfony"
}
]
},
"timeCreated": {
"S": "09-Jun-2022 (13:30:29.967379)"
},
"integratedDevelopmentEnvironment": {
"L": [
{
"S": "TextMate"
}
]
},
"level": {
"S": "one"
}
}
},
"1d5c49e0fc8c458ebc2e74835831a5c8": {
"M": {
"cloudPlatformAoi": {
"L": [
{
"S": "Google Cloud Platform"
}
]
},
"cloudPlatformStrength": {
"L": [
{
"S": "Google Cloud Platform"
}
]
},
"integratedDevelopmentEnvironmentAoi": {
"L": [
{
"S": "Vim"
}
]
},
"webFrameworkStrength": {
"L": [
{
"S": "Flask"
}
]
},
"lastEdited": {
"S": "13-Jun-2022 (17:30:32.808160)"
},
"title": {
"S": "My First Post (edited) 1"
},
"body": {
"S": "test"
},
"programmingLanguageStrength": {
"L": [
{
"S": "Python"
}
]
},
"programmingLanguageAoi": {
"L": [
{
"S": "Erlang"
}
]
},
"dbAoi": {
"L": [
{
"S": "Oracle"
}
]
},
"databaseStrength": {
"L": [
{
"S": "Couchbase"
}
]
},
"webFrameworkAoi": {
"L": [
{
"S": "Spring"
}
]
},
"timeCreated": {
"S": "13-Jun-2022 (16:28:23.582059)"
},
"integratedDevelopmentEnvironment": {
"L": [
{
"S": "Vim"
}
]
},
"awsomeBuilderStage": {
"S": "2"
}
}
},
"bd9cc68521564858871a7482d77bb1a5": {
"M": {
"cloudPlatformAoi": {
"L": [
{
"S": "Google Cloud Platform"
}
]
},
"cloudPlatformStrength": {
"L": [
{
"S": "Google Cloud Platform"
}
]
},
"integratedDevelopmentEnvironmentAoi": {
"L": [
{
"S": "Vim"
}
]
},
"webFrameworkStrength": {
"L": [
{
"S": "Flask"
}
]
},
"lastEdited": {
"S": "13-Jun-2022 (16:37:50.576490)"
},
"title": {
"S": "My First Post (edited)"
},
"body": {
"S": "test"
},
"programmingLanguageStrength": {
"L": [
{
"S": "Python"
}
]
},
"programmingLanguageAoi": {
"L": [
{
"S": "Erlang"
}
]
},
"dbAoi": {
"L": [
{
"S": "Oracle"
}
]
},
"databaseStrength": {
"L": [
{
"S": "Couchbase"
}
]
},
"webFrameworkAoi": {
"L": [
{
"S": "Spring"
}
]
},
"timeCreated": {
"S": "13-Jun-2022 (16:28:23.582059)"
},
"integratedDevelopmentEnvironment": {
"L": [
{
"S": "Vim"
}
]
},
"awsomeBuilderStage": {
"S": "3"
}
}
}
}
When I index and push the object to OpenSearch it includes the Types associated with each nested JSON Object. For example:
"cloudPlatformStrength": {
"L": [
{
"S": "AWS"
}
]
}
Instead of:
"cloudPlatformStrength": [
"Google Cloud Platform"
]
How would I go about fixing the data being pushed to OpenSearch? Would I have to just perform ETL process on each entry? Or is there a better way to do so?
CodePudding user response:
This should do what you want:
from boto3.dynamodb.types import TypeDeserializer
def handler(event, context):
deserializer = TypeDeserializer()
for record in event['Records']:
data = {key: deserializer.deserialize(value) for key, value in
record['dynamodb']['NewImage'].items()}