/ AWS, PYTHON, SQS_SNS_SERIES

Consumindo filas SQS usando Python

Esse texto faz parte de uma série sobre a utilização da AWS SQS e SNS em uma aplicação python, sugiro a leitura dos textos anteriores para maior entendimento do processo.

Introdução

Na parte 3 dessa série iremos ver como consumir as mensagens publicadas via SNS mostradas no texto anterior. Nessa parte do projeto eu utilizei Django e Django Rest Framework para estruturar o projeto, utilizei o SDK da AWS em python que é a Boto3 e utilizei o Celery juntamento ao Redis para criar agendamento de tarefas.

Lembrando que o código desenvolvido nesse projeto poderá ser encontrado aqui.

Implementação do projeto

Consumir a mensagem via SQS

Primeiro iremos criar a função que irá realizar a chamada de execução do consumo da fila SQS para onde disparamos a mensagem através do tópico.

def poll_queue_for_messages(self, queue_url, max_messages_number=10):
        return self.sqs_client.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=max_messages_number
        )

O comportamento padrão dessa função é de realizar o modo Short Poll, em outras palavras ela retorna uma amostra das mensagens contidas na fila ao invés de trazer todas as mensagens encontradas. Desse modo a quantidade de mensagens disponíveis na fila tem implicação direta na quantidade de mensagens obtidas por essa função, podendo até retornar vazio caso a quantidade da fila seja extremamente pequena, por esse motivo é importante realizar tentativas periodicamente para obter o real estado da fila.

É possível passar algumas especifacações nessa requisição, para maiores conhecimento recomendo fortemente a leitua da documentação do boto3 que é incrivelmente bem descrita.

Com a função que realiza a conexão com AWS feita, vamos para a task periódica que irá chamar a função anterior. Ela basicamente irá verificar se houve retorno e caso tenha tido iterar sobre ele e obter a mensagem para realizar o processamento daquela mensagem. Importante lembrar que devemos excluir a mensagem uma vez que ela tenha sido devidamente processada, pois caso ela persista poderá ocasionar erros como processamento duplicado e isso é algo que não queremos deixar ocorrer, imagina o problema de uma cobrança realizada duplicada? Nem pensar.

Outro problema que pode ocorrer é algum erro durante o processamento ou até mesmo a indisponibilidade do serviço que não permita que aquela mensagem seja completamente processada, para essas ocasiões não devemos permitir que a aplicação remova a mensagem da fila, um recurso que ainda não utilizamos aqui mas que é bastante útil para a recuperação das mensagens nesses casos é a Dead Letter Queue que abordaremos no próximo texto da série.

Mas voltando para o cenário que imaginamos o serviço ecommerce estará constantemente enviando mensagens para a nossa fila. Por isso o nosso serviço payment gateway precisa ficar fazendo verificações constantes na fila para que ele possa atualizar os dados da sua base conforme os dados enviados pelo serviço ecommerce enquanto processa novos pedidos. Além disso, como a função poll_queue_for_messages não retorna todas as mensagens a cada chamada, devemos fazer chamadas repetidas para capturar mensagens ainda não processadas.

Então para consumir as mensagens de forma periódica utilizei o Celery, com ele eu pude configurar para que a função de consumo da fila seja executada de minuto em minuto. Seria possível implementar esse agendamento com a AWS CloudWatch, mas como não era o foco desse projeto utilizei o Celery por facilidade de execução. Para ter o código funcionando bastou apenas realizar a instação da lib para uso no Django, e configurar o agendamento da task na base de dados usando o celery beat.

@task()
def verify_transaction_queue():
    billing_domain = BillingDomainService()
    sqs_connection = SQSConnection()

    queue_messages = sqs_connection.poll_queue_for_messages(settings.TRANSACTION_URL_QUEUE)
    if 'Messages' in queue_messages and len(queue_messages['Messages']) >= 1:
        for message in queue_messages['Messages']:
            message_body = message['Body']
            receipt_handle = message['ReceiptHandle']

            success = billing_domain.process_purchase(message_body)
            if success:
                sqs_connection.delete_message_from_queue(settings.TRANSACTION_URL_QUEUE, receipt_handle)

Conclusão e implementações futuras

No texto de hoje vimos como realizar o consumo de mensagens disponibilizadas nas filas SQS. O próximo texto será o último dessa série sobre SQS e SNS, e nele veremos o que são as Dead Letter Queues. Aguardo você lá!

mayara

Mayara Machado

Dev amante de dados, coding, forró e pizza.