Why do messages remain in SQS after processing through Lambda?

Time:08-29

I have written the following Lambda function to move messages from "queueA" to "queueB".

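// AWS SDK for JavaScript v3 SQS client and the commands used below
import {
    SQSClient,
    GetQueueUrlCommand,
    ReceiveMessageCommand,
    SendMessageCommand,
    DeleteMessageCommand,
} from "@aws-sdk/client-sqs";

const sqs = new SQSClient({});

async function reprocess_messages(fromQueue, toQueue) {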
    try {
        const response1 = await sqs.send(new GetQueueUrlCommand({ QueueName: fromQueue }));
        const response2 = await sqs.send(new GetQueueUrlCommand({ QueueName: toQueue }));

        const fromQueueUrl = response1.QueueUrl;
        const toQueueUrl = response2.QueueUrl;

        let completed = false;
        while (!completed) {
            completed = await moveMessage(toQueueUrl, fromQueueUrl);
            // console.log(status);
        }
        // console.log(completed);
        return completed;

    } catch (err) {
        console.error(err);
    }
}


async function moveMessage(toQueueUrl, fromQueueUrl) {
    try {
        const receiveMessageParams = {
            MaxNumberOfMessages: 10,
            MessageAttributeNames: ["Messsages"],
            QueueUrl: fromQueueUrl,
            VisibilityTimeout: 2,
            WaitTimeSeconds: 0,
        };

        const receiveData = await sqs.send(new ReceiveMessageCommand(receiveMessageParams));
        // console.log(receiveData);

        if (!receiveData.Messages) {
            console.log("finished");
            return true;
        }

        const messages = [];
        receiveData.Messages.forEach(msg => {
            messages.push({ body: msg["Body"], receiptHandle: msg["ReceiptHandle"] });
        });

        const sendMsg = async ({ body, receiptHandle }) => {
            const sendMessageParams = {
                MessageBody: body,
                QueueUrl: toQueueUrl
            };
            await sqs.send(new SendMessageCommand(sendMessageParams));
            // console.log("Success, message sent. MessageID: ", sentData.MessageId);
            return "Success";
        };

        const deleteMsg = async ({ body, receiptHandle }) => {
            const deleteMessageParams = {
                QueueUrl: fromQueueUrl,
                ReceiptHandle: receiptHandle
            };
            await sqs.send(new DeleteMessageCommand(deleteMessageParams));
            // console.log("Message deleted", deleteData);
            return "Deleted";
        };

        const sent = await Promise.all(messages.map(sendMsg));
        // console.log(sent);
        await Promise.all(messages.map(deleteMsg));
        // console.log(deleted);
        console.log(sent.length);
        return false;

    } catch (err) {
        console.log(err);
    }
}


export const handler = async function (event, context) {
    console.log("Invoking lambda");
    const response = await reprocess_messages("queueA", "queueB");
    console.log(response);
}

To test it, I populated queueA with 500 messages and invoked the Lambda. After it completed, there were no errors in the logs, but 16 messages were still in queueA.

[Screenshot: queue messages]

Also, according to the Lambda logs below, there should not be any messages left in queueA: the function printed "finished", which only happens when a receive call returns no messages.

[Screenshot: Lambda logs]

Lambda config:

  • Timeout: 2 min
  • Memory: 256 MB

Lambda report: REPORT RequestId: de685eaf-9787-45f4-b800-78d3a80b4c95 Duration: 11094.61 ms Billed Duration: 11095 ms Memory Size: 256 MB Max Memory Used: 122 MB Init Duration: 470.62 ms

I am not sure why there are messages remaining.

Answer:

When a message is received from an Amazon SQS queue, it becomes temporarily invisible to other consumers so that it is not processed by another worker at the same time.

If the message is not deleted before this visibility timeout expires, it reappears on the queue. Amazon SQS assumes that the worker processing the message has failed and that the message needs to be reprocessed.
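
To make the reappearing behaviour concrete, here is a minimal in-memory sketch of a visibility timeout. It is a toy model, not the SQS API: FakeQueue, its methods, and the explicit "now" clock argument are hypothetical names chosen for illustration, and a numeric id stands in for the receipt handle. The timeout of 2 mirrors the VisibilityTimeout: 2 used in the question's receive call.

```javascript
// Toy in-memory model of an SQS-style visibility timeout (not the real service).
// The current time is passed in explicitly so the behaviour is deterministic.
class FakeQueue {
  constructor() {
    this.messages = new Map(); // id -> { body, visibleAt }
    this.nextId = 1;
  }

  send(body) {
    this.messages.set(this.nextId++, { body, visibleAt: 0 });
  }

  // Return up to `max` visible messages and hide each one
  // until `now + visibilityTimeout`.
  receive(now, visibilityTimeout, max = 10) {
    const received = [];
    for (const [id, msg] of this.messages) {
      if (received.length >= max) break;
      if (msg.visibleAt <= now) {
        msg.visibleAt = now + visibilityTimeout;
        received.push({ id, body: msg.body });
      }
    }
    return received;
  }

  // Only an explicit delete removes a message for good.
  delete(id) {
    this.messages.delete(id);
  }
}

const queue = new FakeQueue();
queue.send("a");
queue.send("b");

const firstBatch = queue.receive(0, 2);    // both messages hidden until t = 2
queue.delete(firstBatch[0].id);            // only "a" is deleted in time

const duringTimeout = queue.receive(1, 2); // t = 1: "b" is still invisible
const afterTimeout = queue.receive(3, 2);  // t = 3: "b" is visible again

console.log(duringTimeout.length, afterTimeout.map((m) => m.body));
```

In this model the undeleted message "b" cannot be received while it is hidden and comes back once the timeout has passed, which is the situation described above.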

Therefore, your code should:

  • Read messages from Queue A
  • Send the messages to Queue B
  • Delete the messages from Queue A (but only after they have been written to Queue B, to avoid potential loss of messages)
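
The three steps above can be sketched as follows. This is only an illustration of the ordering, not a drop-in replacement for the question's code: moveBatch and the source/target adapter objects are hypothetical names, not part of the AWS SDK. In a real Lambda, receive, send and delete would presumably wrap ReceiveMessageCommand, SendMessageCommand and DeleteMessageCommand.

```javascript
// Hypothetical adapter interface (NOT part of the AWS SDK):
//   source.receive()             -> Promise<Array<{ body, receiptHandle }>>
//   target.send(body)            -> Promise<void>
//   source.delete(receiptHandle) -> Promise<void>
async function moveBatch(source, target) {
  const messages = await source.receive();

  let moved = 0;
  for (const { body, receiptHandle } of messages) {
    // Write to the target first. If this throws, the delete below never
    // runs, so the message stays in the source queue for a later retry.
    await target.send(body);
    await source.delete(receiptHandle);
    moved += 1;
  }
  return moved;
}

// Demo with in-memory stand-ins for the two queues.
const pending = [
  { body: "m1", receiptHandle: "h1" },
  { body: "m2", receiptHandle: "h2" },
];
const delivered = [];

const source = {
  receive: async () => [...pending],
  delete: async (handle) => {
    const i = pending.findIndex((m) => m.receiptHandle === handle);
    if (i !== -1) pending.splice(i, 1);
  },
};
const target = {
  send: async (body) => {
    delivered.push(body);
  },
};

const demo = moveBatch(source, target).then((moved) => {
  console.log(moved, delivered, pending.length);
  return moved;
});
```

Two settings in the question's receive call may also be worth checking. With VisibilityTimeout: 2, any message not deleted within two seconds of being received becomes visible again. With WaitTimeSeconds: 0 (short polling), SQS samples only a subset of its servers, so an empty response does not guarantee that the queue is empty.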

See: DeleteMessageCommand | SQS Client - AWS SDK for JavaScript v3
