Micronaut with AWS Lambda and SQS

Time:09-01

I have created a simple application with Micronaut and GraalVM. I want to deploy it to AWS Lambda, have it triggered from SQS, and process the messages, but it is not working as expected.

To build this application, I use the command: gradlew buildNativeLambda

Below is my setup:

Micronaut application:

Trial 1: (not working)

import com.example.services.TestService;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.Post;
import jakarta.inject.Inject;

import java.util.List;
import java.util.Map;

@Controller
public class Controller {

    @Inject
    private TestService testService;

    @Post(consumes = MediaType.APPLICATION_JSON, produces = MediaType.APPLICATION_JSON)
    public Map<String, Object> indexPost(@Body List<Map<String, Object>> requestBody) throws JsonProcessingException {
        return this.testService.handleRequest(requestBody.get(0));
    }
}

Prepare a build, upload it to Lambda, and set the following handler: io.micronaut.function.aws.proxy.MicronautLambdaHandler

It works fine when I trigger the Lambda from the "Test" tab in the Lambda management console. However, when I add SQS as a trigger and send a message to the queue, the message disappears from the queue (so it is being read), but I see no logs in the Lambda and the expected processing does not happen. So it does not work with SQS.

Trial 2: (not working)

import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.example.services.TestService;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.function.aws.MicronautRequestHandler;
import jakarta.inject.Inject;

import java.util.List;

@Introspected
public class SQSEventHandler extends MicronautRequestHandler<SQSEvent, Void> {

    @Inject
    private TestService testService;

    @Override
    public Void execute(SQSEvent input) {
        System.out.println("EVENT PROCESSING STARTS ===>");
        List<SQSEvent.SQSMessage> messages = input.getRecords();
        System.out.println("Number of messages:::" + messages.size());
        for (SQSEvent.SQSMessage single: messages) {
            try {
                System.out.println("Message body::: " + single.getBody());
                this.testService.handleRequest(single.getBody());
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
        }
        System.out.println("<=== EVENT PROCESSING ENDS");
        return null;
    }
}

Prepare a build, upload it to Lambda, and set the following handler: com.example.eventHandlers.SQSEventHandler

This behaves the same way as the previous trial: the message disappears from the queue, but nothing happens in the Lambda.

Edit - 31-08-2022:

I use the body below as a test event from the "Test" tab in the Lambda console. It simulates an API Gateway event, and it works fine because we have the @Controller class. Without @Controller it fails.

Event data:

{
  "path": "/",
  "httpMethod": "POST",
  "headers": {
    "Accept": "application/json"
  },
  "body": "<my req body>"
}

But when I test with an SQS event, it fails. For this test I configured com.example.eventHandlers.SQSEventHandler as the handler in Lambda. Below are the event data I used and the result:

Event data:

{
    "Records": [
        {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
            "body": "{\"action\": \"READ_ALL\"}",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                "ApproximateFirstReceiveTimestamp": "1545082649185"
            },
            "messageAttributes": {},
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-2"
        }
    ]
}

Results:

{
  "statusCode": 405,
  "multiValueHeaders": {
    "Allow": [
      "POST"
    ],
    "Content-Type": [
      "application/json"
    ]
  },
  "body": "{\"message\":\"Method Not Allowed\",\"_links\":{\"self\":{\"href\":\"https://nullnull\",\"templated\":false}},\"_embedded\":{\"errors\":[{\"message\":\"Method [GET] not allowed for URI [https://nullnull]. Allowed methods: [POST]\"}]}}",
  "isBase64Encoded": false
}
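
A note on the 405 response: it looks like it came from HTTP routing, which would be consistent with the payload being read as an API Gateway request. The SQS event has a top-level "Records" array and no "httpMethod" or "path". The sketch below only illustrates that difference in shape. It is plain Java with no Micronaut or AWS classes, EventShapeCheck and its methods are hypothetical names, and the fallback to GET is an assumption inferred from the error message, not documented behaviour.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of Micronaut or the AWS SDK.
final class EventShapeCheck {

    // An SQS-triggered invocation carries a top-level "Records" list.
    static boolean looksLikeSqsEvent(Map<String, Object> event) {
        return event.get("Records") instanceof List;
    }

    // An API Gateway proxy request carries "httpMethod" (and "path").
    // ASSUMPTION: a missing method is treated as GET, which would match the
    // "Method [GET] not allowed" message in the result above.
    static String httpMethodOrDefault(Map<String, Object> event) {
        Object method = event.get("httpMethod");
        return method instanceof String ? (String) method : "GET";
    }

    public static void main(String[] args) {
        Map<String, Object> gatewayEvent = Map.of("path", "/", "httpMethod", "POST");
        Map<String, Object> sqsEvent =
                Map.of("Records", List.of(Map.of("body", "{\"action\": \"READ_ALL\"}")));

        System.out.println(looksLikeSqsEvent(gatewayEvent) + " " + httpMethodOrDefault(gatewayEvent));
        System.out.println(looksLikeSqsEvent(sqsEvent) + " " + httpMethodOrDefault(sqsEvent));
    }
}
```

If that reading is right, the request never reaches SQSEventHandler at all, which would also explain why no handler logs appear.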

CodePudding user response:

If your AWS Lambda function is not generating entries in Amazon CloudWatch Logs, then it normally indicates that the Lambda function has insufficient permissions to use CloudWatch Logs.

This can be fixed by adding permissions to the IAM Role associated with the Lambda function. Assign it the AWSLambdaBasicExecutionRole managed policy, which grants permission to write to CloudWatch Logs.

See: AWS managed policies for Lambda features

CodePudding user response:

I finally got it working. For anyone looking for the solution:

build.gradle:

.
.
dependencies {
    annotationProcessor 'io.micronaut:micronaut-inject-java'
    annotationProcessor("io.micronaut.data:micronaut-data-processor")
    implementation("io.micronaut:micronaut-jackson-databind")
    implementation("io.micronaut.data:micronaut-data-hibernate-jpa")
    implementation("io.micronaut.sql:micronaut-jdbc-hikari")
    implementation("jakarta.annotation:jakarta.annotation-api")
    runtimeOnly("ch.qos.logback:logback-classic")
    runtimeOnly("mysql:mysql-connector-java")
    compileOnly("org.graalvm.nativeimage:svm")
    implementation("io.micronaut.aws:micronaut-function-aws") // important
    implementation("io.micronaut.aws:micronaut-function-aws-custom-runtime") // important
    implementation("io.micronaut:micronaut-validation")
    testImplementation("io.micronaut:micronaut-http-client")
}
.
.

Application.java: (The starting point of the application)

public class Application extends AbstractMicronautLambdaRuntime<CustomSQSEvent, String, CustomSQSEvent, String> {
    public static void main(String[] args) throws MalformedURLException {
//        Micronaut.run(Application.class, args); // We don't need this now
        new Application().run(args);
    }

    @Override
    @Nullable
    protected RequestHandler<CustomSQSEvent, String> createRequestHandler(String... args) {
        try {
            return new SQSEventHandler(createApplicationContextBuilderWithArgs(args));
        } catch (ContainerInitializationException e) {
            throw new ConfigurationException("Exception thrown instantiating SQSEventHandler", e);
        }
    }
}

CustomSQSEvent.java: (This is needed to handle serialization/deserialization issues with the message. It is an exact copy of the class com.amazonaws.services.lambda.runtime.events.SQSEvent with a few annotations added.)

@Introspected // Very important
public class CustomSQSEvent implements Serializable {

    @JsonProperty("Records") // Very important
    private List<CustomSQSEvent.SQSMessage> records;

    @Introspected // Very important
    public static class SQSMessage implements Serializable, Cloneable {
        ...
    }

     /**
     * Default constructor
     */
     public CustomSQSEvent() {
     }

     .
     .
     .

}

SQSEventHandler.java: (The actual magic happens here)

@Introspected
public class SQSEventHandler implements RequestHandler<CustomSQSEvent, String>, ApplicationContextProvider, Closeable {


    protected final MicronautLambdaContainerHandler handler;

    public SQSEventHandler() throws ContainerInitializationException {
        this.handler = new MicronautLambdaContainerHandler();
    }

    public SQSEventHandler(ApplicationContextBuilder applicationContextBuilder) throws ContainerInitializationException {
        this.handler = new MicronautLambdaContainerHandler(applicationContextBuilder);
    }

    public SQSEventHandler(ApplicationContext applicationContext) throws ContainerInitializationException {
        this.handler = new MicronautLambdaContainerHandler(applicationContext);
    }

    @Override
    public String handleRequest(CustomSQSEvent input, Context context) {
        System.out.println("EVENT PROCESSING STARTS ===>");
        TestService testService = this.getApplicationContext().getBean(TestService.class);
        
        List<CustomSQSEvent.SQSMessage> messages = input.getRecords();

        for (CustomSQSEvent.SQSMessage single: messages) {
            try {
                System.out.println("Message body::: " + single.getBody());
                testService.handleRequest(single.getBody());
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
        }
        System.out.println("<=== EVENT PROCESSING ENDS");
        return "";
    }

    @Override
    public ApplicationContext getApplicationContext() {
        return this.handler.getApplicationContext();
    }

    @Override
    public void close() {
        this.getApplicationContext().close();
    }
}
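
The loop in handleRequest logs a failure on one message and carries on with the next. Below is a dependency-free sketch of that pattern, which also counts how many bodies failed. It is plain Java with no AWS or Micronaut types; MessageLoopSketch, MessageProcessor, and processAll are hypothetical names standing in for the handler and TestService.

```java
import java.util.List;

// Hypothetical sketch of the per-message loop; not the handler itself.
final class MessageLoopSketch {

    // Hypothetical stand-in for TestService.handleRequest(String).
    interface MessageProcessor {
        void handle(String body) throws Exception;
    }

    // Processes each body in order. A failure is logged and counted,
    // and the remaining bodies are still processed.
    static int processAll(List<String> bodies, MessageProcessor processor) {
        int failures = 0;
        for (String body : bodies) {
            try {
                processor.handle(body);
            } catch (Exception e) {
                failures++;
                System.err.println("Failed to process message: " + e.getMessage());
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<String> bodies = List.of("{\"action\": \"READ_ALL\"}", "");
        int failures = processAll(bodies, body -> {
            if (body.isEmpty()) {
                throw new IllegalArgumentException("empty body");
            }
            System.out.println("Message body::: " + body);
        });
        System.out.println("Failures: " + failures);
    }
}
```

Keep in mind that when the handler returns normally, Lambda treats the whole batch as successful, so a message whose processing failed is not retried unless the handler rethrows.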

I also removed HomeController.java (the class annotated with @Controller).

Finally, build the project using the following command: gradlew buildNativeLambda -Pmicronaut.runtime=lambda

Upload the generated zip to Lambda and set the handler to com.example.eventHandlers.SQSEventHandler. This is the fully qualified name of our custom event handler.

Compiled solution from below references:
