# How to Ensure Message Order with AWS SNS and AWS SQS

In our previous post, [Consuming AWS SQS messages in order](https://blog.raulnq.com/consuming-aws-sqs-messages-in-order), we discussed the basics of consuming messages in order using AWS SQS FIFO queues. Today, we will expand on that by incorporating the [AWS SNS FIFO topic](https://docs.aws.amazon.com/sns/latest/dg/sns-fifo-topics.html) as a message source, demonstrating how straightforward it is to preserve message order.

> An Amazon SNS FIFO topic always delivers messages to subscribed Amazon SQS queues in the exact order in which the messages are published to the topic, and only once. With an Amazon SQS FIFO queue subscribed, the consumer of the queue receives the messages in the exact order in which the messages are delivered to the queue, and no duplicates.

AWS SNS FIFO topics manage ordering and deduplication similarly to AWS SQS FIFO queues. For more details, check out our previous post.

## Prerequisites

* An [**IAM User**](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_users_create.html#id_users_create_console) with programmatic access.
    
* Install [**AWS CLI**](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-quickstart.html#cli-configure-quickstart-creds).
    
* Install [**AWS SAM CLI**](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/install-sam-cli.html).
    

## AWS SAM template

Create a `template.yml` file with the following content:

```yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  SAM

Resources:

  SNSFifoTopic:
    Type: AWS::SNS::Topic
    Properties:
      TopicName: MyFifoTopic.fifo
      FifoTopic: true
      ContentBasedDeduplication: false

  SQSFifoQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: MyFifoQueue.fifo
      FifoQueue: true
      ContentBasedDeduplication: false
      DeduplicationScope: messageGroup
      FifoThroughputLimit: perMessageGroupId

  SNSFifoSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      TopicArn: !Ref SNSFifoTopic
      Protocol: sqs
      Endpoint: !GetAtt SQSFifoQueue.Arn
      RawMessageDelivery: true

  SQSPolicy:
    Type: AWS::SQS::QueuePolicy
    Properties:
      Queues:
        - !Ref SQSFifoQueue
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal: "*"
            Action: "sqs:SendMessage"
            Resource: !GetAtt SQSFifoQueue.Arn
            Condition:
              ArnEquals:
                aws:SourceArn: !Ref SNSFifoTopic

Outputs:
  SNSFifoTopicArn:
    Description: ARN of the SNS FIFO Topic
    Value: !Ref SNSFifoTopic

  SQSFifoQueueUrl:
    Description: URL of the SQS FIFO Queue
    Value: !Ref SQSFifoQueue
```

The template defines the following resources:

* [AWS::SNS::Topic](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-sns-topic.html): This resource is used to create the AWS SNS topic. The `FifoTopic` property is set to `true` to create a FIFO topic. The `ContentBasedDeduplication` property is set to `false`, which means the `MessageDeduplicationId` must be provided when publishing a message.
    
* [AWS::SQS::Queue](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-sqs-queue.html): This resource creates the AWS SQS queue. The `FifoQueue` and `ContentBasedDeduplication` properties work the same way as their counterparts on the topic described above. Setting `DeduplicationScope` to `messageGroup` and `FifoThroughputLimit` to `perMessageGroupId` enables the [high throughput](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/high-throughput-fifo.html) feature.
    
* [AWS::SNS::Subscription](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-sns-subscription.html): This resource subscribes the AWS SQS queue to the AWS SNS topic. Setting `RawMessageDelivery` to `true` makes AWS SNS deliver the published message body as is, instead of wrapping it in a JSON notification envelope, so the consumer can deserialize the payload directly.
    
* [AWS::SQS::QueuePolicy](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-sqs-queuepolicy.html): This resource applies a policy to one or more AWS SQS queues. In this case, we allow the AWS SNS topic to send messages to the AWS SQS queue.
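
If `RawMessageDelivery` were set to `false`, the queue would receive each payload wrapped in an SNS notification JSON document whose `Message` field holds the published text as a string. As a rough sketch, a consumer that has to cope with both cases could unwrap the body as shown below. `SnsEnvelope` and `Unwrap` are hypothetical names, not part of the original code or the AWS SDK, and the check only inspects the `Type` and `Message` fields.

```csharp
using System.Text.Json;

// Hypothetical helper (not from the original post or the AWS SDK).
static class SnsEnvelope
{
    // Returns the published payload, whether the body is raw
    // or wrapped in an SNS notification envelope.
    public static string Unwrap(string body)
    {
        try
        {
            using var document = JsonDocument.Parse(body);
            var root = document.RootElement;

            // An envelope is a JSON object with "Type": "Notification"
            // and the original payload stored as a string in "Message".
            if (root.ValueKind == JsonValueKind.Object
                && root.TryGetProperty("Type", out var type)
                && type.ValueKind == JsonValueKind.String
                && type.GetString() == "Notification"
                && root.TryGetProperty("Message", out var message)
                && message.ValueKind == JsonValueKind.String)
            {
                return message.GetString()!;
            }
        }
        catch (JsonException)
        {
            // The body is not JSON at all, so treat it as a raw payload.
        }

        return body;
    }
}
```

With this template the helper is not required, because raw delivery is enabled and `message.Body` already contains the serialized `Payload`.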
    

Run the following commands to deploy the resources to AWS:

```powershell
sam build
sam deploy --guided
```

## The Producer App

Run the following commands to create the solution and the producer project:

```powershell
dotnet new console -n Producer -o ./src/Producer
dotnet new sln -n FIFOSandbox
dotnet sln add --in-root src/Producer
dotnet add src/Producer package AWSSDK.SimpleNotificationService
dotnet add src/Producer package AWSSDK.Extensions.NETCore.Setup
dotnet add src/Producer package Microsoft.Extensions.Configuration.Json
dotnet add src/Producer package Microsoft.Extensions.DependencyInjection
```

Open the `Producer` project and update the `Program.cs` file as follows:

```csharp
using Amazon.SimpleNotificationService;
using Amazon.SimpleNotificationService.Model;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using System.Text.Json;

var configurationBuilder = new ConfigurationBuilder()
    .AddJsonFile("appsettings.json");

var options = configurationBuilder.Build().GetAWSOptions();
var services = new ServiceCollection()
    .AddDefaultAWSOptions(options)
    .AddAWSService<IAmazonSimpleNotificationService>();

var provider = services.BuildServiceProvider();
var topicArn = "<MY_TOPIC_ARN>";
var snsClient = provider.GetService<IAmazonSimpleNotificationService>()!;

for (int i = 0; i < 500; i++)
{
    var messageGroupId = (ConsoleColor)Random.Shared.Next(1, 15);
    var payload = new Payload() { Color = messageGroupId, Index = i };
    var message = JsonSerializer.Serialize(payload);

    var request = new PublishRequest
    {
        TopicArn = topicArn,
        Message = message,
        MessageGroupId = messageGroupId.ToString(),
        MessageDeduplicationId = Guid.NewGuid().ToString()
    };
    var response = await snsClient.PublishAsync(request);
    Console.ForegroundColor = messageGroupId;
    Console.WriteLine($"{response.MessageId} sent");
}

class Payload
{
    public ConsoleColor Color { get; set; }
    public int Index { get; set; }
}
```

The producer sends 500 messages spread across 14 message group IDs, one for each `ConsoleColor` value from 1 to 14 (the upper bound of `Random.Shared.Next(1, 15)` is exclusive, so the range skips `Black` and `White`). Each payload includes the enum value and the message index. Add an `appsettings.json` file with the following content:

```json
{
    "AWS": {
        "Profile": "default",
        "Region": "<MY_REGION>"
    }
}
```
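
Because `ContentBasedDeduplication` is disabled, the producer must supply a `MessageDeduplicationId`. It uses a fresh GUID for every publish, so a retried publish of the same payload would not be recognized as a duplicate. One possible alternative, sketched below, derives the ID from the message group and body so that a retry inside the 5-minute deduplication interval maps to the same ID. `DeduplicationId.FromContent` is a hypothetical helper, not part of the original code or the AWS SDK, and the sketch assumes .NET 5 or later and that identical payloads within a group really are duplicates.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper (not from the original post or the AWS SDK).
static class DeduplicationId
{
    // Builds a deterministic ID from the group and the message body.
    // A SHA-256 digest in hex is 64 characters long, which fits within
    // the 128-character limit for MessageDeduplicationId.
    public static string FromContent(string messageGroupId, string message)
    {
        var bytes = Encoding.UTF8.GetBytes($"{messageGroupId}:{message}");
        return Convert.ToHexString(SHA256.HashData(bytes));
    }
}
```

In the producer, this would replace the GUID with `MessageDeduplicationId = DeduplicationId.FromContent(messageGroupId.ToString(), message)`. Since every payload in this demo carries a unique `Index`, distinct messages still get distinct IDs.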

## The Consumer App

Execute the following commands:

```powershell
dotnet new console -n Consumer -o ./src/Consumer
dotnet sln add --in-root src/Consumer
dotnet add src/Consumer package AWSSDK.SQS
dotnet add src/Consumer package AWSSDK.Extensions.NETCore.Setup
dotnet add src/Consumer package Microsoft.Extensions.Configuration.Json
dotnet add src/Consumer package Microsoft.Extensions.DependencyInjection
```

Open the `Consumer` project and update the `Program.cs` file as follows:

```csharp
using Amazon.SQS;
using Amazon.SQS.Model;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using System.Text.Json;

var configurationBuilder = new ConfigurationBuilder()
    .AddJsonFile("appsettings.json");

var options = configurationBuilder.Build().GetAWSOptions();
var services = new ServiceCollection()
    .AddDefaultAWSOptions(options)
    .AddAWSService<IAmazonSQS>();

var provider = services.BuildServiceProvider();
var url = "<MY_QUEUE_URL>";
var sqsClient = provider.GetService<IAmazonSQS>()!;
while (true)
{
    var receiveRequest = new ReceiveMessageRequest
    {
        QueueUrl = url,
        MaxNumberOfMessages = 10,
        WaitTimeSeconds = 5
    };

    var result = await sqsClient.ReceiveMessageAsync(receiveRequest);
    if (result.Messages is { Count: > 0 }) // also safe when the SDK returns a null list for an empty response
    {
        var total = result.Messages.Count;
        var current = 1;
        var batch = new List<DeleteMessageBatchRequestEntry>();
        foreach (var message in result.Messages)
        {
            var payload = JsonSerializer.Deserialize<Payload>(message.Body)!;
            Console.ForegroundColor = payload.Color;
            Console.WriteLine($"{payload.Index}:message {current} of {total} received");
            current++;
            batch.Add(new DeleteMessageBatchRequestEntry() { ReceiptHandle = message.ReceiptHandle, Id = message.MessageId });
            await Task.Delay(Random.Shared.Next(500, 1000));
        }
        await sqsClient.DeleteMessageBatchAsync(url, batch);
    }
    else
    {
        Console.ForegroundColor = ConsoleColor.White;
        Console.WriteLine("No messages available");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
}

class Payload
{
    public ConsoleColor Color { get; set; }
    public int Index { get; set; }
}
```

The consumer requests up to 10 messages per call and uses long polling: `ReceiveMessageAsync` waits up to 5 seconds for messages to arrive before returning an empty response. For each message, we deserialize the body and set the console's text color from the payload. Once the whole batch is processed, we delete it with a single `DeleteMessageBatchAsync` call. Add an `appsettings.json` file as we did with the producer.
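
To check the ordering programmatically rather than by eye, a consumer could track the last index seen for each color and flag any message that arrives with a lower or equal one. The following is a minimal sketch under that assumption. `OrderTracker` and `Observe` are hypothetical names that do not appear in the original code, and a message redelivered after a visibility timeout would also be reported as out of order.

```csharp
using System.Collections.Generic;

// Hypothetical helper (not from the original post).
class OrderTracker
{
    // Last index observed for each message group.
    private readonly Dictionary<string, int> _lastIndexByGroup = new();

    // Returns true when the index is greater than the last one seen
    // for the group; returns false (and keeps the previous value)
    // when the message is out of order or repeated.
    public bool Observe(string group, int index)
    {
        if (_lastIndexByGroup.TryGetValue(group, out var last) && index <= last)
        {
            return false;
        }

        _lastIndexByGroup[group] = index;
        return true;
    }
}
```

Inside the `foreach` loop, a call such as `tracker.Observe(payload.Color.ToString(), payload.Index)` would return `false` for a message that breaks the order within its group, given a `tracker` instance created before the `while` loop.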

Run `dotnet run --project src/Consumer` in two or more console windows, and `dotnet run --project src/Producer` to see everything in action. You can see the code and scripts [here](https://github.com/raulnq/aws-sqs-sns-fifo). Thanks, and happy coding.
