Publicando uma mensagem através do SNS
Introdução
Dando continuidade na série sobre o uso da AWS SQS e SNS, hoje implementaremos a funcionalidade para publicação de mensagens utilizando SNS, e como utilizar o recurso de filtros dos serviços SNS. De modo mais introdutório, resolvi fazer duas aplicações REST utilizando Django e Django Rest Framework, que serão os serviços de ecommerce e payment_gateway que não possuem comunicação direta e realizam comunicação utilizando tópicos e filas.
No geral essas aplicações representam operações e processamentos que podem ocorrer em serviços mais completos, os serviços são para simular API’s de ecommerce. Quero ressaltar que este artigo não tem o propósito de explicar o funcionamento do Django Rest Framework, por esse motivo o código desenvolvido nesse projeto foi o mais simples possível e apenas funcional o suficiente para a utilização das funcionalidades de fila necessária.
Pois bem, vamos começar.
Implementação do projeto
Nesse projeto exemplo, imaginei um cenário onde temos um serviço responsável pelo processamento da API ecommerce, nesse serviço teremos a rota GET de listagem de produtos disponíveis e qual o vendedor daquele produto, também teremos uma rota POST para criação de um pedido de compra. Na outra ponta temos o serviço responsável pelo processamento do pagamento, o que eu chamei de payment gateway, esse serviço irá consumir a quantidade de pedidos processados pelo serviço ecommerce, e irá através de uma rota GET exibir a quantidade arrecadada de pedido para cada vendedor.
O código desenvolvido nesse projeto poderá ser encontrado aqui.
Funcionalidades do serviço
Quando o serviço recebe a solicitação de pedido de compra, a aplicação realiza os processamentos que precisa para a sua execução. Uma vez que o processo foi completado, a aplicação irá disparar uma mensagem para o tópico SNS.
def create(self, purchase_data):
purchase = Purchase()
products_pk = [product["id"] for product in purchase_data['products']]
products_list = list(self.product_domain.get_all(query_params={'pk__in': products_pk}))
purchase.total_price = self.__calculate_total_price(products_list)
# salvar o objeto
purchase = self.repository.create(purchase)
# salvar os relacionamento ManyToMany
purchase.product.add(*products_list)
self.repository.update_m2m(purchase)
# disparar mensagem para tópico SNS
self.__notificate_transaction_sns(purchase)
return purchase
A mensagem que a aplicação irá enviar vai ser um json contendo o identificador do vendedor, a data da compra, o valor total que aquela compra para aquele lojista e um identificador indicando a qual compra esses dados se referem.
def __notificate_transaction_sns(self, purchase, seller_id):
products = purchase.product.all()
message = {
"seller" : str(seller_id),
"purchase_date" : purchase.created_at.strftime("%Y-%m-%d %H:%M:%S %Z"),
"purchase_id" : str(purchase.pk),
"total_price" : str(purchase.total_price)
}
# Chama a classe de domínio que faz conexão com a AWS SNS
self.sns_connection.publish_message_to_subscribers(settings.TRANSACTION_TOPIC_SNS, json.dumps(message))
Envio da mensagem via SNS
Publicar uma mensagem em um tópico SNS é muito simples, para uma configuração mínima você precisa passar apenas o código ARN do tópico e a mensagem em questão. A mensagem pode estar em qualquer padrão definido para o projeto, nesse exemplo estou utilizando o envio de um json como mostrado anteriormente.
Vale lembrar que o código ARN do tópico foi obtido durante a criação do mesmo, o código de criação do tópico e da instanciação do cliente SNS está disponível na Parte 1 dessa série.
def publish_message_to_subscribers(self, topic_arn, message, message_attributes={}):
publish = self.sns_client.publish(
TopicArn=topic_arn,
Message=message,
MessageAttributes = message_attributes
)
return publish
Esse código é tudo o que você precisa para poder enviar uma mensagem para todos os incritos do tópico, lembrando também que é possível ter contas de email e números de telefone como inscritos de um tópico, desse modo todos eles receberiam igualmente a mensagem disparada sem haver a necessidade de adicionar um parâmetro a mais no envio. Como visto na parte 1 dessa série, apenas adicionei uma fila SQS como um inscrito desse tópico, logo ela foi quem recebeu a mensagem que foi disparada por esse trecho de código e agora nós iremos consumi-la.
Filtrando o envio de mensagens
Mas lembram que eu falei anteriormente que um tópico pode ter n inscritos e que eles podem ser do tipo SQS, email ou SMS? Imagine que no nosso tópico ao invés de uma única Queue, a gente tivesse duas Queue inscrita e aptas para receberem mensagens, e vamos imaginar também que na verdade gostaríamos nesse cenário de mandar as mensagens para uma Queue em específica. Isso é possível através da utilização dos filtros SNS, mas como funciona?
Toda vez que é disparado o envio de mensagens através da SNS, por padrão o serviço já verifica se existe uma política de filtro existente para algum inscrito, caso tenha ela verifica se o filtro descrito nos atributos da mensagem bate com algum dos filtros encontrados naquele tópico. Nos casos onde não há filtro, a SNS dispara a mensagem para todos os seus inscritos sem ter nenhuma diferenciação. A configuração do filtro é atribuído no momento da inscrição, desse modo, voltando ao código publicado no texto anterior iremos pegar a váriavel payment_subscription e configurar essa inscrição.
sns_cliente.set_subscription_attributes(
SubscriptionArn = payment_subscription['SubscriptionArn'],
AttributeName ='FilterPolicy',
AttributeValue ='{"event_type": ["order_placed"]}'
)
No código acima, a gente chama novamente o cliente sns instanciado utilizando o boto3 e chamamos a função para atualizar os atributos da inscrição. Adicionaremos um novo atributo para essa inscrição em específico, e a gente garante isso através do código Arn da inscrição da fila PaymentQueue, que é a fila que vai ser consumida pelo serviço PaymentGateway do meu exemplo. O atributo em questão é uma política de filtro, onde adicionamento um dicionário contendo a chave sinalizando que é um tipo de evento e o seu valor é um array contendo o nome dos filtros aplicados.
Vale ressaltar que esse array pode conter ter 1 ou mais especificações de filtro, e uma mensagem será enviada para esta inscrição caso o atributo da mensagem seja correspondente com ao menos 1 filtro atribuído. A comparação realizada pela AWS é booleana, ou seja, ou é exatamente igual ou não, dessa forma os filtros são case sensitve.
Para enviar uma mensagem com um filtro em específico, nós iremos reescrever a função de notificate_transaction_sns para atribuirmos a especificação do filtro nos atributos da mensagem.
def __notificate_transaction_sns(self, purchase, seller_id):
products = purchase.product.all()
message = {
"seller" : str(seller_id),
"purchase_date" : purchase.created_at.strftime("%Y-%m-%d %H:%M:%S %Z"),
"purchase_id" : str(purchase.pk),
"total_price" : str(purchase.total_price)
}
message_attributes = {
'event_type': {
'DataType': 'String',
'StringValue': 'order_placed'
}
}
self.sns_connection.publish_message_to_subscribers(settings.TRANSACTION_TOPIC_SNS, json.dumps(message), message_attributes)
E voilá, a mensagem será enviada apenas para o inscrição que possua o filtro order_placed atribuída.
Conclusão e implementações futuras
Na parte 2 dessa série nós vemos como publicar mensagens através do serviço SNS, vimos como pode ser absurdamente prático e vimos também como utilizar os filtros ao nosso favor. Nos próximos textos da série Implementando AWS SQS e SNS iremos ver como consumir as mensagens que enviamos hoje, e vamos descobrir também sobre o conceito das Dead Letter Queue. Vejo você lá!