Vicente Rodríguez

Feb. 16, 2022

Serving Deep Learning Models using the Publish Subscribe Pattern

Complex models require good hardware to run, and often a GPU is a must. This makes model serving feasible only through cloud providers that offer virtual machines (instances) with GPUs attached.

These instances need to be listening for user requests all the time. Consequently, the cost of these instances climbs to hundreds or even thousands of dollars each month.

Cloud providers like AWS, GCP, and Azure offer Spot Instances: cheaper, temporary instances that the provider can stop at any time when the resources are required by other clients willing to pay more. Hence, spot instances should be used along with normal instances in order to provide a full-time service.

The main idea of this post is to show how a deep learning infrastructure can be built around these Spot Instances to run our neural networks. Two different kinds of instances will be used: one main server called Server A, which runs all the time, and one or more Spot Instances with GPUs attached, called Server B. Server B instances are created when a request arrives and deleted when no new requests have arrived for a while, which further reduces the cost of the servers.

This infrastructure should handle:

These requirements can be achieved using a pub/sub architecture. Server A publishes messages that Server B subscribes to and the other way around.

The tools used to build this infrastructure are Django, Celery, RabbitMQ, Docker, and TensorFlow. However, some of these tools can be easily replaced; for instance, we could use PyTorch, FastAPI, Redis, Pika, etc.

The model to serve is a Deep Learning model to enhance anime images. Code for Server A is here and for Server B is here.

Google Cloud is the selected platform to build both servers. Thus, the Python GCP libraries are used to create and delete instances and also to upload and download files to Google Storage.

Infrastructure Preview

Celery is used to make Server A and Server B communicate. Celery runs its own process where functions can be executed; a function executed inside the Celery process is called a task. Celery can call a task on one machine and execute it on a different machine, thus creating communication between the two machines. In the next section, we will learn more about how Celery works.

infrastructure preview

Server A is always listening for user requests through Django. In each request, Django receives the user's email and the images, then calls several Celery tasks to:

If there is at least one Server B instance available, Celery executes the TensorFlow inference task in Server B, otherwise, Celery executes a function to create a new Server B instance.

Server B is only used when there are requests to enhance images; otherwise it is shut down so we are not charged for it. Both Server A and Server B run a Celery process, and these processes communicate with each other through RabbitMQ using messages that trigger tasks.

For each request, Server A calls a task that is executed in Server B. When a new Server B instance is ready to work, it sends a message to the Server A instance to execute a task called remote_server_ready.

Celery does more than connect the Server A and Server B instances. Each time a new request arrives it is added to a Queue, and the Celery process on Server B consumes requests from this Queue, which can receive thousands of new requests each second. Furthermore, if many requests are waiting in the Queue, the app can create one or more additional Server B instances that start consuming as soon as their Celery process is ready. Celery handles all of this without extra configuration, which makes the infrastructure scalable to thousands of requests.

Throughout the following sections, we are going to learn all the essential details to understand how this infrastructure works.

Python, Django, threads and processes

Before getting into the Celery explanation we need to know how Python and Django handle user requests.

Python can only execute one thread at a time because of the Global Interpreter Lock (GIL). Threads are useful for I/O-bound tasks, where the CPU stays idle until the task is finished: since the CPU is waiting for some I/O response, such as an HTTP request or a database query, Python can switch to another thread to handle additional work. These tasks do not run in parallel, they run concurrently; this is called concurrency. If two tasks need to run truly in parallel, we have to spawn a new process. Each Python application (Django, FastAPI, Celery, etc.) spawns at least one main process to execute code.
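As a quick illustration of that difference (a standalone sketch, not part of the project's code), threads overlap I/O waits while CPU-bound work only benefits from separate processes:

import time
from threading import Thread
from multiprocessing import Process

def io_bound_task():
    time.sleep(1)  # the GIL is released while waiting, so threads can overlap

def cpu_bound_task():
    sum(i * i for i in range(10_000_000))  # holds the GIL, so threads would not speed this up

if __name__ == '__main__':
    threads = [Thread(target=io_bound_task) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # both waits overlap, so this takes ~1 second instead of 2

    processes = [Process(target=cpu_bound_task) for _ in range(2)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()  # each process runs on its own core, truly in parallel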

For instance, when Django receives a request, its process creates a new thread to execute all the code needed to return a response to the user. If Django receives another request at the same time, its process creates a second thread; however, this new thread can only execute code while the other thread (or threads) is waiting on an I/O-bound task or after it has finished. In order to handle parallel requests in Django, a server like Gunicorn is used. This type of server spawns several Django processes, each running on one CPU core, so a multi-core CPU is necessary. In addition, each process can spawn multiple threads to handle concurrency.

This Django behavior occurs when a WSGI server is used (such as Gunicorn); ASGI servers behave differently.
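For example, a Gunicorn invocation along these lines starts several Django processes, each able to spawn its own threads (the systemd service shown later in this post uses a similar command bound to a unix socket):

gunicorn --workers 3 --threads 2 superresolution.wsgi:application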

Celery

Celery is used in this infrastructure to execute tasks in parallel and to let Server A and Server B communicate. When Django receives a request that has to execute a time-intensive, CPU-bound task (the CPU is busy the whole time), any new request has to wait until that task is finished. To overcome this problem, the task execution can be moved to the Celery process, leaving Django free to handle new requests.

In Celery we have the following concepts:

Pika is a library similar to Celery where the concepts of Producer and Consumer are easier to understand, since each has its own process: one process (the Producer) sends messages to the other process (the Consumer). In the case of Celery, the Celery worker acts as both Producer and Consumer in the same execution/process. Thus, the same worker that sends a message can also receive that message.

Producer and Consumer example

Celery implements all the low-level logic for us, whereas in Pika we are the ones who must write the logic for the consumers and producers.
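As a rough sketch of that model (hypothetical queue name and payload, unrelated to this project), a Pika producer and consumer are two separate scripts, each running in its own process:

import pika

# producer.py
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='emails')
channel.basic_publish(exchange='', routing_key='emails', body=b'user@example.com')
connection.close()

# consumer.py (a different process)
def handle_message(ch, method, properties, body):
    print('received', body)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='emails')
channel.basic_consume(queue='emails', on_message_callback=handle_message, auto_ack=True)
channel.start_consuming()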

The message is just the data that we want to send to the task. If we have a send_email task that needs the user's email, then the email becomes the message:

email = request.POST.get('email')

send_email.delay(email)

Using .delay we can execute the send_email task/function in the Celery worker. Internally, Celery produces/sends a message with the contents of email and also consumes/receives that same message to execute the send_email task. Thus, every time we talk about "sending a message" we are also talking about a task execution.
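For context, the send_email task itself would just be a normal function registered as a Celery task; a minimal sketch (the subject, body, and sender address are made up) could look like this:

from celery import shared_task
from django.core.mail import send_mail

@shared_task
def send_email(email):
    # runs inside the Celery worker, not inside the Django request thread
    send_mail('Your images are ready', 'The enhanced images can now be downloaded.', 'noreply@example.com', [email])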

Multiple Celery workers can be running on the same machine or on different machines. When a message is sent from any worker, all the available workers (including the one that sent the message) will try to consume it. In other words, each worker is always listening for every message that is sent; only one worker ends up consuming the message, but we don't know which one, and this can be a problem.

For instance, on Server A we have Django and one Celery worker running, and on Server B we have TensorFlow and another Celery worker running. For each request to enhance images, a message is sent from the Server A Celery worker, and we want the Server B Celery worker to consume this message; however, the Server A Celery worker itself might end up consuming it, which is a problem since TensorFlow is not running on Server A.

In order to fix this problem, Queues are used to route messages properly. When a Celery worker is started without any configuration, it creates a default Queue (usually called celery or default). This default Queue buffers all the messages that any Celery worker sends, and every Celery worker consumes messages from it. Thus, when a new message arrives in the default Queue, all the workers will try to consume it but only one fulfills the task.

Celery can create more Queues, for instance a Queue called images. When a new request to enhance images arrives, Celery can send the message only to this Queue, and Celery workers can likewise listen to one specific Queue. In this way we can make the Celery worker on Server B consume messages only from the images Queue and make the worker on Server A consume messages only from the default Queue, ensuring that each task is executed where it should be.

Queues Example

Server B only consumes messages from the images Queue but can send messages to the default Queue; Server A only consumes messages from the default Queue and, in the same way, can send messages to the images Queue.
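Concretely, routing comes down to two choices: which queue a message is sent to, and which queue a worker is started with. Both appear in full later in this post; in condensed form:

# Server A: publish the enhancement task only to the images Queue
celery_app.send_task('remote.run_sr_task', [task_id, zip_id], queue='images')

# Server B worker, started so it consumes only from the images Queue:
#   celery -A remote worker -Q images

# Server A worker, started so it consumes only from the default Queue:
#   celery -A superresolution worker -Q default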

RabbitMQ

As we have seen, Celery is in charge of sending and consuming messages and even creating queues; however, we still need a service where all the messages and queues are stored. This service is called a broker: the broker is where all the messages live until a worker consumes them.

Each Celery worker opens a connection to the broker; in this way multiple workers can be synchronized to operate together.

RabbitMQ is the selected broker for this infrastructure, nevertheless, other types of brokers can be used, for example, Redis. Celery always needs a broker to work.
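The broker is configured through a URL when the Celery app is created. A minimal sketch (localhost is a placeholder host; the real credentials are created in the RabbitMQ section below and used in the Server B Dockerfile), with the equivalent Redis URL as a comment:

from celery import Celery

# RabbitMQ broker
app = Celery('superresolution', broker='pyamqp://rabbit_user:rabbit_password@localhost:5672')

# a Redis broker would be configured with a redis:// URL instead:
# app = Celery('superresolution', broker='redis://localhost:6379/0')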

Server A. Django and Celery Apps

Server A should have at least 2 GB of RAM and 2 CPU cores.

Once we know the toolkit used to build the infrastructure, let's check the code for the Server A applications. In this repository you can find the code for this section; I recommend checking all the code in the repository since here we will only review the most important parts.

To make Celery work inside Django, the configuration is the following:

superresolution/superresolution/celery.py

import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'superresolution.settings.development')

app = Celery('superresolution')

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()

app is the instance of the Celery object

superresolution/superresolution/settings/common.py

from kombu import Queue

.
.
.

CELERY_BROKER_URL = os.environ['BROKER_URL']

CELERY_RESULT_BACKEND = 'django-db'

CELERY_TIMEZONE = 'UTC'

CELERY_TASK_QUEUES = (
    Queue('default', routing_key='default'),
    Queue('images', routing_key='images.enhance'),
)

CELERY_TASK_DEFAULT_QUEUE = 'default'

To create Celery tasks we need to put them inside a file called tasks.py:

superresolution/tasks/tasks.py

@shared_task
def remote_server_ready(name):
    """
    Called when remote server is done
    """
    # task has to wait until we add a new task from the request
    server_name = SRSpotServer.update_server_status(SRSpotServer.SRSpotServerStatus.ON, name)

    # Log the execution + server_name

    pending_tasks = SRTask.objects.filter(status=SRTask.SRTaskStatus.INCOMPLETE).only("id", "zip_id")

    for task in pending_tasks:
        celery_app.send_task('remote.run_sr_task', [task.id, task.zip_id], queue='images')

    pending_tasks.update(status=SRTask.SRTaskStatus.RUNNING)

    # Log the execution + tasks_added 

The decorator @shared_task is used to declare the remote_server_ready function as a Celery task, which can run in the Celery worker.

The remote_server_ready task is called from Server B and executed in Server A. SRSpotServer is a Django model that stores all the Google Cloud instances that are created; in this case we are calling the update_server_status class method from the SRSpotServer model:

superresolution/tasks/models.py

@classmethod
@transaction.atomic()
def update_server_status(cls, status, name):
    last_server = cls.get_queryset().select_for_update().filter(name=name).latest('created_at')

    # from DEPLOYING to ON
    last_server.status = status
    last_server.save(update_fields=['status'])

    return last_server.name

Race Condition

To understand the code above, the concept of a race condition needs to be introduced. As we know, Django can have multiple threads and processes running to handle multiple requests at the same time. When one of these requests makes a database query, that is an I/O-bound task, hence another thread or process can proceed and handle another request. This also implies that two requests are able to query the same data from the database. Inside the update_server_status function, a record/row is obtained by its name with .filter(name=name) and its state is then changed from DEPLOYING to ON (since Celery is receiving the signal from Server B, which is ready to receive requests, we change the status of that specific Server B).

It could happen that two or more requests query the same SRSpotServer object at the same time, so even if one request is changing its status to ON, the other requests can still get the SRSpotServer object's status as DEPLOYING. This problem is worse in the following example:

@shared_task
def get_server_information_task(email, zip_id):
    server_available, task, server_name = SRSpotServer.get_server_information(email, zip_id)

    if task:
        task = model_to_dict(task, fields=["id", "zip_id"])

    return server_available, task, server_name

In this Celery task we ask for the server status using the class method get_server_information:

superresolution/tasks/models.py

@classmethod
@transaction.atomic()
def get_server_information(cls, email, zip_id):
    on_servers = cls.get_queryset().select_for_update().all()
    on_servers = list(on_servers.values_list('status', flat=True)) # Rows are locked here

    current_task = SRTask(email=email, zip_id=zip_id)

    if "ON" in on_servers: # at least one server is available
        current_task.status = SRTask.SRTaskStatus.RUNNING
        current_task.save()

        return True, current_task, None

    elif "DEPLOYING" in on_servers:
        current_task.save()

        return True, None, None

    current_task.save()
    # Not server available, create a new one
    last_server = cls.objects.create()

    return False, None, last_server.name

When a new request arrives, Django looks through the states of all the SRSpotServer objects, searching for a Server B whose status is ON to send the request to; otherwise, Django creates a new Server B.

SRSpotServer objects can have three states: DEPLOYING, ON, and OFF. Using:

on_servers = cls.get_queryset().select_for_update().all()
on_servers = list(on_servers.values_list('status', flat=True))

Django queries the database to obtain all the SRSpotServer objects' states in a list. If this list contains no ON or DEPLOYING value, then there are no Server B instances available, so Django creates a new one:

last_server = cls.objects.create()

The problem takes place when two or more requests arrive at the same time. User 1 and User 2 execute the following code within milliseconds of each other:

on_servers = cls.get_queryset().select_for_update().all()
on_servers = list(on_servers.values_list('status', flat=True))

The on_servers list only contains OFF states.

User 1 gets to the next code line first:

last_server = cls.objects.create()

Thus, User 1 creates a new SRSpotServer record in the database whose state is DEPLOYING.

If we run the next code again:

on_servers = cls.get_queryset().select_for_update().all()
on_servers = list(on_servers.values_list('status', flat=True))

Now the on_servers list also contains one DEPLOYING value. However, User 2 reached the code before User 1 created the new SRSpotServer record and did not find any Server B available; thus, User 2 also reaches the line that creates a new SRSpotServer record:

last_server = cls.objects.create()

This problem, where two or more threads/processes access the same record at the same time, is called a race condition. In order to overcome it, a row lock is used: the queried rows are locked so no other thread/process can read or modify their content. As you may have noticed, both of the previous methods have the following decorator:

@transaction.atomic()

And both methods call .select_for_update() when the database is queried. select_for_update locks the selected rows, and the @transaction.atomic() decorator keeps the lock until the whole function has executed. In this way, User 1 locks the rows and creates a new SRSpotServer record while User 2 waits; once the rows are unlocked, User 2 reads the rows, which now contain a new SRSpotServer record with status DEPLOYING, so User 2 does not create another SRSpotServer record.

In this case all the rows in the table are locked, which may not be necessary; for example, if the whole infrastructure can only create up to N Server B instances, we could just get the last N rows and lock them.

The SRSpotServer model contains several more methods where the same technique to lock rows is used. For instance remote_server_down:

@classmethod
@transaction.atomic()
def remote_server_down(cls, name):
    last_server = cls.get_queryset().select_for_update().filter(name=name).latest('created_at') # row is locked here

    # from ON to OFF
    last_server.status = SRSpotServer.SRSpotServerStatus.OFF
    last_server.save(update_fields=['status'])

    remain_servers = SRSpotServer.objects.filter(status=SRSpotServer.SRSpotServerStatus.ON)

    if remain_servers.exists():
        # if more workers/servers available they can consume the pending_tasks if any
        return last_server.name, None

    pending_tasks = SRTask.objects.filter(
        Q(status=SRTask.SRTaskStatus.INCOMPLETE) | Q(status=SRTask.SRTaskStatus.RUNNING)
    )

    if pending_tasks.exists():
        # if there are pending tasks and no servers/workers available we have to create a new server
        current_server = cls.objects.create()

        return last_server.name, current_server.name

The method above is executed when Google Cloud is about to stop one of the Server B instances. When GCP stops one of our spot instances, the platform notifies us 30 seconds in advance so we are able to handle the disconnection; in this case the Celery process on Server B calls the remote_server_off Celery task defined in the tasks.py file:

@shared_task
def remote_server_off(name):
    last_server_name, new_server_name = SRSpotServer.remote_server_down(name)

    if new_server_name:
        response = create_gcloud_vm(new_server_name)

        # Log the execution + new server created 
    response = delete_gcloud_vm(last_server_name)
    # Log the execution + server down

This task, in turn, calls the remote_server_down method.

Inside this method, the SRSpotServer record that is being stopped by GCP (obtained by its name with .filter(name=name)) is locked:

last_server = cls.get_queryset().select_for_update().filter(name=name).latest('created_at') 

A new SRSpotServer record is also created if pending tasks exist:

pending_tasks = SRTask.objects.filter(
    Q(status=SRTask.SRTaskStatus.INCOMPLETE) | Q(status=SRTask.SRTaskStatus.RUNNING)
)

if pending_tasks.exists():
    # if there are pending tasks and no servers/workers available we have to create a new server
    current_server = cls.objects.create()

The record is locked inside this method to avoid the same race condition problem. A new request to get_server_information could arrive while Server B is being shut down by GCP; this request would look through each SRSpotServer object, find none whose state is ON (all the Server B instances are down), and therefore create a new SRSpotServer record. However, remote_server_down could also be creating a new SRSpotServer record at that moment, since pending tasks could exist.

If the row is first locked inside remote_server_down and the query from get_server_information includes this locked row, the execution of get_server_information must wait until remote_server_down finishes.

SRTask is also a Django model; a record is created each time a request to enhance anime images arrives.

Our Django app has two different pages: one to upload the images to enhance and one to download the enhanced images.

superresolution/tasks/views.py

def index(request):
    if request.method == 'GET': return render(request, 'index.html')

    image_files = request.FILES.getlist('images')
    email = request.POST.get('email')

    zip_id = uuid.uuid4()

    make_zip_images(image_files, zip_id)

    chain(
        group(
            upload_files_task.s(zip_id),
            get_server_information_task.s(email, zip_id)
        ) |
        send_task_to_server.s()
    ).apply_async()

    messages.success(request, 'Your images are being processed, please wait some minutes')

    return redirect("tasks:download")

index handles the user request to enhance images. Since Celery can only accept serialized data, such as JSON objects, strings, lists, or dictionaries, and images are not serializable, we first create a temporary zip file containing the images using the make_zip_images function. The zip_id variable is used to identify each zip file across the whole app, on both the Server A and Server B instances. The chain and group functions come from Celery; they are used to build workflows for our Celery tasks.

All the tasks inside the chain function are executed in the Celery worker. This way, Django only executes the make_zip_images function and calls the Celery tasks, so the whole index request is really fast.

group executes several tasks in parallel; in this case, upload_files_task and get_server_information_task are executed at the same time (hence we need a multi-core processor). The former uploads the zip file containing the images to Google Cloud Storage, and the latter gets the state of the Server B instances in order to obtain one of the available instances or decide that a new one must be created.

chain links several tasks: the next task is not called until the previous one is done. In this case the group counts as one big task; only when all the tasks inside the group are done is the next task, send_task_to_server, executed.

superresolution/tasks/tasks.py

@shared_task
def send_task_to_server(args):
    url, server_info = args
    server_available, task, server_name = server_info

    if server_available and task:
        celery_app.send_task('remote.run_sr_task', [task.get("id"), task.get("zip_id")], queue='images')

    elif not server_available:
        response = create_gcloud_vm(server_name)
        # Handle errors, status, logging

This task receives the results from the previous tasks. If at least one Server B instance is available, the run_sr_task task from the Server B Celery worker is called:

celery_app.send_task('remote.run_sr_task', [task.get("id"), task.get("zip_id")], queue='images')

send_task is used to call tasks that are not available in the current worker but in a remote worker. run_sr_task is a task defined only in the Celery worker from Server B, which is called remote; the task id and zip_id are sent as arguments, and the queue used to send the message is also selected.

If no Server B instances are available, the create_gcloud_vm function is executed; this function uses the Google Cloud Python API to create new instances.
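To give an idea of what create_gcloud_vm does under the hood, here is a simplified sketch using the discovery-based google-api-python-client (the function name, machine type, and stripped-down config are illustrative; the real configuration in the repository is larger and also declares the startup script, GPU, and service account):

from googleapiclient import discovery

def create_vm_sketch(server_name, project='tensorflow-spot-arch', zone='us-central1-b'):
    compute = discovery.build('compute', 'v1')

    config = {
        'name': server_name,
        'machineType': f'zones/{zone}/machineTypes/n1-standard-4',
        'scheduling': {'preemptible': True},  # spot/preemptible instance
        'disks': [{
            'boot': True,
            'autoDelete': True,
            'initializeParams': {'sourceImage': 'projects/IMAGE_PROJECT/global/images/IMAGE_NAME'},
        }],
        'networkInterfaces': [{'network': 'global/networks/default'}],
        # guestAccelerators, metadata (startup/shutdown scripts), serviceAccounts, etc. go here too
    }

    # insert() only starts the operation; GCP creates the VM asynchronously
    return compute.instances().insert(project=project, zone=zone, body=config).execute()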

Server B. Celery and TensorFlow Apps

Server B only has a Celery worker and a TensorFlow application; Docker is used to serve both, and the code can be found in this repository.

Dockerfile

FROM tensorflow/tensorflow:2.7.0-gpu

# if no GPU is available use:
# FROM tensorflow/tensorflow:2.7.0

COPY app/requirements.txt /usr/src

WORKDIR /usr/src/

RUN pip3 install -r requirements.txt

COPY app /usr/src

ENV CELERY_BROKER=pyamqp://rabbit_user:rabbit_password@ip:5672

EXPOSE 5672

CMD ["celery", "-A", "remote", "worker", "--pool=solo", "-Q", "images", "-l", "info", "--without-heartbeat"]

This Dockerfile is used to build the Docker container. The base image is the official TensorFlow image; we can use the image with GPU support, or the image without GPU just to test the infrastructure when we don't have access to GPUs in GCP.

To connect the Celery worker to RabbitMQ we need the CELERY_BROKER env variable. The RabbitMQ configuration is covered in the Server A configuration section, where rabbit_user and rabbit_password are created; the ip used must be the public ip of Server A.

To run the Celery worker the following command is used:

celery -A remote worker --pool=solo -Q images -l info --without-heartbeat

remote is the name of the Celery app/worker (defined in remote.py), -Q indicates which Queues the worker is going to listen to (in this case the images Queue), -l sets the Celery log level, and --without-heartbeat tells the worker not to send periodic messages reporting its state. The latter is a recommended configuration: RabbitMQ should be the one tracking the state of the workers, not the worker itself.

Finally we have --pool=solo. Celery workers can run their tasks in different ways:

Most of the time, the TensorFlow app will use the whole GPU memory to enhance the anime images, so if we run two or more tasks in parallel some of them won't be able to run and will trigger an error because no more memory is available. Thus, the solo pool option is used to run one task at a time.

app/remote.py

broker_url = os.environ.get('CELERY_BROKER', '')

app = Celery('superresolution', broker=broker_url)
app.conf['worker_prefetch_multiplier'] = 1
app.conf['task_acks_late'] = True
app.conf['timezone'] = 'UTC'
app.conf['broker_heartbeat'] = 0

tf_inference = None
g_storage = None

"""
Local Tasks
"""

@shared_task(queue='images')
def run_sr_task(task_id, zip_id):
    g_storage.create_local_folders(zip_id)
    g_storage.download_images(zip_id)

    tf_inference.run_inference(zip_id)

    g_storage.upload_images(zip_id)

    # Logger

    app.send_task('tasks.tasks.task_finished', [task_id], queue='default')

"""
Remote Tasks
"""

@worker_ready.connect
def notify_server_created(sender, **kargs):
    server_name = os.environ.get('SERVER_NAME', '')

    global tf_inference
    tf_inference = TFInference()

    global g_storage
    g_storage = GCloudStorage()

    # print(type(sender)) <class 'celery.worker.consumer.consumer.Consumer'>
    with sender.app.connection() as conn:
        #  print(type(conn))  <class 'kombu.connection.Connection'>
         sender.app.send_task('tasks.tasks.remote_server_ready', [server_name], connection=conn, queue='default')

The Celery worker on Server B is configured differently from the worker on Server A. There are two principal differences:

When there is only one Server B instance running tasks and this instance is terminated by GCP, setting task_acks_late to True keeps the messages in the broker until a new Server B instance is created; once this new instance connects to the broker, it begins to receive all the messages that were not acknowledged, as well as the messages from new requests.

The notify_server_created function runs when the Celery worker is ready to consume messages. tf_inference is used to run the anime model with TensorFlow, and g_storage is used to download and upload the zip files from Google Cloud Storage. The remote_server_ready task is called here and executed on Server A; when it is called, server_name is sent as a parameter so Server A knows which Server B instance is ready to consume messages.

run_sr_task is the task where the images are enhanced:

Each SRTask record belongs to one user and stores the user's email, the zip_id, and the record id. The zip_id and id of each record are sent as parameters to the run_sr_task task, and this task in turn calls task_finished, passing the id back as a parameter:

superresolution/tasks/tasks.py

@shared_task
def task_finished(task_id):
    """
    Called when the remote server has processed the images from the task_id
    """
    current_task = SRTask.objects.get(id=task_id)
    current_task.status = SRTask.SRTaskStatus.DONE
    current_task.save(update_fields=['status'])

    pending_tasks = SRTask.objects.filter(status=SRTask.SRTaskStatus.RUNNING)

    # we have the last task of the queue
    if not pending_tasks.exists():
        to_time = datetime.utcnow() + timedelta(minutes=25)

        keep_or_kill_server_task.apply_async((task_id,), eta=to_time)

In the same way as SRSpotServer records, SRTask records can have three different states: INCOMPLETE, RUNNING, and DONE. When a new SRTask record is created, its state is INCOMPLETE; when the record is sent to the run_sr_task task, its state changes to RUNNING; and when the task_finished task is executed, the record's state changes to DONE. When the user asks for their images and the record's state is DONE, the enhanced images are downloaded and returned to the user; otherwise the user is notified that their images are not ready yet.

Having states for each user's task gives us control over the whole workflow. For instance, if there are SRTask records whose state is INCOMPLETE and those records were created some time ago, something is failing; maybe the Server B instance creation triggered an error and the instance could not be created.

Finally, in order to save resources and money, each Server B instance is terminated if no new requests have arrived in a while. Inside the task_finished task we check whether there are more requests to enhance images or whether the current request was the last one:

pending_tasks = SRTask.objects.filter(status=SRTask.SRTaskStatus.RUNNING)

# we have the last task of the queue
if not pending_tasks.exists():
    to_time = datetime.utcnow() + timedelta(minutes=25)

    keep_or_kill_server_task.apply_async((task_id,), eta=to_time)

If the current request is the last one, the keep_or_kill_server_task task will be executed 25 minutes later.

@shared_task
def keep_or_kill_server_task(task_id):
    last_task = SRTask.objects.latest('created_at')

    if last_task.id == task_id:
        # after 25 minutes no new tasks arrived
        names = SRSpotServer.kill_server_by_application()

        for name in names:
            response = delete_gcloud_vm(name)

            # Log the execution + server deleted 

Once the task is executed, if the SRTask record with id task_id is still the last one created (meaning no new requests arrived during the 25-minute period), all the Server B instances are terminated by the application.

This behavior can be different; it all depends on the needs of the application. For instance, a Celery beat task, which runs every n minutes or hours, could execute the same code, look over the requests, and stop all the instances if no new requests arrived. The same task could also inspect the number of requests and decide whether a new Server B instance should be created to accelerate the workflow.
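A sketch of what that periodic check could look like with Celery beat (the schedule entry and the check_idle_servers task are hypothetical; they are not part of the repository):

# superresolution/superresolution/settings/common.py (hypothetical addition)
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'check-idle-servers': {
        'task': 'tasks.tasks.check_idle_servers',  # hypothetical task
        'schedule': crontab(minute='*/30'),        # run every 30 minutes
    },
}

# superresolution/tasks/tasks.py (hypothetical task)
@shared_task
def check_idle_servers():
    # if no tasks are running, terminate every Server B instance
    if not SRTask.objects.filter(status=SRTask.SRTaskStatus.RUNNING).exists():
        for name in SRSpotServer.kill_server_by_application():
            delete_gcloud_vm(name)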

Celery is really flexible and lets us build our own workflows; it even has a documentation page dedicated exclusively to the different workflows that can be used to build applications.

Configuration of Server A

Let's start with the configuration of Server A. We need Django, RabbitMQ, and MySQL.

Install RabbitMQ

In order to install RabbitMQ on an Ubuntu server, we need to copy the contents of the script from https://www.rabbitmq.com/install-debian.html#apt-quick-start-packagecloud

into a new script like:

vim rabbit.sh

make it executable:

sudo chmod +x rabbit.sh

and run it:

./rabbit.sh

Check if RabbitMQ is running and its version:

systemctl status rabbitmq-server

sudo rabbitmqctl version

The version was 3.9.13 at the time of writing.

For production settings, RabbitMQ recommends changing the free disk space limit: if the server has less free disk space than this limit, RabbitMQ stops accepting new messages until more free disk space or RAM is available.

sudo vim /etc/rabbitmq/rabbitmq.conf

inside add:

disk_free_limit.relative = 1.0

Since the limit is relative to the amount of RAM, if we have 1 GB of RAM the limit will be 1 GB, and so on.

Restart the service to apply the changes:

sudo systemctl restart rabbitmq-server

We can check the configuration using:

sudo rabbitmq-diagnostics status

To accept new connections from remote servers a new user should be created:

sudo rabbitmqctl add_user serverArabbituser myrabbitsuperpw

where serverArabbituser is the username and myrabbitsuperpw the password

The new user needs permissions:

sudo rabbitmqctl set_permissions -p / serverArabbituser ".*" ".*" ".*"

The permissions are, in order, for configure, write, and read.

You can read more about it here

Finally we restart the service again:

sudo systemctl restart rabbitmq-server

For security reasons we should also delete the default user called guest:

sudo rabbitmqctl delete_user guest

Google Cloud Platform

Some GCP knowledge is necessary due to the relative complexity of the platform.

GCP groups its services into projects; a project is created when we want to access the platform's services. I called the project tensorflow-spot-arch. To access the GCP services using the Python API we need a service account JSON file.

We need to go to IAM & Admin/Service Accounts and click Create Service Account. In the section Grant this service account access to project, add Compute Instance Admin (v1), Storage Admin, and Service Account User as roles. Once the service account is created we need to download a JSON key: select the created account and click Manage Keys, then in the Keys section click Add Key and select JSON. This JSON file is used on both the Server A and Server B instances.

If we need to add more roles to an already created service account we can go to IAM & Admin/IAM and click Edit principal for the service account that we want to modify.

Now, in Cloud Storage, we need to create a new bucket called tensorflow-spot-arch-bucket and, inside the bucket, create two folders: current and done. Under Permissions we need to add another role: in the new principals field put the name of the service account and in roles add Storage Legacy Bucket Owner.

The names of the project and bucket can be different; to use different names we need to change the variables inside the superresolution/superresolution/settings/common.py file:

GS_BUCKET_NAME = "tensorflow-spot-arch-bucket"

GCP_PROJECT_NAME = "tensorflow-spot-arch"
GCP_ZONE = "us-central1-b"
GCP_REGION = "us-central1"

GCP_SERVICE_EMAIL = "tensorflow-spot-arch-service-a@tensorflow-spot-arch.iam.gserviceaccount.com"
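For reference, this is roughly how the JSON key ends up being used by the Python clients (a minimal sketch; the repository reads the key's path from the SERVICE_GCP_FILE environment variable defined in the Django configuration below):

import os

from google.cloud import storage
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(os.environ['SERVICE_GCP_FILE'])

# a Storage client bound to the project and bucket configured above
client = storage.Client(project='tensorflow-spot-arch', credentials=credentials)
bucket = client.bucket('tensorflow-spot-arch-bucket')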

If something fails while creating or deleting servers you should look at the Operations section in compute engine.

If the Server A instance is also created on GCP, the RabbitMQ port needs to be open. We can open ports by adding firewall rules under VPC network/Firewall; the firewall type is Ingress and the port is tcp:5672.

At the end of the post there are resources to learn more about GCP.

Django configuration

We are going to use MySQL as the database; the following steps install all the dependencies for the Django app.

You should change someuser and somepassword:

sudo apt-get install mysql-server

# We may need to use:
# sudo mysql_secure_installation

sudo mysql

CREATE USER 'someuser'@'localhost' IDENTIFIED WITH mysql_native_password BY 'somepassword';

CREATE DATABASE app_db;
GRANT ALL ON app_db.* TO 'someuser'@'localhost';

FLUSH PRIVILEGES;

Then we create a MySQL configuration file that Django will use to connect to the database:

sudo vim /etc/mysql/dj.cnf

[client]
database = app_db
user = someuser
password = somepassword
default-character-set = utf8mb4

[mysql]
default-character-set = utf8mb4

[mysqld]
character-set-client-handshake = FALSE
character-set-server = utf8mb4
collation-server = utf8mb4_unicode_ci

Then restart MySQL so the changes are picked up:

sudo systemctl daemon-reload
sudo systemctl restart mysql

For Django we need to install some dependencies:

sudo apt install -y python3-pip
sudo apt install -y build-essential libssl-dev libffi-dev python3-dev
sudo apt install libmysqlclient-dev default-libmysqlclient-dev
sudo apt install python3.8-venv

Now, inside the application folder:

python3 -m venv djangoenv
. djangoenv/bin/activate
pip install -r requirements.txt

Finally we create the environment variables for the application:

vim .env
DJANGO_SETTINGS_MODULE="superresolution.settings.production"
SERVER_IP=""
SECRET_KEY=""
SERVICE_GCP_FILE="path to the service account JSON file"
BROKER_URL="pyamqp://serverArabbituser:myrabbitsuperpw@ip:5672"

SERVER_IP should be the public ip of the instance.

This file will be used by gunicorn.

The ip for the broker url needs to be the internal/private ip of the instance if a provider like GCP, AWS, Azure is used.

We also have to add the following line to the .bashrc file so all the manage.py commands run with the production settings, and then reload it:

export DJANGO_SETTINGS_MODULE="superresolution.settings.production"

source ~/.bashrc

After that we may need to run . djangoenv/bin/activate again.

run:

python manage.py migrate
python manage.py migrate django_celery_results
python manage.py createsuperuser

Celery configuration

To run the Celery worker for Server A we use:

celery -A superresolution worker -l info -Q default

This worker will only listen to the default Queue.

However, it is better practice to run the worker as a daemon. More info can be found on this page; systemd is a good option to daemonize the worker.

Gunicorn configuration

run:

sudo nano /etc/systemd/system/gunicorn.socket

[Unit]
Description=gunicorn socket

[Socket]
ListenStream=/run/gunicorn.sock

[Install]
WantedBy=sockets.target

and

sudo nano /etc/systemd/system/gunicorn.service

[Unit]
Description=gunicorn daemon
Requires=gunicorn.socket
After=network.target

[Service]
User=deployuser
Group=www-data
WorkingDirectory=/home/deployuser/superresolution
EnvironmentFile=/home/deployuser/superresolution/.env
ExecStart=/home/deployuser/superresolution/djangoenv/bin/gunicorn \
          --access-logfile - \
          --workers 3 \
          --bind unix:/run/gunicorn.sock \
      superresolution.wsgi:application

[Install]
WantedBy=multi-user.target

Change deployuser to the server user used to deploy the application.

sudo systemctl start gunicorn.socket
sudo systemctl enable gunicorn.socket

Nginx configuration

sudo apt install nginx

sudo nano /etc/nginx/sites-available/superresolution

server {
    listen 80;
    server_name public_ip;

    location = /favicon.ico { access_log off; log_not_found off; }

    location / {
        include proxy_params;
        proxy_pass http://unix:/run/gunicorn.sock;
    }
}
sudo ln -s /etc/nginx/sites-available/superresolution /etc/nginx/sites-enabled
sudo nginx -t
sudo systemctl restart nginx

Configuration of Server B

The first step is to install Docker from this page

Get the app files from the repository using zip or git. The Service Account JSON file should be added to the app/config folder. Once inside the app folder, at the level of the Dockerfile, run:

sudo docker build -t tf:remote .

Now the container can be run using:

sudo docker run -it --name tf_app tf:remote

However, we want this process to be automatic; for this reason GCP provides startup scripts that run once the virtual machine is ready:

#!/bin/bash

NAME=$(curl http://metadata.google.internal/computeMetadata/v1/instance/name -H "Metadata-Flavor: Google")

sudo docker run \
    --gpus all \
    -e SERVER_NAME=${NAME} \
    --name tf_app tf:remote

Each Server B instance is created with a different name. This name is saved as metadata, and we can access it using the computeMetadata API. The name is used to identify each Server B instance throughout the whole infrastructure.

The file superresolution/tasks/gcloud_config.py contains the configuration used to create each Server B instance; the startup script is declared as metadata in the same configuration.

In the case that GPU instances cannot be used, we can remove the following part from guestAccelerators so the instances are created without GPUs:

{
    "acceleratorCount": 1,
    "acceleratorType": f"projects/{project_name}/zones/{zone}/acceleratorTypes/nvidia-tesla-t4"
}

We also need to remove --gpus all from the startup script.

In the same way as startup scripts, shutdown scripts can be used to run commands when the instance is stopped:

#!/bin/bash

NAME=$(curl http://metadata.google.internal/computeMetadata/v1/instance/name -H "Metadata-Flavor: Google")

sudo docker exec \
    tf_app \
    celery call tasks.tasks.remote_server_off \
    --queue=default -k "{\"name\": \"${NAME}\"}"

Celery tasks can also be called outside Python scripts using:

celery call task_name

The shutdown script calls the remote_server_off task inside the Docker container called tf_app and sends the server name as a parameter. This script is executed when GCP is shutting down the instance, so the app knows that one instance is about to be stopped.

Finally, once our Server B is configured, we can create a custom image from this server. This image will be used each time a new Server B instance is created and contains all the configuration and data of Server B.

To create the image you can read this page, and to use the image you can read this page; basically we need to change:

"initializeParams": {
    .
    .
    .
    "sourceImage": "projects/IMAGE_PROJECT/global/images/IMAGE_NAME"
},

the IMAGE_PROJECT and IMAGE_NAME values. IMAGE_PROJECT is the name of the project, for example tensorflow-spot-arch.

GPU Drivers

To install the Nvidia drivers we run:

wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/cuda-ubuntu2004.pin
sudo mv cuda-ubuntu2004.pin /etc/apt/preferences.d/cuda-repository-pin-600
sudo apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/7fa2af80.pub
sudo add-apt-repository "deb http://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/ /"
sudo apt-get update && sudo apt-get install -y cuda-drivers

These drivers are for Ubuntu 20.04.

We also need the NVIDIA Container Toolkit:

distribution=$(. /etc/os-release;echo $ID$VERSION_ID)
curl -s -L https://nvidia.github.io/nvidia-docker/gpgkey | sudo apt-key add -
curl -s -L https://nvidia.github.io/nvidia-docker/$distribution/nvidia-docker.list | sudo tee /etc/apt/sources.list.d/nvidia-docker.list

sudo apt-get update && sudo apt-get install -y nvidia-container-toolkit
sudo systemctl restart docker

More information about drivers can be found here and here

Production

There are some configurations needed to make the app production ready:

Resources

Celery

RabbitMQ

MySQL

GCP