Feb. 16, 2022
Serving Deep Learning Models using the Publish Subscribe Pattern
Complex models require good hardware to run, and a GPU is often a must. In practice this means serving them from cloud providers that offer virtual machines (instances) with GPUs attached.
These instances need to listen for user requests all the time. Consequently, their cost rises to hundreds or thousands of dollars each month.
Cloud providers like AWS, GCP, and Azure offer Spot Instances: cheaper, temporary instances that the provider can stop at any time when the resources are required for other clients who are willing to pay more. Hence, spot instances should be used along with normal instances in order to provide a full-time service.
The main idea of this post is to show how a deep learning infrastructure can be built around these Spot Instances to run our neural networks. Two different kinds of instances will be used: one main server called Server A, which runs all the time, and one or more Spot Instances with GPUs attached, called Server B. Server B instances are created when a request arrives and deleted when no new requests have arrived for a while, which further reduces the cost of the servers.
This infrastructure should handle:
- Creation and deletion of Server B instances.
- Disconnection of Server B instances.
- Disconnection of Server B instances WHILE they are doing some task.
- Communication between Server A and Server B instances.
These requirements can be achieved using a pub/sub architecture. Server A publishes messages that Server B subscribes to and the other way around.
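To make the two-way message flow concrete, here is a minimal in-memory sketch. It is not how Celery or RabbitMQ are implemented; ToyBroker, the topic names and the message fields are hypothetical and only illustrate that each server publishes to a topic the other one reads from.

```python
from collections import defaultdict, deque


class ToyBroker:
    """In-memory stand-in for a message broker (hypothetical, illustration only)."""

    def __init__(self):
        self.queues = defaultdict(deque)  # topic name -> pending messages

    def publish(self, topic, message):
        self.queues[topic].append(message)

    def consume(self, topic):
        """Return the oldest pending message for a topic, or None if it is empty."""
        pending = self.queues[topic]
        return pending.popleft() if pending else None


broker = ToyBroker()

# Server A publishes work for Server B.
broker.publish("to_server_b", {"task": "enhance", "zip_id": "abc"})

# Server B consumes the work and publishes a reply for Server A.
job = broker.consume("to_server_b")
broker.publish("to_server_a", {"task": "finished", "zip_id": job["zip_id"]})

# Server A consumes the reply.
reply = broker.consume("to_server_a")
print(reply)
```

In the real infrastructure the broker is a separate service, so either server can disappear without losing the messages that are still waiting.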
The tools used to build this infrastructure are Django, Celery, RabbitMQ, Docker, and TensorFlow. However, some of these tools can be easily replaced; for instance, we could use PyTorch, FastAPI, Redis, Pika, etc.
The model to serve is a Deep Learning model to enhance anime images. Code for Server A is here and for Server B is here.
Google Cloud is the selected platform to build both servers. Thus, the Python GCP libraries are used to create and delete instances and also to upload and download files to Google Storage.
Infrastructure Preview
Celery is used for communication between Server A and Server B. Celery runs in its own process, where functions can be executed. A function that is executed inside the Celery process is called a task. Celery can call a task on one machine and execute it on a different machine, thus creating communication between the two machines. In the next sections, we will learn more about how Celery works.
Server A is always listening to user requests through Django. In each request, Django receives the user's email and the images. Django calls several Celery tasks to:
- Upload the images to Google Storage.
- Check if there are Server B instances available.
- Run TensorFlow inference task in Server B.
If at least one Server B instance is available, Celery executes the TensorFlow inference task in Server B; otherwise, Celery executes a function to create a new Server B instance.
Server B is only used when there are requests to enhance images; otherwise Server B is shut down so we are not charged for it. Both Server A and Server B run a Celery process. These processes communicate with each other through RabbitMQ, using messages that trigger tasks.
For each request, Server A calls a task that is executed in Server B. When a new Server B instance is ready to work, it sends a message to the Server A instance to execute a task called remote_server_ready.
Celery does more than connect Server A and Server B instances. Each new request is added to a Queue, and the Celery process from Server B consumes requests from this Queue, which can accept thousands of new requests each second. Furthermore, if a lot of requests are waiting in the Queue, the app can create one or more additional Server B instances, which start consuming as soon as their Celery process is ready. Celery handles the distribution of messages among workers without additional configuration. This makes our infrastructure scalable to thousands of requests.
Throughout the following sections, we are going to learn all the essential details to understand how this infrastructure works.
Python, Django, threads and processes
Before getting into the Celery explanation we need to know how Python and Django handle user requests.
Because of the Global Interpreter Lock (GIL), Python can only execute one thread at a time. Threads are useful for I/O-bound tasks, where the CPU stays idle until the task is finished: while one thread waits for an I/O response such as an HTTP request or a database query, Python can switch to another thread to handle additional work. These tasks don't run in parallel, they run concurrently; this is called concurrency. If two tasks need to run in parallel, we have to spawn a new process. Each Python application (Django, FastAPI, Celery, etc.) spawns at least one main process to execute code.
For instance, when Django receives a request, its process creates a new thread to execute all the code needed to return a response to the user. If Django receives another request at the same time, its process creates a second thread; however, this new thread can only execute code while the other thread (or threads) is waiting on some I/O-bound task, or once the other thread has finished. In order to handle requests in parallel, a server like Gunicorn is used. This type of server spawns more Django processes, and each process can run on its own CPU core, so a multicore CPU is needed for true parallelism. Each process can also spawn multiple threads to handle concurrency.
This Django behavior occurs when a WSGI server (such as Gunicorn) is used; ASGI servers behave differently.
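The difference between concurrency and parallelism can be seen with a small standard-library sketch. It assumes time.sleep as a stand-in for an I/O wait (a database query, an HTTP call); the function names are illustrative and are not part of Django.

```python
import threading
import time


def fake_io_call(delay, results, index):
    """Simulate an I/O-bound call; time.sleep releases the GIL while waiting."""
    time.sleep(delay)
    results[index] = delay


def run_concurrently(delays):
    """Start one thread per simulated request and measure the total wall time."""
    results = [None] * len(delays)
    threads = [
        threading.Thread(target=fake_io_call, args=(delay, results, index))
        for index, delay in enumerate(delays)
    ]
    start = time.perf_counter()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results, time.perf_counter() - start


results, elapsed = run_concurrently([0.2, 0.2, 0.2, 0.2])
print(f"4 waits of 0.2s finished in {elapsed:.2f}s")
```

Because the threads spend their time waiting, the four waits overlap and the total is much closer to 0.2 seconds than to 0.8. With CPU-bound work the threads would take turns holding the GIL instead, which is why the inference is moved to a separate process.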
Celery
Celery is used in this infrastructure to execute tasks in parallel and to connect Server A and Server B. When Django receives a request that has to execute a time-intensive, CPU-bound task (so the CPU is busy the whole time), any new request has to wait until the task is finished. To overcome this problem, the task execution can be moved to the Celery process, leaving Django free for new requests.
In Celery we have the following concepts:
- Task: A Python function labeled as a task, which can run in the Celery process.
- Worker: The Celery process (an execution of the Celery app).
- Producer: The process/application that sends messages.
- Consumer: The process/application that receives messages.
- Message: The data that the Producer sends and the Consumer receives.
- Queue: A buffer that stores multiple messages.
Pika is a RabbitMQ client library in which the concepts of Producer and Consumer are easier to understand, since each typically has its own process: one process (Producer) sends messages to the other process (Consumer). In the case of Celery, the Celery worker acts as both Producer and Consumer in the same process. Thus, the same worker that sends a message can also receive that message.
Celery provides all the low-level logic for us, whereas with Pika we must write the logic for the consumers and producers ourselves.
The Message is just the data that we want to send to the task. If we have a send_email task that needs the user's email, then the email becomes the message:
email = request.POST.get('email')
send_email.delay(email)
Using .delay we can execute the send_email task/function in the Celery worker. Internally, Celery produces/sends a message with the contents of email and also consumes/receives that message to execute the send_email task. Thus, each time we talk about "sending a message" we are also talking about a task execution.
Multiple Celery workers can run on the same or on different machines. When a message is sent from any worker, all the available workers listening on that queue (including the one that sent the message) compete to consume it. Only one worker consumes each message, but we don't know which one it will be, and this can be a problem.
For instance, in Server A we have Django and one Celery worker running, and in Server B we have TensorFlow and another Celery worker running. For each request to enhance images, a message is sent from the Server A Celery worker, and we want the Server B Celery worker to consume it. However, the Server A Celery worker might end up consuming the message itself, which causes a problem since TensorFlow is not running in the Server A Celery worker.
In order to fix this problem, Queues are used to route messages properly. When a Celery worker is started without any configuration, it uses a default Queue, which Celery names celery unless configured otherwise (this project renames it to default). This default Queue buffers all the messages that any Celery worker sends, and every Celery worker consumes messages from it. Thus, when a new message arrives in the default Queue, all the workers compete for it but only one runs the task.
More Queues can be created by Celery. For instance, Celery can create a Queue called images; when a new request to enhance images arrives, Celery can send a message only to this Queue, and Celery workers can listen to one specific Queue. In this way we can make the Celery worker from Server B consume messages only from the images Queue and make Server A consume messages only from the default Queue, so each task is executed where it should be.
Server B only consumes messages from the images Queue but can send messages to the default Queue. Likewise, Server A only consumes messages from the default Queue but can send messages to the images Queue.
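The routing rule can be sketched with two in-memory queues. This is a simplified model that uses only the standard library, not Celery's implementation; send and drain are hypothetical helpers, while the queue and task names mirror the ones used in this post.

```python
import queue

# One FIFO buffer per named queue, mirroring the 'default' and 'images' queues.
queues = {"default": queue.Queue(), "images": queue.Queue()}


def send(queue_name, task_name, *args):
    """Producer side: put a message on one specific named queue."""
    queues[queue_name].put((task_name, args))


def drain(worker_name, queue_name):
    """Consumer side: a worker only reads from the queue it listens to."""
    handled = []
    while not queues[queue_name].empty():
        task_name, args = queues[queue_name].get()
        handled.append((worker_name, task_name, args))
    return handled


send("images", "run_sr_task", 1, "zip-1")        # Server A -> Server B
send("default", "remote_server_ready", "srv-b")  # Server B -> Server A

server_b_work = drain("server_b", "images")
server_a_work = drain("server_a", "default")
print(server_b_work)
print(server_a_work)
```

Since each worker is bound to a single queue, the inference message can never land on the worker that has no TensorFlow installed.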
RabbitMQ
As we have seen, Celery is in charge of sending and consuming messages and even creating queues. However, we still need a service where all the messages and queues are stored. This service is called the broker: it holds all the messages until a worker consumes them.
Each Celery worker makes a connection to the broker; in this way multiple workers can be synchronized to operate together.
RabbitMQ is the selected broker for this infrastructure. Nevertheless, other brokers can be used, for example Redis. Celery always needs a broker to work.
Server A. Django and Celery Apps
Server A should have at least 2GB of ram and 2 CPU cores.
Now that we know the set of tools used to build the infrastructure, let's check the code for the Server A applications. In this repository you can find the code for this section. I recommend that you check all the code in the repository, since here we will only review the most important parts.
To make Celery work inside Django the configuration is the following one:
superresolution/superresolution/celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'superresolution.settings.development')
app = Celery('superresolution')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
app is the instance of the Celery object
superresolution/superresolution/settings/common.py
from kombu import Queue
.
.
.
CELERY_BROKER_URL = os.environ['BROKER_URL']
CELERY_RESULT_BACKEND = 'django-db'
CELERY_TIMEZONE = 'UTC'
CELERY_TASK_QUEUES = (
Queue('default', routing_key='default'),
Queue('images', routing_key='images.enhance'),
)
CELERY_TASK_DEFAULT_QUEUE = 'default'
- CELERY_BROKER_URL: the url to connect Celery to RabbitMQ.
- CELERY_RESULT_BACKEND: tasks can return results and states, here the same database for Django is used to store the results/states from the tasks.
- CELERY_TASK_QUEUES: The two Queues that the whole infrastructure will use. Server B consumes requests from the images queue.
- CELERY_TASK_DEFAULT_QUEUE: If we don't specify what queue should the message go to, the message will go to the default queue.
To create Celery tasks we need to put all those tasks inside a file called tasks.py
superresolution/tasks/tasks.py
@shared_task
def remote_server_ready(name):
    """
    Called when remote server is done
    """
    # task has to wait until we add a new task from the request
    server_name = SRSpotServer.update_server_status(SRSpotServer.SRSpotServerStatus.ON, name)
    # Log the execution + server_name
    pending_tasks = SRTask.objects.filter(status=SRTask.SRTaskStatus.INCOMPLETE).only("id", "zip_id")
    for task in pending_tasks:
        celery_app.send_task('remote.run_sr_task', [task.id, task.zip_id], queue='images')
    pending_tasks.update(status=SRTask.SRTaskStatus.RUNNING)
    # Log the execution + tasks_added
The decorator @shared_task is used to declare the remote_server_ready function as a Celery task, which can run in the Celery worker.
The remote_server_ready task is called from Server B and executed in Server A, SRSpotServer is a Django model that stores all the Google Cloud instances that are created, in this case we are calling the update_server_status class method from the SRSpotServer model:
superresolution/tasks/models.py
@classmethod
@transaction.atomic()
def update_server_status(cls, status, name):
    last_server = cls.get_queryset().select_for_update().filter(name=name).latest('created_at')
    # from DEPLOYING to ON
    last_server.status = status
    last_server.save(update_fields=['status'])
    return last_server.name
Race Condition
To understand the code above, the race condition concept needs to be introduced. As we know, Django can have multiple threads and processes running to handle multiple requests at the same time. When one of these requests makes a database query, which is an I/O-bound task, another thread or process can proceed and handle another request. This also implies that two requests are able to query the same data from the database. Inside the update_server_status function, a record/row is obtained by its name with .filter(name=name) so that its state can later be changed from DEPLOYING to ON (since Celery is receiving the signal from Server B, which is ready to receive requests, we change the status of that specific Server B).
It could happen that two or more requests query the same SRSpotServer object at the same time, so even if one request is changing its status to ON, the other requests can still read the SRSpotServer object's status as DEPLOYING. This problem is worse in the following example:
@shared_task
def get_server_information_task(email, zip_id):
    server_available, task, server_name = SRSpotServer.get_server_information(email, zip_id)
    if task:
        task = model_to_dict(task, fields=["id", "zip_id"])
    return server_available, task, server_name
In this Celery task we ask for the server status using the class method get_server_information:
superresolution/tasks/models.py
@classmethod
@transaction.atomic()
def get_server_information(cls, email, zip_id):
    on_servers = cls.get_queryset().select_for_update().all()
    on_servers = list(on_servers.values_list('status', flat=True))  # Rows are locked here
    current_task = SRTask(email=email, zip_id=zip_id)
    if "ON" in on_servers:  # at least one server is available
        current_task.status = SRTask.SRTaskStatus.RUNNING
        current_task.save()
        return True, current_task, None
    elif "DEPLOYING" in on_servers:
        current_task.save()
        return True, None, None
    current_task.save()
    # No server available, create a new one
    last_server = cls.objects.create()
    return False, None, last_server.name
When a new request arrives, Django looks through the states of all the SRSpotServer objects, searching for a Server B whose status is ON to send the request to; otherwise, Django creates a new Server B.
SRSpotServer objects can have 3 states: DEPLOYING, ON and OFF. Using:
on_servers = cls.get_queryset().select_for_update().all()
on_servers = list(on_servers.values_list('status', flat=True))
Django queries the database to obtain the states of all the SRSpotServer objects as a list. If this list does not contain any ON or DEPLOYING value, there are no Server B instances available, so Django creates a new one:
last_server = cls.objects.create()
The problem takes place when two or more requests arrive at the same time. User 1 and User 2 execute the following code within milliseconds of each other:
on_servers = cls.get_queryset().select_for_update().all()
on_servers = list(on_servers.values_list('status', flat=True))
At this point, the on_servers list only contains OFF states.
User 1 gets to the next line of code first:
last_server = cls.objects.create()
Thus, User 1 creates a new SRSpotServer record in the database whose state is DEPLOYING.
If we run the next code again:
on_servers = cls.get_queryset().select_for_update().all()
on_servers = list(on_servers.values_list('status', flat=True))
Now the on_servers list also contains one DEPLOYING value. However, User 2 ran the query before User 1 created the new SRSpotServer record and did not find any Server B available. Thus, User 2 also reaches the line that creates a new SRSpotServer record:
last_server = cls.objects.create()
This problem, where two or more threads/processes access the same record at the same time and the result depends on their timing, is called a race condition. In order to overcome it, a row-level lock is used: the queried rows are locked so that no other thread/process can lock or modify them until the lock is released. As you may have noticed, both of the previous methods have the following decorator:
@transaction.atomic()
Both methods also use the .select_for_update() method when the database is queried. select_for_update locks the selected rows, and the @transaction.atomic() decorator keeps the lock until the whole function has executed. In this way, User 1 locks the rows and creates a new SRSpotServer record while User 2 waits. Once the rows are unlocked, User 2 reads the rows, which now include a new SRSpotServer record with status DEPLOYING, so User 2 does not create another SRSpotServer record.
In this case all the rows in the table are locked, which may not be necessary. For example, if the whole infrastructure can only create up to N Server B instances, we could just get the last N rows and lock them.
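The check-then-create race and its fix can be reproduced without a database. The sketch below is only an analogy: a threading.Lock stands in for the row locks taken by select_for_update, a plain list stands in for the SRSpotServer table, and the short sleep is an assumption added to widen the window between the check and the insert.

```python
import threading
import time

servers = []                    # stands in for the SRSpotServer table
table_lock = threading.Lock()   # stands in for the database row locks


def check_then_create():
    """Create a server record only if none is ON or DEPLOYING."""
    if "ON" not in servers and "DEPLOYING" not in servers:
        time.sleep(0.05)  # widen the gap between the check and the insert
        servers.append("DEPLOYING")


def handle_request(use_lock):
    if use_lock:
        with table_lock:  # other requests wait here until the lock is released
            check_then_create()
    else:
        check_then_create()


def simulate(use_lock, requests=5):
    """Run several simultaneous requests and return how many servers were created."""
    servers.clear()
    threads = [
        threading.Thread(target=handle_request, args=(use_lock,))
        for _ in range(requests)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return len(servers)


print("servers created with the lock:", simulate(use_lock=True))
print("servers created without the lock:", simulate(use_lock=False))
```

With the lock, the check and the insert behave as one step, so only the first request creates a server. Without it, several requests can pass the check before any of them inserts, and the count may be higher than one.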
The SRSpotServer model contains several more methods where the same technique to lock rows is used. For instance remote_server_down:
@classmethod
@transaction.atomic()
def remote_server_down(cls, name):
    last_server = cls.get_queryset().select_for_update().filter(name=name).latest('created_at')  # row is locked here
    # from ON to OFF
    last_server.status = SRSpotServer.SRSpotServerStatus.OFF
    last_server.save(update_fields=['status'])
    remain_servers = SRSpotServer.objects.filter(status=SRSpotServer.SRSpotServerStatus.ON)
    if remain_servers.exists():
        # if more workers/servers available they can consume the pending_tasks if any
        return last_server.name, None
    pending_tasks = SRTask.objects.filter(
        Q(status=SRTask.SRTaskStatus.INCOMPLETE) | Q(status=SRTask.SRTaskStatus.RUNNING)
    )
    if pending_tasks.exists():
        # if there are pending tasks and no servers/workers available we have to create a new server
        current_server = cls.objects.create()
        return last_server.name, current_server.name
    # no pending tasks: nothing to replace (explicit return so the caller can always unpack two values)
    return last_server.name, None
The method above is executed when Google Cloud is going to stop one of the Server B instances. When GCP stops one of our spot instances, the platform notifies us 30 seconds beforehand so we are able to handle the disconnection. In this case the Celery process from Server B calls the remote_server_off Celery task inside the tasks.py file:
@shared_task
def remote_server_off(name):
    last_server_name, new_server_name = SRSpotServer.remote_server_down(name)
    if new_server_name:
        response = create_gcloud_vm(new_server_name)
        # Log the execution + new server created
    response = delete_gcloud_vm(last_server_name)
    # Log the execution + server down
In turn, this task calls the remote_server_down method.
Inside this method, the SRSpotServer record that is being stopped by GCP (obtained by its name with .filter(name=name)) is locked:
last_server = cls.get_queryset().select_for_update().filter(name=name).latest('created_at')
A new SRSpotServer record is also created if pending tasks exist:
pending_tasks = SRTask.objects.filter(
    Q(status=SRTask.SRTaskStatus.INCOMPLETE) | Q(status=SRTask.SRTaskStatus.RUNNING)
)
if pending_tasks.exists():
    # if there are pending tasks and no servers/workers available we have to create a new server
    current_server = cls.objects.create()
The record is locked inside this method to avoid the same race condition. A new request to get_server_information could arrive while Server B is being shut down by GCP. This request looks through each SRSpotServer object and doesn't find any whose state is ON (all the Server B instances are down), so it creates a new SRSpotServer record. However, remote_server_down could also be creating a new SRSpotServer record, since pending tasks could exist.
If the row is first locked inside remote_server_down and the query from get_server_information contains this locked row, the execution of get_server_information must wait until the remote_server_down execution finishes.
SRTask is also a Django model; a record is created each time a request to enhance anime images arrives.
Our Django app has two different pages, one to upload the images to enhance and one to download the enhanced images.
superresolution/tasks/views.py
def index(request):
    if request.method == 'GET':
        return render(request, 'index.html')
    image_files = request.FILES.getlist('images')
    email = request.POST.get('email')
    zip_id = uuid.uuid4()
    make_zip_images(image_files, zip_id)
    chain(
        group(
            upload_files_task.s(zip_id),
            get_server_information_task.s(email, zip_id)
        ) |
        send_task_to_server.s()
    ).apply_async()
    messages.success(request, 'Your images are being processed, please wait some minutes')
    return redirect("tasks:download")
index takes the user request to enhance images. Since Celery can only take serializable data, such as JSON objects, strings, lists or dictionaries, and the uploaded images are not serializable, we first create a temporary zip file that contains the images using the make_zip_images function. The variable zip_id identifies each zip file across the whole app, in both Server A and Server B instances. The functions chain and group come from Celery and are used to create workflows for our Celery tasks.
All the tasks inside the chain function are executed in the Celery worker. In this way Django only executes the make_zip_images function and calls the Celery tasks, so the whole index request is really fast.
group executes several tasks in parallel. In this case, upload_files_task and get_server_information_task are executed at the same time (so a multi-core processor helps). The former uploads the zip file, which contains the images, to Google Cloud Storage, and the latter gets the state of the Server B instances in order to create a new instance or use one of the available instances.
chain links several tasks: the next task is not called until the previous task is done. In this case the group counts as one big task, and only when all the tasks inside this group are done is the next task, send_task_to_server, executed.
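The shape of this workflow (two tasks in parallel, then one task that receives both results) can be imitated with the standard library. This is only an analogy for group and chain, not Celery code; the function bodies, the returned URL and the run_workflow helper are hypothetical stand-ins.

```python
from concurrent.futures import ThreadPoolExecutor


def upload_files(zip_id):
    """Stand-in for upload_files_task; the returned URL is made up."""
    return f"gs://example-bucket/{zip_id}.zip"


def get_server_information(email, zip_id):
    """Stand-in for get_server_information_task: (server_available, task, server_name)."""
    return True, {"id": 1, "zip_id": zip_id}, None


def send_task_to_server(group_results):
    """Runs only after both parallel tasks finished; receives their results in order."""
    url, server_info = group_results
    server_available, task, server_name = server_info
    if server_available and task:
        return "dispatch"
    if not server_available:
        return "create_server"
    return "wait"  # a server is deploying; the task is sent once it is ready


def run_workflow(email, zip_id):
    # "group": both tasks run at the same time.
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [
            pool.submit(upload_files, zip_id),
            pool.submit(get_server_information, email, zip_id),
        ]
        group_results = [future.result() for future in futures]
    # "chain": the next step starts only when the whole group is done.
    return send_task_to_server(group_results)


outcome = run_workflow("user@example.com", "zip-1")
print(outcome)
```

The results arrive as a list in the same order as the tasks in the group, which is why send_task_to_server can unpack them positionally.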
superresolution/tasks/tasks.py
@shared_task
def send_task_to_server(args):
    url, server_info = args
    server_available, task, server_name = server_info
    if server_available and task:
        celery_app.send_task('remote.run_sr_task', [task.get("id"), task.get("zip_id")], queue='images')
    elif not server_available:
        response = create_gcloud_vm(server_name)
        # Handle errors, status, logging
This task receives the results from the previous tasks. If at least one Server B instance is available, the run_sr_task task from the Server B Celery worker is called:
celery_app.send_task('remote.run_sr_task', [task.get("id"), task.get("zip_id")], queue='images')
send_task is used to call tasks that are not defined in the current worker but in a remote worker. run_sr_task is defined only in the Celery worker from Server B, in a module named remote, hence the task name remote.run_sr_task. The task id and zip_id are sent as arguments, and the queue used to send the message is also selected.
If no Server B instances are available, the create_gcloud_vm function is executed. This function uses the Google Cloud Python API to create new instances.
Server B. Celery and TensorFlow Apps
Server B only has a Celery worker and a TensorFlow application. Docker is used to serve both applications; the code can be found in this repository.
Dockerfile
FROM tensorflow/tensorflow:2.7.0-gpu
# if none GPU available use:
# FROM tensorflow/tensorflow:2.7.0
COPY app/requirements.txt /usr/src
WORKDIR /usr/src/
RUN pip3 install -r requirements.txt
COPY app /usr/src
ENV CELERY_BROKER=pyamqp://rabbit_user:rabbit_password@ip:5672
EXPOSE 5672
CMD ["celery", "-A", "remote", "worker", "--pool=solo", "-Q", "images", "-l", "info", "--without-heartbeat"]
This Dockerfile is used to build the Docker image. The base image is the official one from TensorFlow; we can use the image with GPU support, or the image without GPU just to test the infrastructure when we don't have access to GPUs in GCP.
To connect the Celery worker to RabbitMQ we need the CELERY_BROKER env variable. The RabbitMQ configuration, where rabbit_user and rabbit_password are created, is covered in the Server B configuration section. The ip used must be the public IP of Server A.
To run the Celery worker the following command is used:
celery -A remote worker --pool=solo -Q images -l info --without-heartbeat
remote is the module that contains the Celery app (app/remote.py), -Q indicates which Queues the worker is going to listen to (in this case the images Queue), -l sets the log level of Celery, and --without-heartbeat indicates that the worker is not going to send periodic heartbeat messages reporting its state. The latter is a recommended configuration in which RabbitMQ, and not the worker itself, is responsible for tracking the state of the workers.
Finally we have --pool=solo. Celery workers can run in different ways:
- pool=prefork: This is the default behavior. Here the Celery process spawns child processes to execute the tasks. By default Celery spawns as many child processes as there are cores available, but we can change this using --concurrency=n, where n is the number of child processes that Celery will spawn.
- pool=solo: Here all the tasks run in the same Celery process, which won't spawn more processes; hence, all the tasks run sequentially.
The TensorFlow app most of the time will use the whole GPU memory to enhance the anime images, thus if we run two or more tasks in parallel some of these tasks won't be able to run and will trigger an error since no more memory is available. Thus, the solo option is used to run one task at a time.
app/remote.py
broker_url = os.environ.get('CELERY_BROKER', '')
app = Celery('superresolution', broker=broker_url)
app.conf['worker_prefetch_multiplier'] = 1
app.conf['task_acks_late'] = True
app.conf['timezone'] = 'UTC'
app.conf['broker_heartbeat'] = 0
tf_inference = None
g_storage = None
"""
Local Tasks
"""
@shared_task(queue='images')
def run_sr_task(task_id, zip_id):
    g_storage.create_local_folders(zip_id)
    g_storage.download_images(zip_id)
    tf_inference.run_inference(zip_id)
    g_storage.upload_images(zip_id)
    # Logger
    app.send_task('tasks.tasks.task_finished', [task_id], queue='default')
"""
Remote Tasks
"""
@worker_ready.connect
def notify_server_created(sender, **kargs):
    server_name = os.environ.get('SERVER_NAME', '')
    global tf_inference
    tf_inference = TFInference()
    global g_storage
    g_storage = GCloudStorage()
    # print(type(sender)) <class 'celery.worker.consumer.consumer.Consumer'>
    with sender.app.connection() as conn:
        # print(type(conn)) <class 'kombu.connection.Connection'>
        sender.app.send_task('tasks.tasks.remote_server_ready', [server_name], connection=conn, queue='default')
The Celery worker from Server B is configured differently than the worker from Server A. We can find two principal differences:
- worker_prefetch_multiplier: By default the Celery worker prefetches 4 messages/tasks per process. For instance, if we have 5 tasks and 2 Celery workers, the first worker to start will reserve 4 of these tasks and the second worker only 1, even if the second worker has finished the execution of the task, the first worker already reserves 4 more tasks that will only run in the same worker. Changing this value to 1 disables this behavior.
- task_acks_late: When the execution of one task starts, Celery acknowledges the message in the broker so no other worker or the same worker can receive the same message and run the task again. However, the spot instances used in this infrastructure can be shut down at any time, hence, if one running task has not finished and the instance is shut down, the task is lost forever since its message has been acknowledged already in the broker. To overcome this problem the task_acks_late setting is set to True, in this way the message is only acknowledged after the task execution is done, so if the task is interrupted another Celery worker can receive the message and run the task again.
When only one Server B instance is running tasks and this instance is terminated by GCP, setting task_acks_late to True keeps the messages in the broker until a new Server B instance is created. Once this new instance connects to the broker, it begins to receive all the messages that were not acknowledged, as well as the messages from new requests.
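The effect of acknowledging late can be illustrated with a toy queue that tracks delivered but unacknowledged messages. This is a simplified model of broker behavior under stated assumptions, not RabbitMQ's or Celery's code; ToyQueue and run_task are hypothetical names.

```python
from collections import deque


class ToyQueue:
    """Toy broker queue: a delivered message stays tracked until it is acknowledged."""

    def __init__(self):
        self.ready = deque()   # messages waiting for a worker
        self.unacked = {}      # delivery tag -> message handed to a worker
        self._next_tag = 0

    def publish(self, body):
        self.ready.append(body)

    def deliver(self):
        body = self.ready.popleft()
        self._next_tag += 1
        self.unacked[self._next_tag] = body
        return self._next_tag, body

    def ack(self, tag):
        del self.unacked[tag]

    def connection_lost(self):
        """Requeue everything that was delivered but never acknowledged."""
        for tag in sorted(self.unacked, reverse=True):
            self.ready.appendleft(self.unacked.pop(tag))


def run_task(toy_queue, acks_late, crash):
    """Process one message; `crash` simulates the spot instance being stopped mid-task."""
    tag, body = toy_queue.deliver()
    if not acks_late:
        toy_queue.ack(tag)             # early ack: the broker forgets the message now
    if crash:
        toy_queue.connection_lost()    # the worker disappears before finishing
        return
    if acks_late:
        toy_queue.ack(tag)             # late ack: only after the work is done


early = ToyQueue()
early.publish("task-1")
run_task(early, acks_late=False, crash=True)

late = ToyQueue()
late.publish("task-1")
run_task(late, acks_late=True, crash=True)

print("early ack, left in queue:", list(early.ready))
print("late ack, left in queue:", list(late.ready))
```

With the early acknowledgement the interrupted task is gone, while with the late acknowledgement it goes back to the queue and waits for the next worker. The trade-off is that a task may run more than once, so it should be safe to repeat.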
The notify_server_created function runs when the Celery worker is ready to consume messages. tf_inference is used to run the anime model with TensorFlow, and g_storage is used to download and upload the zip files from Google Cloud Storage. The remote_server_ready task is called here and executed in Server A; server_name is sent as a parameter so Server A knows which Server B instance is ready to consume messages.
run_sr_task is the task where the images are enhanced:
- The zip file with the zip_id identifier is downloaded from Google Cloud storage and uncompressed.
- TensorFlow loads the uncompressed images.
- TensorFlow runs the inference.
- The enhanced images are compressed and uploaded to Google Cloud storage using the same zip_id identifier.
- task_finished is called here and executed in Server A.
Each SRTask record belongs to one user and stores the user's email, the zip_id and the record id. The zip_id and id of each record are sent as parameters to the run_sr_task task, which then calls task_finished and sends the id again as a parameter:
superresolution/tasks/tasks.py
@shared_task
def task_finished(task_id):
    """
    Called when the remote server has processed the images from the task_id
    """
    current_task = SRTask.objects.get(id=task_id)
    current_task.status = SRTask.SRTaskStatus.DONE
    current_task.save(update_fields=['status'])
    pending_tasks = SRTask.objects.filter(status=SRTask.SRTaskStatus.RUNNING)
    # we have the last task of the queue
    if not pending_tasks.exists():
        to_time = datetime.utcnow() + timedelta(minutes=25)
        keep_or_kill_server_task.apply_async((task_id,), eta=to_time)
In the same way as SRSpotServer records, SRTask records can have 3 different states: INCOMPLETE, RUNNING and DONE. When a new SRTask record is created its state is INCOMPLETE. If the record is sent to the run_sr_task task, its state changes to RUNNING, and when task_finished is executed the state changes to DONE. When the user asks for their images and the record's state is DONE, the enhanced images are downloaded and returned to the user; otherwise the user is notified that their images are not ready yet.
Having states for each user's task gives us control over the whole workflow. For instance, if there are SRTask records whose state is INCOMPLETE and these records were created some time ago, something is failing; maybe the Server B instance creation triggered an error and the instance could not be created.
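A check for stuck tasks could look like the following sketch. It works on plain Python objects rather than the Django model; TaskRecord, find_stuck_tasks and the 30-minute threshold are assumptions made for illustration and do not come from the repository.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class TaskRecord:
    """Plain stand-in for an SRTask row (hypothetical, not the Django model)."""
    id: int
    status: str
    created_at: datetime


def find_stuck_tasks(tasks, now, max_age=timedelta(minutes=30)):
    """Return INCOMPLETE tasks older than max_age; these likely need attention."""
    return [
        task for task in tasks
        if task.status == "INCOMPLETE" and now - task.created_at > max_age
    ]


now = datetime(2022, 2, 16, 12, 0, tzinfo=timezone.utc)
tasks = [
    TaskRecord(1, "DONE", now - timedelta(hours=2)),
    TaskRecord(2, "INCOMPLETE", now - timedelta(hours=1)),    # old and never started
    TaskRecord(3, "INCOMPLETE", now - timedelta(minutes=5)),  # recent, still fine
    TaskRecord(4, "RUNNING", now - timedelta(minutes=50)),
]
stuck_ids = [task.id for task in find_stuck_tasks(tasks, now)]
print(stuck_ids)
```

In the Django app the same idea would be a filter on status and creation time, run from a periodic task or an admin page.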
Finally, in order to save resources and money, each Server B instance is terminated if no new requests have arrived in a while. Inside the task_finished task we ask if there are more requests to enhance images or if the current request was the last:
pending_tasks = SRTask.objects.filter(status=SRTask.SRTaskStatus.RUNNING)
# we have the last task of the queue
if not pending_tasks.exists():
    to_time = datetime.utcnow() + timedelta(minutes=25)
    keep_or_kill_server_task.apply_async((task_id,), eta=to_time)
In the case of the current request being the last one, the task keep_or_kill_server_task will be executed after 25 minutes.
@shared_task
def keep_or_kill_server_task(task_id):
    last_task = SRTask.objects.latest('created_at')
    if last_task.id == task_id:
        # after 25 minutes no new tasks arrived
        names = SRSpotServer.kill_server_by_application()
        for name in names:
            response = delete_gcloud_vm(name)
            # Log the execution + server deleted
Once the task is executed, if the SRTask record with id task_id is still the last one created, meaning no new requests arrived during the 25-minute period, all the Server B instances are terminated by the application.
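The "is this still the newest task?" check can be isolated as a small pure function. The sketch below mirrors the comparison in keep_or_kill_server_task on a plain list of ids; should_kill_servers is a hypothetical helper, not part of the repository.

```python
def should_kill_servers(scheduled_task_id, task_ids_by_creation):
    """Decide whether the delayed check should stop the Server B instances.

    task_ids_by_creation is ordered from oldest to newest. The servers are
    stopped only if the task that scheduled the check is still the newest one.
    """
    if not task_ids_by_creation:
        return False
    return task_ids_by_creation[-1] == scheduled_task_id


# Task 7 finished last and scheduled the check; nothing arrived afterwards.
print(should_kill_servers(7, [5, 6, 7]))      # True
# Task 8 arrived during the waiting period, so the servers are kept.
print(should_kill_servers(7, [5, 6, 7, 8]))   # False
```

Each finished "last" task schedules its own delayed check, but only the check that belongs to the newest task ends up stopping the servers.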
This behavior can be different; it all depends on the needs of the application. For instance, a Celery beat task, which runs every n minutes or hours, can execute the same code, look over the requests, and stop all the instances if no new requests have arrived. This same task can also inspect the number of requests and decide if a new Server B instance should be created to accelerate the workflow.
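The decision such a periodic task would make can be sketched as a pure function. Everything here is an assumption for illustration: the function name, the return values, and the thresholds (25 idle minutes, 5 requests per server) are not taken from the repository.

```python
def decide_action(pending_requests, running_servers, minutes_since_last_request,
                  idle_limit=25, requests_per_server=5):
    """Return 'stop_all', 'create_server', or 'keep'. Thresholds are assumptions."""
    if pending_requests == 0:
        if running_servers > 0 and minutes_since_last_request >= idle_limit:
            return "stop_all"
        return "keep"
    # Ceiling division: number of servers needed to cover the pending requests.
    needed = -(-pending_requests // requests_per_server)
    if needed > running_servers:
        return "create_server"
    return "keep"
```

A Celery beat task would only need to gather the three inputs from the database and then act on the returned value.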
Celery is really flexible and allows us to build our own workflows. Celery even has a page dedicated exclusively to the different workflows that can be used to build applications.
Configuration of Server A
Let's start with the configuration of Server A. We need Django, RabbitMQ, and MySQL.
Install RabbitMQ
To install RabbitMQ on an Ubuntu server, copy the contents of the script at https://www.rabbitmq.com/install-debian.html#apt-quick-start-packagecloud
into a new script, for example:
vim rabbit.sh
Make the script executable:
sudo chmod +x rabbit.sh
and run it:
./rabbit.sh
Check if RabbitMQ is running and its version:
systemctl status rabbitmq-server
sudo rabbitmqctl version
Version 3.9.13 at the creation of this post.
For production settings, RabbitMQ recommends changing the free disk space limit. If the server has less free disk space than this limit, RabbitMQ stops accepting new messages until more free disk space or RAM is available:
sudo vim /etc/rabbitmq/rabbitmq.conf
inside add:
disk_free_limit.relative = 1.0
With a relative value of 1.0 the limit equals the amount of RAM: if the server has 1 GB of RAM, the limit is 1 GB of free disk space, and so on.
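The arithmetic behind the relative limit is simple enough to sketch. The helper names below are hypothetical, and the code only models the threshold described above, not RabbitMQ's actual implementation.

```python
def disk_free_limit_bytes(total_ram_bytes, relative=1.0):
    """Free-disk threshold implied by disk_free_limit.relative."""
    return int(total_ram_bytes * relative)


def publishers_blocked(free_disk_bytes, total_ram_bytes, relative=1.0):
    """True when free disk space is below the threshold."""
    return free_disk_bytes < disk_free_limit_bytes(total_ram_bytes, relative)


GIB = 1024 ** 3
# A server with 1 GiB of RAM and only 512 MiB of free disk is below the limit.
print(publishers_blocked(GIB // 2, GIB))  # True
```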
Restart the service to apply the changes:
sudo systemctl restart rabbitmq-server
We can check the configuration using:
sudo rabbitmq-diagnostics status
To accept new connections from remote servers a new user should be created:
sudo rabbitmqctl add_user serverArabbituser myrabbitsuperpw
where serverArabbituser is the username and myrabbitsuperpw is the password.
The new user needs permissions:
sudo rabbitmqctl set_permissions -p / serverArabbituser ".*" ".*" ".*"
The permissions are for configure, write, and read:
- configure allows the user to create or destroy resources.
- write allows the user to send messages to the broker
- read allows the user to get messages from the broker
You can read more about it here
Finally we restart the service again:
sudo systemctl restart rabbitmq-server
For security reasons we should also delete the default user called guest:
sudo rabbitmqctl delete_user guest
Google Cloud Platform
Some GCP knowledge is necessary because the platform is relatively difficult to navigate.
GCP groups its services by project; a project is created when we want to access the platform's services. I called the project tensorflow-spot-arch. To access the GCP services using the Python API we need a service account JSON file.
We need to go to IAM & Admin/Service Accounts and click Create Service Account. In the section Grant this service account access to project add Compute Instance Admin (v1), Storage Admin and Service Account User as roles. Once the service account is created we need to download a JSON key. Select the created account and click Manage Keys, in the Keys section click Add Key and select JSON. This JSON file is used in Server A and Server B instances.
If we need to add more roles to an already created service account we can go to IAM & Admin/IAM and click Edit principal for the service account that we want to modify.
Now in Cloud Storage we need to create a new bucket called tensorflow-spot-arch-bucket and inside the bucket create two folders, current and done. Under Permissions we need to add another role, in the new principals field put the name of the service account and in roles add Storage Legacy Bucket Owner.
The names of the project and bucket can be different. To use different names we need to change the variables inside the superresolution/settings/common.py file:
GS_BUCKET_NAME = "tensorflow-spot-arch-bucket"
GCP_PROJECT_NAME = "tensorflow-spot-arch"
GCP_ZONE = "us-central1-b"
GCP_REGION = "us-central1"
GCP_SERVICE_EMAIL = "tensorflow-spot-arch-service-a@tensorflow-spot-arch.iam.gserviceaccount.com"
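When these values are changed by hand it is easy to leave them inconsistent. The sketch below is a hypothetical sanity check (check_gcp_settings is not part of the repository). It assumes that a zone name is the region name plus a suffix, and that the service account was created in the same project, so its email ends with the project id followed by .iam.gserviceaccount.com. A service account from another project would fail the second check even though it can be valid.

```python
def check_gcp_settings(project, zone, region, service_email):
    """Return a list of human-readable problems; an empty list means consistent."""
    problems = []
    # Zone names are the region name plus a suffix, e.g. us-central1-b.
    if zone.rsplit("-", 1)[0] != region:
        problems.append(f"zone {zone} does not belong to region {region}")
    expected_domain = f"{project}.iam.gserviceaccount.com"
    if not service_email.endswith("@" + expected_domain):
        problems.append(f"service account is not in {expected_domain}")
    return problems


print(check_gcp_settings(
    "tensorflow-spot-arch",
    "us-central1-b",
    "us-central1",
    "tensorflow-spot-arch-service-a@tensorflow-spot-arch.iam.gserviceaccount.com",
))  # []
```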
If something fails while creating or deleting servers, you should look at the Operations section in Compute Engine.
If the Server A instance is also created using GCP, the RabbitMQ port needs to be open. We can open ports by adding firewall rules inside VPC network/Firewall; the firewall type is Ingress and the port is tcp:5672.
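A quick way to confirm that the firewall rule works is to try a TCP connection to the broker port from another machine. The helper below is a minimal sketch using only the standard library; is_port_open is a hypothetical name and the host must be replaced with the address of Server A.

```python
import socket


def is_port_open(host, port=5672, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

A False result only says that the connection failed; the cause can be the firewall rule, a stopped RabbitMQ service, or a wrong address.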
At the end of the post there are resources to learn more about GCP.
Django configuration
We are going to use MySQL as the database. The following steps install all the dependencies for the Django app.
You should change someuser and somepassword:
sudo apt-get install mysql-server
# We may need to use:
# sudo mysql_secure_installation
sudo mysql
CREATE USER 'someuser'@'localhost' IDENTIFIED WITH mysql_native_password BY 'somepassword';
CREATE DATABASE app_db;
GRANT ALL ON app_db.* TO 'someuser'@'localhost';
FLUSH PRIVILEGES;
Then we create a mysql configuration file that Django will use to connect to the db:
sudo vim /etc/mysql/dj.cnf
[client]
database = app_db
user = someuser
password = somepassword
default-character-set = utf8mb4
[mysql]
default-character-set = utf8mb4
[mysqld]
character-set-client-handshake = FALSE
character-set-server = utf8mb4
collation-server = utf8mb4_unicode_ci
sudo systemctl daemon-reload
sudo systemctl restart mysql
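Django's MySQL backend can read connection settings from such a file through the read_default_file option. The fragment below is a minimal settings sketch; the project's actual settings module may be organized differently.

```python
# Settings sketch: let the MySQL client library read the database name,
# user, and password from the [client] section of the configuration file.
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.mysql",
        "OPTIONS": {
            "read_default_file": "/etc/mysql/dj.cnf",
        },
    }
}
```

Keeping the credentials in the file means they do not have to appear in the settings module or in version control.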
For Django we need to install some dependencies:
sudo apt install -y python3-pip
sudo apt install -y build-essential libssl-dev libffi-dev python3-dev
sudo apt install libmysqlclient-dev default-libmysqlclient-dev
sudo apt install python3.8-venv
Now, inside the application folder:
python3 -m venv djangoenv
. djangoenv/bin/activate
pip install -r requirements.txt
Finally we create the environment variables for the application:
vim .env
DJANGO_SETTINGS_MODULE="myblog.settings.production"
SERVER_IP=""
SECRET_KEY=""
SERVICE_GCP_FILE="route to the service account JSON file"
BROKER_URL="pyamqp://serverArabbituser:myrabbitsuperpw@ip:5672"
SERVER_IP should be the public ip of the instance.
This file will be used by gunicorn.
The ip for the broker url needs to be the internal/private ip of the instance if a provider like GCP, AWS, Azure is used.
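If the RabbitMQ password contains characters such as @ or /, the broker URL has to be percent-encoded to stay parseable. The sketch below assembles the URL in the same form as the .env entry; build_broker_url is a hypothetical helper, the ip used in the example is a placeholder, and it is an assumption that the library parsing the URL decodes the credentials again.

```python
from urllib.parse import quote


def build_broker_url(user, password, host, port=5672):
    """Assemble a pyamqp:// URL, percent-encoding the credentials."""
    return (
        f"pyamqp://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}"
    )


# 10.128.0.2 is a placeholder for the internal ip of Server A.
print(build_broker_url("serverArabbituser", "myrabbitsuperpw", "10.128.0.2"))
```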
We have to add:
export DJANGO_SETTINGS_MODULE="myblog.settings.production"
to the .bashrc file so all the manage.py commands run with the production environment. Then reload the file:
source ~/.bashrc
We may need to run . djangoenv/bin/activate again.
run:
python manage.py migrate
python manage.py migrate django_celery_results
python manage.py createsuperuser
Celery configuration
To run the Celery worker for Server A we use:
celery -A superresolution worker -l info -Q default
This worker will only listen to the default Queue.
However, it is better practice to run the worker as a daemon. More info can be found on this page. Systemd is a good option to daemonize the worker.
Gunicorn configuration
run:
sudo nano /etc/systemd/system/gunicorn.socket
[Unit]
Description=gunicorn socket
[Socket]
ListenStream=/run/gunicorn.sock
[Install]
WantedBy=sockets.target
and
sudo nano /etc/systemd/system/gunicorn.service
[Unit]
Description=gunicorn daemon
Requires=gunicorn.socket
After=network.target
[Service]
User=deployuser
Group=www-data
WorkingDirectory=/home/deployuser/superresolution
EnvironmentFile=/home/deployuser/superresolution/.env
ExecStart=/home/deployuser/superresolution/djangoenv/bin/gunicorn \
--access-logfile - \
--workers 3 \
--bind unix:/run/gunicorn.sock \
superresolution.wsgi:application
[Install]
WantedBy=multi-user.target
Change deployuser to the server user used to deploy the application.
sudo systemctl start gunicorn.socket
sudo systemctl enable gunicorn.socket
Nginx configuration
sudo apt install nginx
sudo nano /etc/nginx/sites-available/superresolution
server {
listen 80;
server_name public_ip;
location = /favicon.ico { access_log off; log_not_found off; }
location / {
include proxy_params;
proxy_pass http://unix:/run/gunicorn.sock;
}
}
sudo ln -s /etc/nginx/sites-available/superresolution /etc/nginx/sites-enabled
sudo nginx -t
sudo systemctl restart nginx
Configuration of Server B
The first step is to install Docker from this page
Get the app files from the repository using zip or git. The Service Account JSON file should be added to app/config folder. Once inside the folder app, at the level of the Dockerfile run:
sudo docker build -t tf:remote .
Now the container can be run using:
sudo docker run -it --name tf_app tf:remote
However, we want this process to be automatic. For this reason we use GCP startup scripts, which run once the virtual machine is ready:
#!/bin/bash
NAME=$(curl http://metadata.google.internal/computeMetadata/v1/instance/name -H "Metadata-Flavor: Google")
sudo docker run \
--gpus all \
-e SERVER_NAME=${NAME} \
--name tf_app tf:remote
Each Server B instance is created with a different name, which is saved as metadata. We can access this metadata using the computeMetadata API. This name is used to identify each Server B instance throughout the whole infrastructure.
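The same metadata lookup can be done from Python with the standard library. This is a sketch equivalent to the curl call in the startup script, with hypothetical function names. fetch_instance_name only works from inside a GCP virtual machine, because the metadata host is not reachable from elsewhere.

```python
import urllib.request

METADATA_URL = "http://metadata.google.internal/computeMetadata/v1/instance/name"


def build_metadata_request(url=METADATA_URL):
    """Build the request, including the Metadata-Flavor header used by the curl call."""
    return urllib.request.Request(url, headers={"Metadata-Flavor": "Google"})


def fetch_instance_name(timeout=5):
    """Return the instance name. Only works from inside a GCP VM."""
    with urllib.request.urlopen(build_metadata_request(), timeout=timeout) as resp:
        return resp.read().decode("utf-8").strip()
```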
The configuration used to create each Server B instance is inside the file superresolution/tasks/gcloud_config.py; the startup script is declared as metadata in the same configuration.
If GPU instances cannot be used, we can remove the following part:
{
    "acceleratorCount": 1,
    "acceleratorType": f"projects/{project_name}/zones/{zone}/acceleratorTypes/nvidia-tesla-t4"
}
inside guestAccelerators, and --gpus all from the startup script, so the instances are created without GPUs.
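Instead of editing the configuration by hand, the GPU part can be made optional with a flag. The two helpers below are hypothetical and are not taken from gcloud_config.py; they only show how the accelerator entry and the --gpus all option could be switched on and off together.

```python
def build_accelerator_config(project_name, zone, use_gpu=True,
                             gpu_type="nvidia-tesla-t4", count=1):
    """Return the guestAccelerators list; empty when GPUs are not wanted."""
    if not use_gpu:
        return []
    return [{
        "acceleratorCount": count,
        "acceleratorType": (
            f"projects/{project_name}/zones/{zone}/acceleratorTypes/{gpu_type}"
        ),
    }]


def build_docker_run_command(server_name, use_gpu=True):
    """Return the docker run command of the startup script as an argument list."""
    cmd = ["sudo", "docker", "run"]
    if use_gpu:
        cmd += ["--gpus", "all"]
    cmd += ["-e", f"SERVER_NAME={server_name}", "--name", "tf_app", "tf:remote"]
    return cmd
```

Driving both pieces from the same flag avoids a GPU-less instance whose container still asks for GPUs.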
In the same way as startup scripts, shutdown scripts can be used to run commands when the instance is stopped:
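#!/bin/bash
NAME=$(curl http://metadata.google.internal/computeMetadata/v1/instance/name -H "Metadata-Flavor: Google")
# No -it here: shutdown scripts run without a terminal.
# Double quotes let the shell expand ${NAME}; the escaped quotes make it a JSON string.
sudo docker exec \
tf_app \
celery call tasks.tasks.remote_server_off \
--queue=default -k "{\"name\": \"${NAME}\"}"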
Celery tasks can also be called from outside Python scripts using:
celery call task_name
The shutdown script calls the task remote_server_off inside the Docker container called tf_app and sends the server name as a parameter. This script is executed when GCP is shutting down the instance, so the app knows that one instance is about to be stopped.
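The value passed with -k has to be valid JSON, which means the server name must appear as a quoted string. Building the argument with json.dumps avoids quoting mistakes. The helper below is a hypothetical sketch that only assembles the command; it does not run it.

```python
import json


def celery_call_command(server_name):
    """Build the argument list for the celery call used in the shutdown script."""
    kwargs = json.dumps({"name": server_name})
    return [
        "celery", "call", "tasks.tasks.remote_server_off",
        "--queue=default", "-k", kwargs,
    ]


print(celery_call_command("server-b-1")[-1])  # {"name": "server-b-1"}
```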
Finally, once our Server B is configured we can create a custom image from this server. This image will be used each time a new Server B instance is created; it contains all the configurations and data of Server B.
To create the image you can read this page. To use the image you can read this page; basically, we need to change the IMAGE_PROJECT and IMAGE_NAME values in:
"initializeParams": {
.
.
.
"sourceImage": "projects/IMAGE_PROJECT/global/images/IMAGE_NAME"
},
IMAGE_PROJECT is the name of the project, for example tensorflow-spot-arch.
GPU Drivers
To install the Nvidia drivers we run:
wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/cuda-ubuntu2004.pin
sudo mv cuda-ubuntu2004.pin /etc/apt/preferences.d/cuda-repository-pin-600
sudo apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/7fa2af80.pub
sudo add-apt-repository "deb http://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/ /"
sudo apt-get update && sudo apt-get install -y cuda-drivers
These drivers are for Ubuntu 20.04.
We also need the NVIDIA Container Toolkit:
distribution=$(. /etc/os-release;echo $ID$VERSION_ID)
curl -s -L https://nvidia.github.io/nvidia-docker/gpgkey | sudo apt-key add -
curl -s -L https://nvidia.github.io/nvidia-docker/$distribution/nvidia-docker.list | sudo tee /etc/apt/sources.list.d/nvidia-docker.list
sudo apt-get update && sudo apt-get install -y nvidia-container-toolkit
sudo systemctl restart docker
More information about drivers can be found here and here
Production
There are some configurations needed to make the app production ready:
- Image validations: We need to validate the images that users send to Django (amount, type). The GPU used for each Server B instance is an Nvidia Tesla T4 with 16GB of VRAM, which can take up to 5 images of around 1300x700 per batch; the memory is not enough for more than 5 images of that size.
- Batch Size: The model takes one image at a time (batch size is 1). Images must have the same size to form a batch; if images have different sizes we could add padding to the smaller ones.
- Model Optimizations: The anime model does not have any optimizations; we could optimize the model to get better inference performance.
- RabbitMQ configuration: RabbitMQ needs more specific configuration to be ready for production usage. One of these configurations is TCP keepalives.
- Logging: The whole app needs a logging system to log each performed action, such as instance creation or task execution.
- Errors: Errors need to be handled. Celery tasks could fail; in that case we could try to run the task again, log the error, and notify the sysadmin. For instance, create_gcloud_vm and delete_gcloud_vm tasks could contain an errors key with information about what went wrong.
- Server B instances: For now, the app only creates one Server B instance. If we need more instances we could use a Celery beat task to examine the state of the requests every n minutes or hours and create more instances if needed.
- Users: Django only asks for the user's email so the user can get their images back. However, sending the images by email could also be an option. A User model could be created to get a more robust system.
- User Images: Currently the app only returns the last task in the download view; if the user requested more images at different moments, the previous requests can't be returned.
- Kubeflow: Since we are already using Docker containers to run the app in the Server B instance, we could just use Kubeflow instead.
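As an example of the first point, the image validation could start with a check on the number and type of the uploaded files. The sketch below is hypothetical: the function name and the allowed extensions are assumptions, and the limit of 5 images comes from the T4 memory estimate given in the list. A real validation should also inspect the file contents and the image dimensions.

```python
import os

ALLOWED_EXTENSIONS = {".png", ".jpg", ".jpeg"}  # assumption, adjust to the model
MAX_IMAGES = 5  # limit estimated for a T4 with images of around 1300x700


def validate_upload(filenames, max_images=MAX_IMAGES):
    """Return a list of error messages; an empty list means the upload is accepted."""
    errors = []
    if not filenames:
        errors.append("no images were sent")
    if len(filenames) > max_images:
        errors.append(f"too many images: {len(filenames)} > {max_images}")
    for name in filenames:
        ext = os.path.splitext(name)[1].lower()
        if ext not in ALLOWED_EXTENSIONS:
            errors.append(f"unsupported file type: {name}")
    return errors
```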
Resources
Celery
- https://docs.celeryproject.org/en/stable/userguide/optimizing.html
- https://docs.celeryproject.org/en/stable/userguide/optimizing.html#optimizing-prefetch-limit
- https://docs.celeryproject.org/en/stable/userguide/routing.html#routing-basics
- https://blog.wolt.com/engineering/2021/09/15/5-tips-for-writing-production-ready-celery-tasks/
- https://www.youtube.com/watch?v=8YLeWxLtVgo
- https://www.cloudamqp.com/docs/celery.html
- https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html
RabbitMQ
- https://www.rabbitmq.com/networking.html#tcp-keepalives
- https://www.rabbitmq.com/ttl.html
- https://raw.githubusercontent.com/rabbitmq/rabbitmq-server/v3.9.12/deps/rabbitmq_management/bin/rabbitmqadmin
- https://www.cloudamqp.com/blog/part1-rabbitmq-best-practice.html
- http://www.rabbitmq.com/memory.html
Mysql
- https://dev.mysql.com/doc/refman/8.0/en/innodb-locking.html
- https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html
- https://dev.mysql.com/doc/refman/8.0/en/innodb-next-key-locking.html