AWS SQS Java: not all messages are retrieved from the SQS queue

amazon-sqs amazon-web-services java

I have tried several approaches to retrieve all messages from an SQS queue using the AWS SDK for Java, to no avail. I have read about the distributed nature of AWS SQS and that messages are stored on different servers. What I do not understand is why this architecture is not hidden from the end user. What do I have to do in Java code to retrieve all messages and be 100% sure that none were missed?

I tried this with "Long Polling":

    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
    List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
    for (Message message : messages) {
        System.out.println(" Message");
        System.out.println(" MessageId: " + message.getMessageId());
        System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
        System.out.println(" MD5OfBody: " + message.getMD5OfBody());
        System.out.println(" Body: " + message.getBody());
        for (Entry<String, String> entry : message.getAttributes().entrySet()) {
            System.out.println(" Attribute");
            System.out.println(" Name: " + entry.getKey());
            System.out.println(" Value: " + entry.getValue());
        }
    }
    System.out.println();

And this with Request Batching / Client-Side Buffering:

    // Create the basic Amazon SQS async client
    AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient();

    // Create the buffered client
    AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);

    CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName("MyTestQueue");

    CreateQueueResult res = bufferedSqs.createQueue(createRequest);

    SendMessageRequest request = new SendMessageRequest();
    String body = "test message_" + System.currentTimeMillis();
    request.setMessageBody( body );
    request.setQueueUrl(res.getQueueUrl());

    SendMessageResult sendResult = bufferedSqs.sendMessage(request);

    ReceiveMessageRequest receiveRq = new ReceiveMessageRequest()
        .withMaxNumberOfMessages(10)
        .withQueueUrl(res.getQueueUrl()); // URL of the queue created above
    ReceiveMessageResult rx = bufferedSqs.receiveMessage(receiveRq);

    List<Message> messages = rx.getMessages();
    for (Message message : messages) {
        System.out.println(" Message");
        System.out.println(" MessageId: " + message.getMessageId());
        System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
        System.out.println(" MD5OfBody: " + message.getMD5OfBody());
        System.out.println(" Body: " + message.getBody());
        for (Entry<String, String> entry : message.getAttributes().entrySet()) {
            System.out.println(" Attribute");
            System.out.println(" Name: " + entry.getKey());
            System.out.println(" Value: " + entry.getValue());
        }
    }

But I am still unable to retrieve all messages.

Any ideas?

The AWS forum has been silent on my post.

Best Answer

When receiving messages from an SQS queue, you need to repeatedly call sqs:ReceiveMessage.

Each call to sqs:ReceiveMessage returns zero or more messages (at most 10 per call), which you'll need to iterate through. Once you have finished processing a message, call sqs:DeleteMessage to remove it from the queue.

Add a loop around your "Long Polling" sample above to receive all messages.

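for (;;) {
    // Ask for up to 10 messages (the per-call maximum) and wait up to 20 seconds
    // for them to arrive; a non-zero wait time is what enables long polling.
    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl)
            .withMaxNumberOfMessages(10)
            .withWaitTimeSeconds(20);
    List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
    for (Message message : messages) {
        System.out.println(" Message");
        System.out.println(" MessageId: " + message.getMessageId());
        System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
        System.out.println(" MD5OfBody: " + message.getMD5OfBody());
        System.out.println(" Body: " + message.getBody());
        for (Entry<String, String> entry : message.getAttributes().entrySet()) {
            System.out.println(" Attribute");
            System.out.println(" Name: " + entry.getKey());
            System.out.println(" Value: " + entry.getValue());
        }
        // Delete the message once it has been processed; otherwise it becomes
        // visible again after the visibility timeout and is redelivered.
        sqs.deleteMessage(myQueueUrl, message.getReceiptHandle());
    }
    System.out.println();
}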
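
To see why a single receive call can come back with only part of the queue, here is a rough toy model in plain Java (no AWS SDK involved). It is only an illustration under simplifying assumptions, not how SQS is implemented: `ToyShardedQueue`, its round-robin choice of shard, and the `drain` helper are all made up for this sketch, and `receive` removes messages immediately instead of going through a visibility timeout and a separate delete call.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model only: messages are spread over several in-memory "servers" (shards). */
class ToyShardedQueue {
    // A single receive returns at most 10 messages, mirroring the SQS per-call limit.
    private static final int MAX_PER_RECEIVE = 10;

    private final List<Deque<String>> shards = new ArrayList<>();
    private int nextSendShard = 0;
    private int nextReceiveShard = 0;

    ToyShardedQueue(int shardCount) {
        for (int i = 0; i < shardCount; i++) {
            shards.add(new ArrayDeque<>());
        }
    }

    int shardCount() {
        return shards.size();
    }

    /** Stores the message on the next shard in rotation (assumption of this sketch). */
    void send(String body) {
        shards.get(nextSendShard).add(body);
        nextSendShard = (nextSendShard + 1) % shards.size();
    }

    /** Looks at one shard per call, so one call sees only part of the queue. */
    List<String> receive() {
        Deque<String> shard = shards.get(nextReceiveShard);
        nextReceiveShard = (nextReceiveShard + 1) % shards.size();
        List<String> batch = new ArrayList<>();
        while (batch.size() < MAX_PER_RECEIVE && !shard.isEmpty()) {
            batch.add(shard.poll());
        }
        return batch;
    }

    /** Keeps receiving until shardCount() calls in a row have come back empty. */
    static List<String> drain(ToyShardedQueue queue) {
        List<String> all = new ArrayList<>();
        int emptyInARow = 0;
        while (emptyInARow < queue.shardCount()) {
            List<String> batch = queue.receive();
            if (batch.isEmpty()) {
                emptyInARow++;
            } else {
                emptyInARow = 0;
                all.addAll(batch);
            }
        }
        return all;
    }
}
```

In this model, one `receive()` call on a queue with several shards returns fewer messages than were sent, while `drain(queue)` collects the rest by calling `receive()` repeatedly. Against the real service there is no such clean stopping rule, which is why a consumer normally just keeps polling.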

Also note that you may receive the same message more than once. Either make your processing safe to repeat for the same message, or detect repeated messages and skip them.
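
One way to detect a repeated message is to remember the message IDs you have already handled. The following is a minimal sketch in plain Java, assuming the ID (for example the value of `message.getMessageId()`) is a good enough key; `SeenMessageFilter` is a hypothetical helper name, and the in-memory set is an assumption that only works inside a single process and is lost on restart.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: remembers which message IDs were already handled. */
class SeenMessageFilter {
    private final Set<String> seenIds = new HashSet<>();

    /** Returns true the first time an ID is offered and false for every repeat. */
    boolean firstTimeSeen(String messageId) {
        // Set.add returns false when the element was already present.
        return seenIds.add(messageId);
    }

    /** Small demonstration: IDs "m-1" and "m-2" are delivered twice. */
    static List<String> demo() {
        SeenMessageFilter filter = new SeenMessageFilter();
        List<String> processed = new ArrayList<>();
        for (String id : new String[] {"m-1", "m-2", "m-1", "m-3", "m-2"}) {
            if (filter.firstTimeSeen(id)) {
                processed.add(id); // the real work would happen here
            }
        }
        return processed;
    }
}
```

In the receive loop, the check would wrap the processing step, while the delete call would still run for duplicates so that they leave the queue. With several consumers or restarts in play, the set of seen IDs would probably need to live in shared, persistent storage instead.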
