Scaling Amazon Elastic Kubernetes Service Workloads with KEDA and SQS

In this article, we'll explore another practical use case for KEDA: scaling an application dynamically in response to message traffic. To do this, we combine an Amazon EKS cluster, KEDA, and Amazon SQS.

Pre-requisites

  • Install Docker Desktop.

  • Enable Kubernetes (the standalone version included in Docker Desktop).

  • An Amazon EKS cluster with KEDA installed. We will set identityOwner to operator for the AWS SQS scaler, which means the KEDA operator itself queries the queue. Therefore, we must grant the KEDA operator the necessary IAM permissions to access SQS. You can find an example of how to accomplish this here.

  • Docker Desktop Kubernetes context configured to work with the Amazon EKS cluster.

  • An IAM User with programmatic access.

  • Install the AWS CLI.

  • Install Terraform CLI.

The Application

Run the following commands to set up the solution:

dotnet new web -o MyApi
dotnet new sln -n MyApi
dotnet sln add --in-root MyApi
dotnet add MyApi package AWSSDK.SQS
dotnet add MyApi package AWSSDK.Extensions.NETCore.Setup
dotnet add MyApi package AWSSDK.SecurityToken

We will create a background service to read messages from a queue. Create a SqsBackgroundService.cs file with the following content:

using Amazon.SQS.Model;
using Amazon.SQS;
namespace MyApi;

public class SqsBackgroundService : BackgroundService
{
    private readonly string _queueUrl;
    private readonly IAmazonSQS _sqsClient;

    public SqsBackgroundService(IConfiguration configuration, IAmazonSQS sqsClient)
    {
        _sqsClient = sqsClient;
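        // Fail fast at startup if the queue URL is missing from configuration.
        _queueUrl = configuration.GetValue<string>("QueueUrl")
            ?? throw new InvalidOperationException("QueueUrl is not configured.");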
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        Console.WriteLine($"Starting polling queue at {_queueUrl}");
        while (!stoppingToken.IsCancellationRequested)
        {
            var request = new ReceiveMessageRequest
            {
                QueueUrl = _queueUrl,
                MaxNumberOfMessages = 10,
                WaitTimeSeconds = 5
            };
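            // Long-poll the queue; passing the token lets shutdown interrupt the wait.
            var response = await _sqsClient.ReceiveMessageAsync(request, stoppingToken);
            // Null-safe check: some SDK versions return null rather than an empty list.
            if (response.Messages is { Count: > 0 })
            {
                foreach (var msg in response.Messages)
                {
                    // Simulate 1 to 2.5 seconds of processing work per message.
                    await Task.Delay(Random.Shared.Next(1000, 2500), stoppingToken);
                    Console.WriteLine($"Message received with body {msg.Body}");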
                    await _sqsClient.DeleteMessageAsync(new DeleteMessageRequest
                    {
                        QueueUrl = _queueUrl,
                        ReceiptHandle = msg.ReceiptHandle
                    });
                }
            }
            else
            {
                Console.WriteLine("No message available");
                await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
            }
        }
    }
}

We're introducing a random delay to simulate processing time. Next, replace the contents of the Program.cs file with:

using Amazon.SQS;
using Amazon.SQS.Model;
using Microsoft.AspNetCore.Mvc;
using MyApi;

var builder = WebApplication.CreateBuilder(args);

builder.Configuration
    .AddJsonFile("appsettings.json")
    .AddEnvironmentVariables();

builder.Services.AddDefaultAWSOptions(builder.Configuration.GetAWSOptions());
builder.Services.AddHostedService<SqsBackgroundService>();
builder.Services.AddAWSService<IAmazonSQS>();

var queue = builder.Configuration.GetValue<string>("QueueUrl");

var app = builder.Build();

app.MapGet("/", () => "Hello World!");

app.MapPost("/send", async ([FromServices]IAmazonSQS sqsClient, [FromBody] Request request) =>
{
    for (int i = 0; i < request.Messages; i++)
    {
        var body = $"{Guid.NewGuid()}";
        var message = new SendMessageRequest(queue, body);
        await sqsClient.SendMessageAsync(message);
        Console.WriteLine($"Message sent with body {body}");
    }
});

app.Run();

public record Request(int Messages);

Here, we register the IAmazonSQS client with the dependency injection container, start the background service, and map a /send endpoint that publishes the requested number of messages to the queue.

Terraform

We will use Terraform to create the necessary resources for running our application on the Kubernetes cluster. At the project level, create a main.tf file with the following content:

terraform {
  required_providers {
    aws = {
      source = "hashicorp/aws"
      version = "5.31.0"
    }
  }
  backend "local" {}
}

provider "aws" {
  region      = "<MY_REGION>"
  profile     = "<MY_AWS_PROFILE>"
  max_retries = 2
}

locals {
  repository_name         = "myrepo"
  cluster_name            = "<MY_K8S_CLUSTER_NAME>"
  role_name               = "myrole"
  namespace               = "<MY_K8S_NAMESPACE>"
}

resource "aws_ecr_repository" "repository" {
  name                 = local.repository_name
  image_tag_mutability = "MUTABLE"

  image_scanning_configuration {
    scan_on_push = false
  }
}

data "aws_iam_policy" "sqs_policy" {
  name = "AmazonSQSFullAccess"
}

data "aws_eks_cluster" "cluster" {
  name = local.cluster_name
}


module "iam_assumable_role_with_oidc" {
  source                       = "terraform-aws-modules/iam/aws//modules/iam-assumable-role-with-oidc"
  version                      = "4.14.0"
  oidc_subjects_with_wildcards = ["system:serviceaccount:${local.namespace}:*"]
  create_role                  = true
  role_name                    = local.role_name
  provider_url                 = data.aws_eks_cluster.cluster.identity[0].oidc[0].issuer
  role_policy_arns = [
    data.aws_iam_policy.sqs_policy.arn,
  ]
  number_of_role_policy_arns = 1
}

output "role_arn" {
  value = module.iam_assumable_role_with_oidc.iam_role_arn
}

output "repository_url" {
  value = aws_ecr_repository.repository.repository_url
}

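We are creating an Amazon ECR repository to host our application's image and an IAM role, assumable by any Service Account in the namespace, with permissions for Amazon SQS (the AmazonSQSFullAccess managed policy, which is broader than the application strictly needs). This configuration does not create the SQS queue itself, so <MY_QUEUE_URL> in the manifests later on must refer to a queue that already exists. Run the following commands to create the resources in AWS: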

terraform init
terraform plan -out app.tfplan
terraform apply 'app.tfplan'
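
The two outputs declared at the end of main.tf are needed in later steps: role_arn goes into the Service Account annotation, and repository_url is the image name used by docker build and docker push. As a small sketch, assuming a recent Terraform CLI (for the -raw flag) and that it is run from the directory containing main.tf, they can be read back like this. The show_outputs name is a hypothetical helper, not something the project defines.

```shell
# Print the Terraform outputs used in later steps.
# show_outputs is a hypothetical helper; call it after `terraform apply`.
show_outputs() {
  # IAM role ARN for the eks.amazonaws.com/role-arn annotation.
  printf 'role_arn=%s\n' "$(terraform output -raw role_arn)"
  # ECR repository URL, used as the image name in docker build/push.
  printf 'repository_url=%s\n' "$(terraform output -raw repository_url)"
}

# Example:
# show_outputs
```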

Docker Image

Create a Dockerfile in the MyApi folder with the following content:

FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base
WORKDIR /app

FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
COPY ["MyApi/MyApi.csproj", "MyApi/"]
RUN dotnet restore "MyApi/MyApi.csproj"
COPY . .
WORKDIR "/MyApi"
RUN dotnet build "MyApi.csproj" -c Release -o /app/build

FROM build AS publish
RUN dotnet publish "MyApi.csproj" -c Release -o /app/publish

FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "MyApi.dll"]

Run the following commands from the solution root to build the image and push it to the Amazon ECR repository:

aws ecr get-login-password --region <MY_REGION> --profile <MY_AWS_PROFILE> | docker login --username AWS --password-stdin <MY_ACCOUNT_ID>.dkr.ecr.<MY_REGION>.amazonaws.com
docker build -t <MY_ACCOUNT_ID>.dkr.ecr.<MY_REGION>.amazonaws.com/myrepo:1.0 -f MyApi/Dockerfile .
docker push <MY_ACCOUNT_ID>.dkr.ecr.<MY_REGION>.amazonaws.com/myrepo:1.0

Kubernetes

To call the Amazon SQS API, our Pod must assume the IAM role created above through a Service Account annotated with the role's ARN. You can find more information here. Create a serviceaccount.yaml file with the following content:

apiVersion: v1
kind: ServiceAccount
metadata:
 name: kead-sa
 annotations:
   eks.amazonaws.com/role-arn: arn:aws:iam::<MY_ACCOUNT_ID>:role/myrole

Next, we will use the Service Account mentioned earlier. Create a deployment.yaml file with the following content:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: keda-deployment
  labels:
    app: api
spec:
  replicas: 1
  selector:
    matchLabels:
      app: api
  template:
    metadata:
      labels:
        app: api
    spec:
      serviceAccountName: kead-sa
      containers:
        - name: container
          env:
            - name: ASPNETCORE_ENVIRONMENT
              value: Development
            - name: QueueUrl
              value: <MY_QUEUE_URL>
          image: <MY_ACCOUNT_ID>.dkr.ecr.<MY_REGION>.amazonaws.com/myrepo:1.0
          ports:
            - name: http
              containerPort: 8080
              protocol: TCP

We expose the application through a Service of type LoadBalancer. Create a service.yaml file containing the following content:

apiVersion: v1
kind: Service
metadata:
  name: keda-service
  labels:
    app: api
spec:
  type: LoadBalancer
  ports:
    - port: 80
      targetPort: 8080
      protocol: TCP
      name: http
  selector:
    app: api

Finally, create a ScaledObject that targets the Deployment created earlier. The queueLength value is the target number of messages per replica. Create a scaledobject.yaml file containing the following content:

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: keda-so
spec:
  minReplicaCount: 1
  maxReplicaCount: 15
  scaleTargetRef:
    name: keda-deployment
  triggers:
  - type: aws-sqs-queue
    metadata:
      queueURL: <MY_QUEUE_URL>
      queueLength: "5"
      awsRegion: <MY_REGION>
      identityOwner: operator

Execute the following commands to deploy the application to the cluster:

kubectl apply -f serviceaccount.yaml --namespace=<MY_K8S_NAMESPACE>
kubectl apply -f deployment.yaml --namespace=<MY_K8S_NAMESPACE>
kubectl apply -f service.yaml --namespace=<MY_K8S_NAMESPACE>
kubectl apply -f scaledobject.yaml --namespace=<MY_K8S_NAMESPACE>

Run kubectl get services --namespace=<MY_K8S_NAMESPACE> to see the URL assigned to our application. The output will look something like this:

NAME           TYPE           CLUSTER-IP      EXTERNAL-IP   PORT(S)        AGE
keda-service   LoadBalancer   10.100.67.26    <MY_URL>      80:32349/TCP   27m

Tests

Send a POST request to the /send endpoint at the previous URL with the following payload:

{
    "Messages":1000
}
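
One way to send that request from a terminal is with curl. This is a minimal sketch, assuming curl is installed and that the EXTERNAL-IP value from the previous step is passed as the first argument. The /send path comes from the endpoint mapped in Program.cs; build_payload and send_messages are hypothetical helper names.

```shell
# Build the JSON body expected by the /send endpoint.
build_payload() {
  printf '{"Messages":%d}' "$1"
}

# POST the payload to the load balancer host.
# Usage: send_messages <host> <count>
send_messages() {
  curl --fail --silent --show-error \
    --request POST "http://$1/send" \
    --header "Content-Type: application/json" \
    --data "$(build_payload "$2")"
}

# Example (replace the host with the EXTERNAL-IP shown by kubectl):
# send_messages "<MY_URL>" 1000
```

The endpoint sends the messages one at a time, so the call returns only after all of them have been published.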

Next, we can observe the Deployment scaling out in response to the queue backlog. Run kubectl get pods --namespace=<MY_K8S_NAMESPACE>; in this run it reached 15 pods, the configured maxReplicaCount:

NAME                               READY   STATUS    RESTARTS   AGE
keda-deployment-65466c4d89-2dtmq   1/1     Running   0          19s
keda-deployment-65466c4d89-47tnc   1/1     Running   0          4s
keda-deployment-65466c4d89-5n6s7   1/1     Running   0          19s
keda-deployment-65466c4d89-5rdcs   1/1     Running   0          4s
keda-deployment-65466c4d89-6crln   1/1     Running   0          34s
keda-deployment-65466c4d89-9vxkx   1/1     Running   0          34s
keda-deployment-65466c4d89-d6kkp   1/1     Running   0          4s
keda-deployment-65466c4d89-dkjwc   1/1     Running   0          19s
keda-deployment-65466c4d89-g22vm   1/1     Running   0          19s
keda-deployment-65466c4d89-hgx9z   1/1     Running   0          4s
keda-deployment-65466c4d89-nkc2n   1/1     Running   0          4s
keda-deployment-65466c4d89-p2nzc   1/1     Running   0          4s
keda-deployment-65466c4d89-w4mhz   1/1     Running   0          4s
keda-deployment-65466c4d89-wz6z6   1/1     Running   0          4h31m
keda-deployment-65466c4d89-xrbdj   1/1     Running   0          34s
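
To follow the scaling decision itself rather than only its effect on the pods, you can look at the HPA that KEDA manages and at the queue depth. The sketch below assumes KEDA's default HPA naming (keda-hpa- followed by the ScaledObject name) and an AWS CLI already configured for the right region and profile; scaling_snapshot is a hypothetical helper name.

```shell
# Snapshot of the autoscaling state during the test.
# scaling_snapshot is a hypothetical helper.
# Usage: scaling_snapshot <namespace> <queue-url>
scaling_snapshot() {
  # HPA that KEDA manages on behalf of the keda-so ScaledObject
  # (assumes the default keda-hpa-<name> naming).
  kubectl get hpa keda-hpa-keda-so --namespace "$1"
  # Approximate number of messages still waiting in the queue.
  aws sqs get-queue-attributes \
    --queue-url "$2" \
    --attribute-names ApproximateNumberOfMessages
}

# Example:
# scaling_snapshot "<MY_K8S_NAMESPACE>" "<MY_QUEUE_URL>"
```

As the consumers drain the queue, the reported message count falls and the replica count should eventually return to minReplicaCount.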

You can find the code and scripts here. Thank you, and happy coding.