Lambda writing empty records to s3

Time:07-17

I am trying to write from firehose to s3 using the test event

{
  "TICKER_SYMBOL": "QXZ",
  "SECTOR": "HEALTHCARE",
  "CHANGE": -0.05,
  "PRICE": 84.51
}

My lambda code is

import json
import base64
def lambda_handler(event, context):
    print(event)
    for record in event['records']:
        # Kinesis data is base64 encoded so decode here
        payload=base64.b64decode(record["data"])
        print("Decoded payload: " + str(payload))
        json_object = {}
        output = []
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(json_object).encode('utf-8')).decode('utf-8')}
        output.append(output_record)
        print(output)
    return {'records': output}

The code prints the expected output and kinesis is writing to the successful s3 folder but when I download the file from s3 it is just an empty {}. What is causing the empty {} in s3?

CodePudding user response:

What is causing the empty {} in s3?

The Lambda function is serializing json_object, which is an empty dictionary {}. If you want to serialize/return the original payload with no transformations, serialize/return the original, undecoded data:

output_record = {
    'recordId': record['recordId'],
    'result': 'Ok',
    'data': record['data']
}
output.append(output_record)
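As a rough illustration, the pass-through fix might look like this as a complete handler. This is only a sketch: the `rec-1` record ID and the hand-built `event` are made up for a local check and are not what Firehose actually sends.

```python
import base64
import json


def lambda_handler(event, context):
    """Pass-through sketch: hand every record back to Firehose unchanged."""
    output = []  # created once, before the loop, so no record is lost
    for record in event['records']:
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': record['data'],  # already base64-encoded; reuse as-is
        })
    return {'records': output}


# Hypothetical local check with a hand-built event (the record ID is made up).
sample = {"TICKER_SYMBOL": "QXZ", "SECTOR": "HEALTHCARE", "CHANGE": -0.05, "PRICE": 84.51}
event = {'records': [
    {'recordId': 'rec-1',
     'data': base64.b64encode(json.dumps(sample).encode('utf-8')).decode('utf-8')},
]}
result = lambda_handler(event, None)

# Decoding the returned data shows what would end up in S3.
print(base64.b64decode(result['records'][0]['data']).decode('utf-8'))
```

Decoding the returned `data` field locally like this is a quick way to confirm the record is no longer an empty {} before deploying.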

Kinesis Firehose can be configured to sink to S3 without configuring Data Transformation with Lambda. If you don't require any data transformations, my suggestion is to exclude the Data Transformation configuration.

If you do require data transformations, make sure to...

  1. Perform transformations on payload
  2. Fix bug: Move output = [] outside of for loop
  3. Serialize/return transformed payload

import json
import base64

def lambda_handler(event, context):
    # Create the output list once, before the loop, so every record is kept
    output = []

    for record in event['records']:
        # Firehose delivers the record data base64-encoded
        payload = base64.b64decode(record["data"])
        payload_json = json.loads(payload)

        # TODO: do transformations on payload_json
        transformed_payload = json.dumps(payload_json)

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(transformed_payload.encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    return {'records': output}
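To check the transforming version locally before deploying, something along these lines may help. It is a sketch under assumptions: `make_event` and `decode_response` are hypothetical helpers, the record IDs and the second sample payload are made up, and the `PROCESSED` flag stands in for whatever transformation is actually needed.

```python
import base64
import json


def lambda_handler(event, context):
    output = []  # one list for the whole batch
    for record in event['records']:
        payload_json = json.loads(base64.b64decode(record['data']))
        # Hypothetical transformation, for illustration only: tag the record.
        payload_json['PROCESSED'] = True
        transformed_payload = json.dumps(payload_json)
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(transformed_payload.encode('utf-8')).decode('utf-8'),
        })
    return {'records': output}


def make_event(payloads):
    """Build a minimal Firehose-style event; the record IDs are made up."""
    return {'records': [
        {'recordId': f'rec-{i}',
         'data': base64.b64encode(json.dumps(p).encode('utf-8')).decode('utf-8')}
        for i, p in enumerate(payloads)
    ]}


def decode_response(response):
    """Decode the data of each returned record back into a dict."""
    return [json.loads(base64.b64decode(r['data'])) for r in response['records']]


payloads = [
    {"TICKER_SYMBOL": "QXZ", "SECTOR": "HEALTHCARE", "CHANGE": -0.05, "PRICE": 84.51},
    {"TICKER_SYMBOL": "ABC", "SECTOR": "ENERGY", "CHANGE": 0.12, "PRICE": 10.0},  # made-up sample
]
decoded = decode_response(lambda_handler(make_event(payloads), None))
for item in decoded:
    print(item)
```

With `output = []` outside the loop, both records come back; with it inside the loop, as in the question, only the last record in the batch would be returned.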