Ideas on using Celery with Flask for background tasks

Simply put, Celery is a background task runner. It can run time-intensive tasks in the background so that your application can focus on the stuff that matters the most. In context of a Flask application, the stuff that matters the most is listening to HTTP requests and returning response.

By default, Flask runs on a single-thread. Now if a request is executed that takes several seconds to run, then it will block all other incoming requests as it is single-threaded. This will be a very bad-experience for the user who is using the product. So here we can use Celery to move time-hogging part of that request to the background.

I would like to let you know that by “background”, Celery means another process. Celery starts worker processes for the running application and these workers receive work from the main application. Celery requires a broker to be used. Broker is nothing but a database that stores results of a celery task and provides a shared interface between main process and worker processes. The output of the work done by the workers is stored in the Broker. The main application can then access these results from the Broker.

Using Celery to set background tasks in your application is as simple as follows –

def background_task(*args, **kwargs):
    # do stuff
    # more stuff

Now the function background_task becomes function-able as a background task. To execute it as a background task, run –

task = background_task.delay(*args, **kwargs)
print task.state  # task current state (PENDING, SUCCESS, FAILURE)

Till now this may look nice and easy but it can cause lots of problems. This is because the background tasks run in different processes than the main application. So the state of the worker application differs from the real application.

One common problem because of this is the lack of request context. Since a celery task runs in a different process, so the request context is not available. Therefore the request headers, cookies and everything else is not available when the task actually runs. I too faced this problem and solved it using an excellent snippet I found on the Internet.

Celery task wrapper to set request context vars and global
vars when a task is executed
Based on
from celery import Task
from flask import has_request_context, make_response, request, g

from app import app  # the flask app

__all__ = ['RequestContextTask']

class RequestContextTask(Task):
    """Base class for tasks that originate from Flask request handlers
    and carry over most of the request context data.
    This has an advantage of being able to access all the usual information
    that the HTTP request has and use them within the task. Pontential
    use cases include e.g. formatting URLs for external use in emails sent
    by tasks.
    abstract = True

    #: Name of the additional parameter passed to tasks
    #: that contains information about the original Flask request context.
    CONTEXT_ARG_NAME = '_flask_request_context'
    GLOBALS_ARG_NAME = '_flask_global_proxy'
    GLOBAL_KEYS = ['user']

    def __call__(self, *args, **kwargs):
        """Execute task code with given arguments."""
        call = lambda: super(RequestContextTask, self).__call__(*args, **kwargs)

        # set context
        context = kwargs.pop(self.CONTEXT_ARG_NAME, None)
        gl = kwargs.pop(self.GLOBALS_ARG_NAME, {})

        if context is None or has_request_context():
            return call()

        with app.test_request_context(**context):
            # set globals
            for i in gl:
                setattr(g, i, gl[i])
            # call
            result = call()
            # process a fake "Response" so that
            # [email protected]_request`` hooks are executed
            # app.process_response(make_response(result or ''))

        return result

    def apply_async(self, args=None, kwargs=None, **rest):
        return super(RequestContextTask, self).apply_async(args, kwargs, **rest)

    def apply(self, args=None, kwargs=None, **rest):
        return super(RequestContextTask, self).apply(args, kwargs, **rest)

    def retry(self, args=None, kwargs=None, **rest):
        return super(RequestContextTask, self).retry(args, kwargs, **rest)

    def _include_request_context(self, kwargs):
        """Includes all the information about current Flask request context
        as an additional argument to the task.
        if not has_request_context():

        # keys correspond to arguments of :meth:`Flask.test_request_context`
        context = {
            'path': request.path,
            'base_url': request.url_root,
            'method': request.method,
            'headers': dict(request.headers),
        if '?' in request.url:
            context['query_string'] = request.url[(request.url.find('?') + 1):]

        kwargs[self.CONTEXT_ARG_NAME] = context

    def _include_global(self, kwargs):
        d = {}
        for z in self.GLOBAL_KEYS:
            if hasattr(g, z):
                d[z] = getattr(g, z)
        kwargs[self.GLOBALS_ARG_NAME] = d

To run a task in Request context mode, do –

@celery.task(base=RequestContextTask, bind=True)
def background_task(self, *args, **kwargs):
    # do stuff
    # more stuff

If you are wondering what the RequestContextTask class does, it simply stores all request context vars and global vars when a background task is called (task.delay()) and then unpacks those values to their proper places when the task is about to be run. The above snippet can be easily extended to store any value.

Another challenge that some people may face is the occasional Parsing/Serialization error. This happens because the data being sent to/from a function that is to be background executed is too complex.

Serialization is the process of converting complex data structures and objects into a plain string. Serialization of data is necessary because the background tasks and the main thread run in different processes. Now think how will the main thread communicate the celery thread to do some task. This is done using serialization of the concerned data. So to avoid serialization errors, it is recommended that you make background tasks such that they require only simple arguments to run and they return only simple data.

So basically keeping small and simple tasks is recommended when using Celery. Follow this golden rule and you will not run into any problems.


{{ Repost from my personal blog | }}

Shell hacks

Custom Shell

Working with database models needs a lot of use of the flask shell. You can access it with:

python shell

The default shell is quite unintuitive. It doesn’t pretty print the outputs and has no support for auto-completion. I’ve installed IPython that takes care of that. But still working with models means writing a lot of import statements. Plus if there was some change in the code related to the app, then the shell had to restarted again so the changes could be loaded. Meaning writing the import statements again.

We were using Flask-Script and I wanted to run a custom shell that imports all the required modules, models and helper functions, so I don’t have to write them over and over. Some of them were as long as:

from open_event.models.users_events_roles import UsersEventsRoles

So I created a custom shell command with different context that overrides the default shell provided by the Flask-Script Manager. It was pretty easy with Flask-Script. One thing I had to keep in mind is that it needed to be in a different file than Since was committed to source repo, changes to it would be tracked. So I needed a different file that could be excluded from the source repo. I created an that imported the Manager from the open_event module and overrides the shell command.

from flask_script import Shell

from open_event import manager
from open_event.models.user import User
from open_event.models.event import Event
from import save_to_db, delete_from_db

def _make_context():
    return dict(

if __name__ == "__main__":
    manager.add_command('shell', Shell(make_context=_make_context))

Place this file in the same directory as, so you can access it with python shell.

The code is pretty simple to understand. We import the Shell class from flask_script, create its object with our context and then add it to the manager as a command. _make_context contains what I usually like to have in my shell. It must always return a dictionary. The keys of this dictionary would be available as statements inside the shell with their values specified here.

This helps a lot. Most of the time I would be working with the super_admin user, and I would need its User object from time to time. The super_admin user is always going to be the first user (User object with id 1). So instead of from open_event.models.user import User; su = User.query.get(1) I could just use the su variable. Models like User and Event are also readily available (so are their base queries). This is the default context that I always keep, but many times you need more models than the ones specified here. Like when I was working with the permissions system.

from open_event.models.users_events_roles import UsersEventsRoles
from open_event.models.service import Service
from open_event.models.role import Role

def _make_context():
    return dict(
        # usual stuff

You can even write a script that fetches all the database models (instance of sqlalchemy Model class) and then add them to the _make_context dictionary. I like to keep it minimum and differently named, so there are no conflicts of classes when try to auto-complete.

One more thing, you need to exclude the file so that git doesn’t track it. You can simply add it in the .git/info/exclude file.

I also wrote some useful one-liner bash commands.

Revision History

Every time someone updates the database models, he needs to migrate and provide the migrations file to others by committing it to the source. These files help us upgrade our already defined database. We work with Alembic. Regarding alembic revisions (migration files) you can keep two things in mind. One is the Head, that keeps track of the latest revision, and another is Current, that specifies what revision your tables in the database are based on. If your current revision is not a head, it means your database tables are not up-to-date. You need to upgrade (python db upgrade). The can be multiple heads in the revision history. python heads displays all of them. The current revision can be fetched with python current. I wanted something that automatically checks if current is at a head.

python db history | grep --color "$(python db current 2> /dev/null)|$(python db heads)|$"

You can specify it as an alias in .bashrc. This command displays the revision history with the head and current revision being colored.

Screenshot from 2016-07-11 16:34:18

You can identify the head as it is always appended by “(head)”. The other colored revision would be the current.

You can also reverse the output, by printing bottom to up. This is helpful when the revision history gets large and you have to scroll up to see the head.

alias rev='python db history | tac | grep --color "$(python db current 2> /dev/null)|$(python db heads)|$"'

Screenshot from 2016-07-11 16:42:30.png

Current Revision changer

The current revision is maintained at the database in the one-column alembic_version table. If for some reason the migration file for current revision no longer exists (like when changing to a branch at git that doesn’t have the file), alembic would raise an error. So to change the revision at the database I wrote a bash function.

change_db_current () {
    echo "Current Revision: "
    read current_rev
    echo "New Revision: "
    read new_rev

    echo 'c test \ update alembic_version set version_num='"'"$new_rev"'"' where version_num='"'"$current_rev"'"';' | sudo su - postgres -c psql

Enter the current revision hash (displayed in the alembic error) and the revision you need to point the current to, and it will update the table with the new revision. Sick right! It’s pretty stupid actually. It was when I didn’t know how to stamp in alembic. Anyways, it was useful some time ago.

Implementing Admin Trash in Open Event

So last week I had the task of implementing a trash system for the Admin. It was observed that sometimes a user may delete an item and then realize that the item needs to be restores. Thus a trash system works well in this case. Presently the items that are being moved to the trash are:

  • Deleted Users
  • Deleted Events
  • Deleted Sessions

So it works like this. I added a column in_trash to the tables User, Event and Sessions to mark whether the item is in the trash or not

in_trash = db.Column(db.Boolean, default=False)

So depending on whether the value is True or False the item will be in the trash of the admin. Thus for a normal user on deleting an event, user or session a message would flash that the item is deleted and the item would not be shown in the table list of the user. However it would not be deleted from the database.



Thus for the user the item is deleted. The item’s in_trash property is set to True and it gets moved to the trash. The items are displayed in the “Deleted Items” section of the Admin panel


The items deleted are displayed in the trash and as soon as they deleted in the trash they are deleted from the database permanently. A message will flash for the Admin when it is deleted



Thus the trash is implemented. 🙂

Two more things are left:

  • To restore items from trash
  • To automatically delete the items in trash after an inactivity of 30 days

This will soon be implemented 🙂

File Uploading in Flask

Last week I took up an issue of adding upload functionality to the open-event server. I had to implement the upload in 3 places – one in the sponsor table to upload images, another in the user profile page and the third is to upload slides in the session form. However the basic function behind it remains the same. File upload makes use of the inbuilt werkzeug FileStorage class. A code snippet will help understand how it works:


So  on selecting the file it gets stored in the request.files object. It is temporarily stored in the FileStorage in werkzeug. Now we access the file’s name using request.files[‘files’] and then using the inbuilt save()  function in flask it gets saved to the folder specified by us. There are some frameworks available for file uploading in Flask but all this can be done using the standard libraries also and there is no such need of the frameworks.

However instead of storing just the direct name of the file we make use of the secure_filename of werkzeug to save it.Capture2.PNG

Thus the secure_filename stores the file in the format given in the image and makes uploading easier by converting the long url/image path to an easier one.

Challenges with Migrations

So I am sitting in front of my laptop one day working on some issue. I remember it to be adding a database table. Since this is the first time I am actually creating a table I didn’t know how much this issue is going to haunt me! So I created a table SessionType just like other previously created tables and I make a PR happy to solve an issue. But just when I thought everything was going smooth this error comes up…

“Table session_type does not exist”

I understand that you have to again create the database so I run python But still I get the error. I search about it and find that every time I make changes to the database tables I have to update and generate a new migration script. Thus I run python db migrate and I get THE error:

“Cant locate revision identified by *some random revision number* ”

I literally went mad trying to solve this issue. Moreover whenever I would switch branches and the database tables were different I would get this error. Finally I found out a way to solve all the migration related problems (most of them :p). I added a script to drop the entire database.Since right now we are in the development stage it doesnt matter if the data in the database is deleted. So you run :

  1. python
  2. python
  3. Use super_admin password to create the database
  4. But No! Migration is a little mean bastard!It wont leave you like that. You have to make the current revision and the head equal. Thus if your current is referring to some other revision ID then stamp the database table with the head.
  5. Run python db stamp head
  6. Now run python db migrate


Finally! The issue is solved. 🙂



Wrapping up our first steps – Event Server, Material Design, Daily Scrum

The Event Organizer application has already the basic features and we can move to apply more advanced feature. But let explain me, what me and my friends have done recently.

Our application is already able to manage conferences and events. An owner can edit and change events in the way he/she wants to. And we have two version of this app for websites and for mobile phones(Android). The orga serv which I prepared share Json API to both Android and Web app. I guess it is really comfortable solution because it enables to share date between web and mobile app. Our app’s template style is based on material bootstrap, the same is used by Arnav in his application. It is very flat design.

Zrzut ekranu 2015-07-06 o 22.58.07
First Version of Open EVent Menu Bar

What I really like during this term is daily scrum, where we can share what we have already done, what are we going to do next, and what were the obstacles. Because of it, we can easily be in touch and avoid duplicating our work. We can also discuss and quickly choose the most useful solution. Duke and Mario accompany us and as always were ready to help with any trouble.

Arnav and Manan also organized a conference on their university. Many students were invited and discussed about taking part in open source projects. I regret not to having taken part in it, but India is so far from my country, that I could not get there.

I hope that the starting up part of this project will be also so developing and exciting, and we will finish it with a huge success. And all of us will be very proud, learn many new things, and improve our experience.

In the nearest future Arnav, Duke and me are going to create three environments: staging, production, and development. It helps us to organize our work. I am sure that we manage to do it.

Ok, so stay tuned. “Show must go on”! We don’t stop working! 😉

How to create simply posts website in Flask framework?

Zrzut ekranu 2015-06-28 o 00.26.17

I have used Django framework so far, but because of my participation in Google summer of code. We decided to use something new. It was Flask. And Now I want to share it with you.

It is a microframework, which allows us in really short time create a nice app. I’d like to share my experience in this area to teach you how you can create an app. You will have a possibility to add, delete and update posts.

First of all you need to install flask framework via pip in your command line:

pip install flask

Next step it will be prepare a files structures
Zrzut ekranu 2015-06-28 o 00.44.54

My structure of files looks:

  • forms( will contain all form objects in our case only post form)
  • templates will contain all app views(listing posts and possibility to create post)

We want to manage posts, so we have to define database model Post in I only defined title and text fields. Zrzut ekranu 2015-06-28 o 00.49.04

Zrzut ekranu 2015-06-28 o 01.11.58

Next step will be create a routes. The main purpose of routes is to recognize urls and execute actions. We need four methods to display list of posts(index), delete post(delete_post), add post(new_post) and update post(edit_post)

That’s all to run your first posts application. Additionally I attach link to project on github, where you can trace whole project code.

Have a nice coding!