DJANGO-EB-SQS: Simplifying communication between Django applications and AWS SQS


AWS services like Amazon ECS, Amazon S3, Amazon Kinesis, Amazon SQS and Amazon RDS are used extensively around the world. Here at Barracuda, we use AWS Simple Queue Service (SQS) to manage messaging within and among the microservices that we have developed on the Django framework.

AWS SQS is a message queuing service that can “send, store, and receive messages between software components at any volume, without losing messages or requiring other services to be available.” SQS is designed to help organizations decouple applications and scale services, and it was the perfect tool for our work on microservices. However, each new Django-based microservice, or each decoupling of an existing service using AWS SQS, required us to duplicate our code and logic for communicating with AWS SQS. This resulted in a lot of repeated code and encouraged our team to build this GitHub library: DJANGO-EB-SQS

Django-EB-SQS is a Python library that helps developers quickly integrate AWS SQS with existing or new Django-based applications. The library takes care of the following tasks:

  • Serializing the data
  • Adding delay logic
  • Continuously polling the queue
  • De-serializing the data as per AWS SQS standards
  • Using third-party libraries (such as boto3) to communicate with AWS SQS

In short, it abstracts all the complexity involved in communicating with AWS SQS and lets developers focus only on core business logic.

The library is built on the Django ORM framework and the boto3 library.
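As a quick illustration, defining a task and queueing a call takes only a few lines. The function and queue name below are illustrative, not part of our services:

from eb_sqs.decorators import task

# the decorator turns a plain function into a queueable task
@task(queue_name='example-queue')
def echo(message: str):
    print(message)

# rather than executing synchronously, this serializes the call
# and pushes it to the SQS queue for a worker to pick up
echo.delay(message='Hello World!')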

How we use it

Our team works on an email protection solution that uses artificial intelligence to detect spear phishing and other social engineering attacks. We integrate with our customers' Office 365 accounts and receive notifications whenever new emails arrive. One of our tasks is to determine whether or not a new email is fraudulent. On receiving such a notification, one of our services (Figure 1: Service 1) talks to Office 365 via the Graph API and fetches those emails. For further processing, and to make the emails available to other services, they are then pushed to an AWS SQS queue (Figure 1: queue_1).

Figure 1
Let’s look at a simple use case showing how we use the library in our solutions. One of our services (Figure 1: Service 2) is responsible for extracting headers and feature sets from individual emails and making them available for other services to consume.

Service 2 is configured to listen to queue_1 to retrieve the raw email bodies.

Let's say that Service 2 performs the following actions:

# consume email messages from queue_1
# extract headers and feature sets from emails
# submit a task
process_message.delay(tenant_id=tenant_id, email_id=email_id, headers=headers, feature_set=feature_set, ...)

The process_message method won’t be called synchronously; instead, it is queued up as a task and executed once one of the workers picks it up. The worker could belong to the same service or to a different one. The caller of the method need not worry about this underlying behavior or how the task gets executed.

Let’s look at how the process_message method is defined as a task.

from typing import List
import logging

from django.db import InterfaceError, OperationalError
from eb_sqs.decorators import task
# note: the exception's import path may differ between library versions
from eb_sqs.worker.worker_exceptions import MaxRetriesReachedException

logger = logging.getLogger(__name__)

@task(queue_name='queue_2', max_retries=3)
def process_message(tenant_id: int, email_id: str, headers: List[dict], feature_set: List[dict], ...):
    try:
        # perform some action using headers and feature sets
        # also can queue up further tasks, if required
        ...
    except (OperationalError, InterfaceError) as exc:
        try:
            process_message.retry()
        except MaxRetriesReachedException:
            logger.error(f'MaxRetries reached for Service2:process_message ex: {exc}')

When we decorate the method with the task decorator, the library adds extra data, such as the calling method, the target method, its arguments, and some additional metadata, before serializing the message and pushing it to the AWS SQS queue. When the message is consumed from the queue by one of the workers, it has all the information needed to execute the task: which method to call, which parameters to pass, and so on.
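Conceptually, the queued message can be pictured as something like the following. This is only an illustrative sketch; the actual wire format is an internal detail of the library:

# illustrative sketch of a serialized task message, not the library's real format
queued_message = {
    'queue': 'queue_2',
    'func': 'service2.tasks.process_message',  # target method; module path is hypothetical
    'kwargs': {'tenant_id': 42, 'email_id': 'abc-123'},  # illustrative arguments
    'max_retries': 3,
    'retry_num': 0,  # metadata the retry logic relies on
}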

We can also retry the task in case of an exception. However, to avoid an infinite loop, we can set the optional max_retries parameter and stop processing once the maximum number of retries is reached. We can then log the error or send the task to a dead letter queue for further analysis, as in the sketch below.
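If a dead letter queue is preferred over logging, the handler for MaxRetriesReachedException can forward the payload to another task. The dead_letter_task and its queue name below are hypothetical:

from eb_sqs.decorators import task

# hypothetical task backed by a dedicated dead letter queue
@task(queue_name='dead-letter-queue')
def dead_letter_task(source: str, payload: dict):
    # persist the failed payload for later analysis
    ...

# inside the except MaxRetriesReachedException block of process_message:
# dead_letter_task.delay(source='Service2:process_message', payload={'email_id': email_id})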

AWS SQS gives us the ability to delay the processing of a message by up to 15 minutes. We can add a similar capability to our task by passing the delay parameter (in seconds):

process_message.delay(email_id=email_id, headers=headers, ..., delay=300)  # delaying by 5 minutes

Executing the tasks is achieved by running the Django command process_queue. It supports listening to one or more queues, reading from them indefinitely, and executing the tasks as they come in:

python manage.py process_queue --queues <comma-separated queue names>
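For example, a single worker could listen to both queues from Figure 1:

python manage.py process_queue --queues queue_1,queue_2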

We just saw how this library makes communication within a service or between services easy via AWS SQS queues.

More details on how to configure the library through Django settings, listen to multiple queues, set up a development environment, and use many other features can be found here.

Contribute

If you wish to contribute to the project, please refer to the repository: DJANGO-EB-SQS
