Home > Software engineering >  import automatically s3 bucket data in DynamoDB
import automatically s3 bucket data in DynamoDB

Time:09-17

How to import s3 bucket JSON data in DynamoDB automatically using NODEJS, DynamoDB, and AWS lambda.

CodePudding user response:

import type { AWS } from '@serverless/typescript';

const serverlessConfiguration: AWS = {
  service: 'raj',
  frameworkVersion: '2',
  custom: {
    webpack: {
      webpackConfig: './webpack.config.js',
      includeModules: true,
    },
  },
  plugins: ['serverless-webpack'],
  provider: {
    name: 'aws',
    runtime: 'nodejs14.x',
    profile : 'server',
    apiGateway: {
      minimumCompressionSize: 1024,
      shouldStartNameWithService: true,
    },
    environment: {
      AWS_NODEJS_CONNECTION_REUSE_ENABLED: '1',
    },
    lambdaHashingVersion: '20201221',
  },
  // import the function via paths
  functions: {
    messageAdd : {
      handler : "src/now.handler",
      events: [
        {
          http: {
            path : 'addData',
            method : 'POST',
            cors : true,
          }
        }
      ]
    }
   },
};

module.exports = serverlessConfiguration;

CodePudding user response:

const AWS = require('aws-sdk') ;
const docClient = new AWS.DynamoDB.DocumentClient();

// The Lambda handler
exports.handler = async (event) => {
  AWS.config.update({
    region: 'us-east-1', // use appropriate region
    accessKeyId: '', // use your access key
    secretAccessKey: '' // user your secret key
});
  const s3 = new AWS.S3();
  const ddbTable = "s3todyb";
  console.log (JSON.stringify(event, null, 2));
  console.log('Using DDB table: ', ddbTable);

  await Promise.all(
    event.Records.map(async (record) => {
      try {
        console.log('Incoming record: ', record);

        // Get original text from object in incoming event
        const originalText = await s3.getObject({
          Bucket: event.Records[0].s3.bucket.name,
          Key: event.Records[0].s3.object.key
        }).promise();

        // Upload JSON to DynamoDB
        const jsonData = JSON.parse(originalText.Body.toString('utf-8'));
        await ddbLoader(jsonData);

      } catch (err) {
        console.error(err);
      }
    })
  );
};

// Load JSON data to DynamoDB table
const ddbLoader = async (data) => {
  // Separate into batches for upload
  let batches = [];
  const BATCH_SIZE = 25;

  while (data.length > 0) {
    batches.push(data.splice(0, BATCH_SIZE));
  }

  console.log(`Total batches: ${batches.length}`);

  let batchCount = 0;

  // Save each batch
  await Promise.all(
    batches.map(async (item_data) => {

      // Set up the params object for the DDB call
      const params = {
        RequestItems: {}
      };
      params.RequestItems[ddbTable] = [];

      item_data.forEach(item => {
        for (let key of Object.keys(item)) {
          // An AttributeValue may not contain an empty string
          if (item[key] === '') 
            delete item[key];
        }

        // Build params
        params.RequestItems[ddbTable].push({
          PutRequest: {
            Item: {
              ...item
            }
          }
        });
      });

      // Push to DynamoDB in batches
      try {
        batchCount  ;
        console.log('Trying batch: ', batchCount);
        const result = await docClient.batchWrite(params).promise();
        console.log('Success: ', result);
      } catch (err) {
        console.error('Error: ', err);
      }
    })
  );

};

  • Related