Python – How to get all messages in Amazon SQS queue using boto library in Python

amazon-sqsbotopython

I'm working on an application whose workflow is managed by passing messages in SQS, using boto.

My SQS queue is growing gradually, and I have no way to check how many elements it is supposed to contain.

Now I have a daemon that periodically polls the queue, and checks if i have a fixed-size set of elements. For example, consider the following "queue":

q = ["msg1_comp1", "msg2_comp1", "msg1_comp2", "msg3_comp1", "msg2_comp2"]

Now I want to check if I have "msg1_comp1", "msg2_comp1" and "msg3_comp1" in the queue together at some point in time, but I don't know the size of the queue.

After looking through the API, it seems you can either get only 1 element, or a fixed number of elements in the queue, but not all:

>>> rs = q.get_messages()
>>> len(rs)
1
>>> rs = q.get_messages(10)
>>> len(rs)
10

A suggestion proposed in the answers would be to get for example 10 messages in a loop until I get nothing back, but messages in SQS have a visibility timeout, meaning that if I poll elements from the queue, they won't be really removed, they will only be invisible for a short period of time.

Is there a simple way to get all messages in the queue, without knowing how many there are?

Best Answer

I've been working with AWS SQS queues to provide instant notifications, so I need to be processing all of the messages in real time. The following code will help you to efficiently dequeue (all) messages and handle any errors when removing.

Note: to remove messages off the queue you need to delete them. I'm using the updated boto3 AWS python SDK, json library, and the following default values:

import boto3
import json

region_name = 'us-east-1'
queue_name = 'example-queue-12345'
max_queue_messages = 10
message_bodies = []
aws_access_key_id = '<YOUR AWS ACCESS KEY ID>'
aws_secret_access_key = '<YOUR AWS SECRET ACCESS KEY>'
sqs = boto3.resource('sqs', region_name=region_name,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key)
queue = sqs.get_queue_by_name(QueueName=queue_name)
while True:
    messages_to_delete = []
    for message in queue.receive_messages(
            MaxNumberOfMessages=max_queue_messages):
        # process message body
        body = json.loads(message.body)
        message_bodies.append(body)
        # add message to delete
        messages_to_delete.append({
            'Id': message.message_id,
            'ReceiptHandle': message.receipt_handle
        })

    # if you don't receive any notifications the
    # messages_to_delete list will be empty
    if len(messages_to_delete) == 0:
        break
    # delete messages to remove them from SQS queue
    # handle any errors
    else:
        delete_response = queue.delete_messages(
                Entries=messages_to_delete)