Event-driven architectures present challenges at every turn. One such challenge is ensuring the consumption of messages in order. To address this challenge, AWS offers SQS FIFO queues:
FIFO (First-In-First-Out) queues have all the capabilities of the standard queues but are designed to enhance messaging between applications when the order of operations and events is critical, or where duplicates can't be tolerated.
There are two additional attributes required to guarantee message ordering and exactly-once processing from the producer's perspective:
MessageGroupID
: Messages within a single group are always processed one at a time in the order in which they were received by the queue.MessageDeduplicationID
: Any messages sent with the same MessageDuplicationID are accepted successfully, but only one is delivered within the 5-minute deduplication period. We can enable the content-based deduplication at the queue level to make this attribute optional.
From the consumer's perspective, there is nothing needs to be changed. Three rules govern the order in which messages leave the queue:
Return the oldest message where no other message with the same
MessageGroupId
is in flight.Return as many messages with the same
MessageGroupId
as possible.If a message batch is still not full, go back to the first rule. As a result, a single batch can contain messages from multiple
MessageGroupIds
.
With these simple rules, the queue ensures messages from the same message group are not served to more than one consumer simultaneously. However, not everything is perfect throughput goes down from nearly unlimited to 3,000 messages per second with batching (300 messages per second without batching) against the API. If we require higher throughput, we can enable the high throughput mode, which will support up to 30,000 messages per second with batching (3,000 messages per second without batching). Let's create a producer and consumer application to demonstrate how simple it is to use an AWS SQS FIFO queue. First, we need to complete the following prerequisites:
An IAM User with programmatic access.
Install AWS CLI.
Install AWS SAM CLI.
To create the queue, we will be using AWS SAM (it can also be done manually through the AWS Console). Create a template.yaml
file with the following content:
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
SQS
Resources:
SQSQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: "mySQSQueue.fifo"
FifoQueue: true
ContentBasedDeduplication: true
ReceiveMessageWaitTimeSeconds: 5
Outputs:
SQSQueueUrl:
Description: SQS Url
Value: !Ref SQSQueue
Run sam build
and sam deploy
to provision the resource. Execute the following commands to create the projects and solution:
dotnet new console -n Consumer -o ./src/Consumer
dotnet new console -n Producer -o ./src/Producer
dotnet new sln -n SQSSandbox
dotnet sln add --in-root src/Consumer
dotnet sln add --in-root src/Producer
dotnet add src/Producer package AWSSDK.SQS
dotnet add src/Producer package AWSSDK.Extensions.NETCore.Setup
dotnet add src/Producer package Microsoft.Extensions.Configuration.Json
dotnet add src/Producer package Microsoft.Extensions.DependencyInjection
dotnet add src/Consumer package AWSSDK.SQS
dotnet add src/Consumer package AWSSDK.Extensions.NETCore.Setup
dotnet add src/Consumer package Microsoft.Extensions.Configuration.Json
dotnet add src/Consumer package Microsoft.Extensions.DependencyInjection
Open the Producer
project and update the Program.cs
file as follows:
using Amazon.SQS;
using Amazon.SQS.Model;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
var configurationBuilder = new ConfigurationBuilder()
.AddJsonFile("appsettings.json");
var options = configurationBuilder.Build().GetAWSOptions();
var services = new ServiceCollection()
.AddDefaultAWSOptions(options)
.AddAWSService<IAmazonSQS>();
var provider = services.BuildServiceProvider();
var url = "<MY_QUEUE_URL>";
var sqsClient = provider.GetService<IAmazonSQS>()!;
for (int i = 0; i < 100; i++)
{
var messageGroupId = Random.Shared.Next(0, 5).ToString();
var body = $"@@{messageGroupId}@@{Guid.NewGuid()}";
var request = new SendMessageRequest(url, body)
{
MessageGroupId = messageGroupId
};
await sqsClient.SendMessageAsync(request);
Console.WriteLine($"{body} sent");
}
Add an appsettings.json
with the following content:
{
"AWS": {
"Profile": "default",
"Region": "<MY_REGION>"
}
}
Open the Consumer
project and update the Program.cs
file as follows:
using Amazon.SQS;
using Amazon.SQS.Model;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
var configurationBuilder = new ConfigurationBuilder()
.AddJsonFile("appsettings.json");
var options = configurationBuilder.Build().GetAWSOptions();
var services = new ServiceCollection()
.AddDefaultAWSOptions(options)
.AddAWSService<IAmazonSQS>();
var provider = services.BuildServiceProvider();
var url = "<MY_QUEUE_URL>";
var sqsClient = provider.GetService<IAmazonSQS>()!;
while (true)
{
var receiveRequest = new ReceiveMessageRequest
{
QueueUrl = url,
MaxNumberOfMessages = 10,
WaitTimeSeconds = 5
};
var result = await sqsClient.ReceiveMessageAsync(receiveRequest);
if (result.Messages.Any())
{
var batchId = Guid.NewGuid().ToString();
var total = result.Messages.Count;
var current = 1;
var batch = new List<DeleteMessageBatchRequestEntry>();
foreach (var message in result.Messages)
{
Console.WriteLine($"Batch({batchId}) {current} of {total} with {message.Body} received");
current++;
batch.Add(new DeleteMessageBatchRequestEntry() { ReceiptHandle = message.ReceiptHandle, Id=message.MessageId });
await Task.Delay(Random.Shared.Next(1000, 2500));
}
await sqsClient.DeleteMessageBatchAsync(url, batch);
}
else
{
Console.WriteLine("No messages available");
await Task.Delay(TimeSpan.FromSeconds(5));
}
}
Add an appsettings.json
as we did with the consumer. Run dotnet run --project src/Consumer
in two or more console windows, and dotnet run --project src/Producer
to see everything in action. You can see the code and scripts here. Thanks, and happy coding.