Implementando Mensageria Publish/Subscribe com Python

Implementando Mensageria Publish/Subscribe com Python

Esse texto faz parte de uma série sobre SQS e SNS utilizando python.

Introdução

Eu decidi me desafiar e aprender algo novo. Nesse projeto eu quis estudar e explorar uma tecnologia que eu ainda não utilizava, então decidi me desafiar a implementar um pequeno projeto utilizando os conceitos aprendidos. Para narrar minha jornada resolvi escrever uma série de textos compartilhando o que eu tenho aprendido, ao longo dessa série irei implementar o padrão Publish/Subscribe utilizando dois serviços Rest que são desacoplados. O link para o repositório onde o projeto está em desenvolvimento está aqui.

As tecnologias escolhidas foram a Amazon Simple Queue Service (AWS SQS) e a Amazon Simple Notification Service (AWS SNS), a serem utilizadas em APIs construídas com Django Rest Framework e utilizando a lib Boto3 que é uma Amazon Web Services SDK para python.

Com a tecnologia decidida e uma ideia do que eu iria desenvolver, agora é hora de começar o projeto.

Sobre AWS SQS e SNS

Mas não poderia começar sem mergulhar antes em alguns conhecimento teóricos sobre a SQS e a SNS, a amazon contém muito material de documentação disponível o que facilitou bastante para um entendimento inicial. Mas para quem não está tão familiarizado com essas tecnologias da AWS calma que eu vou dar uma pincelada no que se trata esses serviços.

O que seriam a Amazon SNS e a Amazon SQS? De forma geral a SNS é o serviço da Amazon para publicação de notificações/mensagens para grandes números de endpoints. Ele é muito útil para aplicações de microserviços e/ou sistemas distribuídos, pois ajudam a garantir o desacoplamento entre os serviços ao mesmo tempo em que permitem que se estabeleça uma comunicação entre eles.

Os canais para envio de mensagens criados com a SNS são chamados de Tópicos e cada tópico pode ter n inscrições, uma inscrição representa o endpoint que receberá as mensagens que são enviadas naquele tópico. Um endpoint no SNS pode ser um usuário final através de um email ou sms, assim como também pode ser outro serviço da AWS como o Lambda ou AWS SQS.

A AWS SQS vem para completar esse sistema de mensageria, como o próprio nome já indica a SQS é um serviço de filas que também ajuda a garantir o desacoplamento. Esse serviço permite que seja enviado, armazenado e recebido mensagens de forma assíncrona entre aplicações, sem que haja perda de mensagens ou que seja necessário ter o sistema disponível.

O serviço SQS implementa filas normais que suportam quantidade quase que ilimitada de transações por segundo, mas em contrapartida não tem garantia de que serão entregues por ordem de chegada, e as vezes é possível que as mensagens sejam entregues mais de uma vez. A outra implementação possível são as filas FIFO que seguem o princípio First in First out, são filas onde as mensagens seguem estritamente a ordem nos seus envios e recebimentos, assim como também há a garantia de que as mensagens serão entregues uma única vez e estarão disponíveis aguardando o seu processamento e exclusão. Porém as filas FIFO tem uma taxa limitada de mensagens por segundo, suportando em média 300 operação de mensagem (envio, recebimento ou exclusão) por segundo.

No fim a decisão de que tipo de fila você irá implementar irá depender inteiramente de qual a necessidade do seu projeto, as vezes seu projeto precisa que muitas mensagens sejam disparadas por segundo como em aplicações de cartão de crédito. Mas as vezes você precisa que seja garantido uma determinada ordem, como em um aplicativo de compras que para finalizar o pedido você precisa antes ter cadastrado o seu endereço. É importante fazer o levantamento dos seus requisitos para poder definir qual tipo de fila é mais adequada pro seu projeto, nesse projeto exemplo irei usar as filas comum.

Padrão Publish/Subscribe

Com esses dois serviços é possível estabelecer o padrão de Publish-Subscribe, essa estrutura permite que se estabeleça uma comunicação totalmente assíncrona entre dois serviços independentes, por possibilitar esse nível de desacoplamento é que essa estrutura é bastante utilizada em arquiteturas de microserviço ou serveless.

De modo geral, existe um serviço B que aguarda o recebimento de notificação que serão enviadas por um serviço A, mas que esses dois serviços não tem nenhuma ligação direta. Em diversas situações o serviço B, por ser completamente independente, não necessariamente precisaria saber de quem é que se origina a notificação que ele recebeu, deveria saber apenas que ele recebeu uma mensagem e deverá seguir o seu processamento a partir dela.

Nesse caso, o serviço A apenas dispararia a mensagem para um tópico que ele tem acesso, esse tópico dispararia essa mensagem para todos os endpoints que estão inscritos nele, que no nosso exemplo seria uma fila SQS, ao receber a mensagem na fila o serviço B iria verificar quais as mensagens foram disponibilizadas na fila a qual tem acesso.

No diagrama de exemplo abaixo mostra como aconteceria esse fluxo, podemos ver inclusive que é possível enviar uma mensagem para inscritos em específicos de um tópico, desse modo é possível com o serviço SNS disparar mensagens tanto para todos os inscritos de um determinado tópico como também é possível filtrar o envio dessa mensagem. Futuramente farei uma implementação do uso desses filtros e compartilharei aqui com vocês, para esse início iremos começar simples com apenas 1 tópico SNS enviando mensagens para 1 fila SQS.

diagram_sns_sqs

Configuração do Tópico e da Fila

Você pode utilizar o console da AWS tanto para criação quanto para configuração e acesso dos tópicos e das filas, mas a Amazon também disponibiliza SDK’s para criação e gerenciamente desses serviços via código. Nesse projeto eu irei utilizar o Boto3, que é um SDK da AWS implementado para linguagem python, tanto para criação e configuração dos tópicos e de filas, como para processamento e envio de mensagens nos código mostrados nos futuros textos dessa série.

Precisamos importar o boto3 e instanciar os clientes

Lembrando que aqui você deverá substituir e utilizar as suas credenciais da AWS.

import boto3

sns_client = boto3.client(
            'sns', 
            aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
            region_name='us-east-1')

sqs_client = boto3.client(
            'sqs', 
            aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
            region_name='us-east-1')

Criando um tópico

Para criar um tópico basta dar-lhe um nome, é possível realizar configurações mais específicas utilizando o boto3, mas por agora manteremos simples.

topic_arn = sns_client.create_topic(
    Name='PurchaseEvents'
)['TopicArn']

Criando uma fila

Criar filas é igualmente simples, porém existem algumas configurações que ao decorrer do projeto veremos que será necessário fazer.

queue = sqs_client.create_queue(
    QueueName='PaymentQueue'
)
queue_url, queue_arn = queue['QueueUrl'], queue['QueueArn']

A primeira dela é criar a permissão de que o tópico que a gente criou logo acima possa enviar mensagens para esta fila, para isso devemos declarar a seguinte política de acesso.

queue_sns_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
            "Effect": "Allow",
            "Principal": "*",
            "Action": "SQS:SendMessage",
            "Resource": queue_arn,
            "Condition": {
                "StringEquals": {
                "aws:SourceArn": topic_arn
                }
            }
            }
        ]
        }

sqs_client.set_queue_attributes(
    QueueUrl= queue_url
    Attributes={
         'Policy' : json.dumps(queue_sns_policy)
    }
)

Inscrevendo a fila no nosso tópico

Agora que temos o nosso tópico e a nossa fila SQS que já está configurada para receber mensagens do tópico, falta apenas inscrever a fila para que quando uma mensagem for enviada para o nosso tópico ele dispare para nossa fila.

payment_subscription = sns_cliente.subscribe(
    TopicArn = topic_arn,
    Protocol = 'sqs',
    Endpoint = queue_arn
)

Código completo

import boto3

# Instanciando o cliente SNS e SQS utilizando as credenciais da AWS
# substitua pelas suas credenciais.
sns_client = boto3.client(
            'sns', 
            aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
            region_name='us-east-1')

sqs_client = boto3.client(
            'sqs', 
            aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
            region_name='us-east-1')

# Criação de um tópico, salvando o Arn do tópico criado para uso
# futuro.
topic_arn = sns_client.create_topic(
    Name='PurchaseEvents'
)

# Criação de uma fila, salvando o Arn da fila criada para uso futuro.
queue = sqs_client.create_queue(
    QueueName='PaymentQueue'
)
queue_url, queue_arn = queue['QueueUrl'], queue['QueueArn']

# Importante lembrar que a fila precisa ter uma política de acesso que
# permita o recebimento das mensagens advindas do seu tópico, por isso 
# iremos atualizar a fila para aceitar essa política.
queue_sns_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
            "Effect": "Allow",
            "Principal": "*",
            "Action": "SQS:SendMessage",
            "Resource": queue_arn,
            "Condition": {
                "StringEquals": {
                "aws:SourceArn": topic_arn
                }
            }
            }
        ]
        }

# Atualizar a política de acesso da fila.
sqs_client.set_queue_attributes(
    QueueUrl= queue_url
    Attributes={
         'Policy' : json.dumps(queue_sns_policy)
    }
)

# Inscrevendo a fila no tópico recém criado.
payment_subscription = sns_cliente.subscribe(
    TopicArn = topic_arn,
    Protocol = 'sqs',
    Endpoint = queue_arn
)

Conclusão e implementações futuras

E assim finalizamos a parte 1 desse projeto para explorar as funcionalidades da AWS SQS e SNS. No próximo texto irei mostrar um pouco da implementação da API em DRF e como realizar a integração com o tópico e fila que a criamos agora.

Pythonista e Engenheira de dados, também sou mãe de planta nas horas vagas.