I am trying to write from Firehose to S3 using this test event:
{
    "TICKER_SYMBOL": "QXZ",
    "SECTOR": "HEALTHCARE",
    "CHANGE": -0.05,
    "PRICE": 84.51
}
My Lambda code is:
import json
import base64

def lambda_handler(event, context):
    print(event)
    for record in event['records']:
        # Kinesis data is base64 encoded so decode here
        payload = base64.b64decode(record["data"])
        print("Decoded payload: " + str(payload))
        json_object = {}
        output = []
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(json_object).encode('utf-8')).decode('utf-8')}
        output.append(output_record)
    print(output)
    return {'records': output}
The code prints the expected output, and Kinesis writes to the successful S3 folder, but when I download the file from S3 it contains only an empty {}. What is causing the empty {} in s3?
Answer:
What is causing the empty {} in s3?
The Lambda function is serializing json_object, which is an empty dictionary {}. If you want to serialize/return the original payload with no transformations, serialize/return the original, undecoded data:
output_record = {
    'recordId': record['recordId'],
    'result': 'Ok',
    'data': record['data']
}
output.append(output_record)
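To see the difference locally, here is a minimal sketch that reproduces both behaviours without deploying anything. The helper make_record and the record ID "record-1" are made up for illustration; the payload is the test event from the question, and only the standard-library json and base64 modules are used.

```python
import base64
import json

# The test event from the question, used as sample input.
TEST_EVENT = {
    "TICKER_SYMBOL": "QXZ",
    "SECTOR": "HEALTHCARE",
    "CHANGE": -0.05,
    "PRICE": 84.51,
}


def make_record(payload, record_id="record-1"):
    """Build a Firehose-style input record (hypothetical helper, made-up ID)."""
    data = base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8")
    return {"recordId": record_id, "data": data}


record = make_record(TEST_EVENT)

# What the original handler returned: an empty dict, serialized and encoded.
json_object = {}
original_data = base64.b64encode(
    json.dumps(json_object).encode("utf-8")
).decode("utf-8")
print(base64.b64decode(original_data).decode("utf-8"))  # prints {}

# Pass-through: hand the incoming base64 string back untouched.
passthrough = {
    "recordId": record["recordId"],
    "result": "Ok",
    "data": record["data"],
}
print(json.loads(base64.b64decode(passthrough["data"])))
```

The first print shows the {} that ended up in the file; the second shows that the pass-through record still carries the full test event.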
Kinesis Firehose can be configured to sink to S3 without configuring Data Transformation with Lambda. If you don't require any data transformations, my suggestion is to exclude the Data Transformation configuration.
If you do require data transformations, make sure to:
- Perform transformations on payload
- Fix bug: move output = [] outside of the for loop
- Serialize/return the transformed payload
import json
import base64

def lambda_handler(event, context):
    # Create the list once, before the loop, so every record is kept
    output = []
    for record in event['records']:
        payload = base64.b64decode(record["data"])
        payload_json = json.loads(payload)
        # TODO: do transformations on payload_json
        transformed_payload = json.dumps(payload_json)
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(transformed_payload.encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)
    return {'records': output}
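A quick way to check a handler like this before deploying is to call it locally with a hand-built event. The sketch below is only an illustration: the transform function (which adds a PROCESSED flag), the encode helper, the record IDs, and the second ticker "ABC" are all assumptions, not part of the original question.

```python
import base64
import json


def transform(payload_json):
    """Hypothetical transformation: tag the record as processed."""
    payload_json["PROCESSED"] = True
    return payload_json


def lambda_handler(event, context):
    output = []  # created once, so every record is kept
    for record in event["records"]:
        payload_json = json.loads(base64.b64decode(record["data"]))
        transformed_payload = json.dumps(transform(payload_json))
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode(
                transformed_payload.encode("utf-8")
            ).decode("utf-8"),
        })
    return {"records": output}


def encode(payload):
    """Encode a dict the way the handler expects it (illustrative helper)."""
    return base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8")


# Two made-up records, so the "output outside the loop" fix is visible.
event = {"records": [
    {"recordId": "1", "data": encode({"TICKER_SYMBOL": "QXZ", "PRICE": 84.51})},
    {"recordId": "2", "data": encode({"TICKER_SYMBOL": "ABC", "PRICE": 10.0})},
]}

result = lambda_handler(event, None)
decoded = [json.loads(base64.b64decode(r["data"])) for r in result["records"]]
for item in decoded:
    print(item)
```

With output = [] inside the loop, only the last record would come back; here both records are returned, each with its original fields plus the added flag.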