DetachedInstanceError: dealing with Celery, Flask’s app context and SQLAlchemy

In the open event server project, we had chosen to go with celery for async background tasks. From the official website,

What is celery?

Celery is an asynchronous task queue/job queue based on distributed message passing.

What are tasks?

The execution units, called tasks, are executed concurrently on a single or more worker servers using multiprocessing.

After the tasks had been set up, an error constantly came up whenever a task was called

The error was:

DetachedInstanceError: Instance <User at 0x7f358a4e9550> is not bound to a Session; attribute refresh operation cannot proceed

The above error usually occurs when you try to access the session object after it has been closed. It may have been closed by an explicit session.close() call or after committing the session with session.commit().

The celery tasks in question were performing some database operations. So the first thought was that maybe these operations might be causing the error. To test this theory, the celery task was changed to :

def lorem_ipsum():

But sadly, the error still remained. This proves that the celery task was just fine and the session was being closed whenever the celery task was called. The method in which the celery task was being called was of the following form:

def restore_session(session_id):
    session = DataGetter.get_session(session_id)
    session.deleted_at = None
    save_to_db(session, "Session restored from Trash")
    update_version(session.event_id, False, 'sessions_ver')

In our app, the app_context was not being passed whenever a celery task was initiated. Thus, the celery task, whenever called, closed the previous app_context eventually closing the session along with it. The solution to this error would be to follow the pattern as suggested on

def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    task_base = celery.Task

    class ContextTask(task_base):
        abstract = True

        def __call__(self, *args, **kwargs):
            if current_app.config['TESTING']:
                with app.test_request_context():
                    return task_base.__call__(self, *args, **kwargs)
            with app.app_context():
                return task_base.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

celery = make_celery(current_app)

The __call__ method ensures that celery task is provided with proper app context to work with.


Setting up Celery with Flask

In this article, I will explain how to use Celery with a Flask application. Celery requires a broker to run. The most famous of the brokers is Redis. So to start using Celery with Flask, first we will have to setup the Redis broker.

Redis can be downloaded from their site I wrote a script that simplifies downloading, building and running the redis server.

# This script downloads and runs redis-server.
# If redis has been already downloaded, it just runs it
if [ ! -d redis-3.2.1/src ]; then
    tar xzf redis-3.2.1.tar.gz
    rm redis-3.2.1.tar.gz
    cd redis-3.2.1
    cd redis-3.2.1

When the above script is ran from the first time, the redis folder doesn’t exist so it downloads the same, builds it and then runs it. In subsequent runs, it will skip the downloading and building part and just run the server.

Now that the redis server is running, we will have to install its Python counterpart.

pip install redis

After the redis broker is set, now its time to setup the celery extension. First install celery by using pip install celery. Then we need to setup celery in the flask app definition.

# in
def make_celery(app):
	# set redis url vars
	app.config['CELERY_BROKER_URL'] = environ.get('REDIS_URL', 'redis://localhost:6379/0')
    app.config['CELERY_RESULT_BACKEND'] = app.config['CELERY_BROKER_URL']
    # create context tasks in celery
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

celery = make_celery(current_app)

Now that Celery is setup on our project, let’s define a sample task.

def view():
	background_task.delay(*args, **kwargs)
	return 'OK'

def background_task(*args, **kwargs):
	# code
	# more code

Now to run the celery workers, execute

celery worker -A app.celery

That should be all. Now to run our little project, we can execute the following script.

bash &  # to run redis
celery worker -A app.celery &  # to run celery workers

If you are wondering how to run the same on Heroku, just use the free heroku-redis extension. It will start the redis server on heroku. Then to run the workers and app, set the Procfile as –

web: sh

Then set the as –

celery worker -A app.celery &
gunicorn app:app

That’s a basic guide on how to run a Flask app with Celery and Redis. If you want more information on this topic, please see my post Ideas on Using Celery in Flask for background tasks.

Ideas on using Celery with Flask for background tasks

Simply put, Celery is a background task runner. It can run time-intensive tasks in the background so that your application can focus on the stuff that matters the most. In context of a Flask application, the stuff that matters the most is listening to HTTP requests and returning response.

By default, Flask runs on a single-thread. Now if a request is executed that takes several seconds to run, then it will block all other incoming requests as it is single-threaded. This will be a very bad-experience for the user who is using the product. So here we can use Celery to move time-hogging part of that request to the background.

I would like to let you know that by “background”, Celery means another process. Celery starts worker processes for the running application and these workers receive work from the main application. Celery requires a broker to be used. Broker is nothing but a database that stores results of a celery task and provides a shared interface between main process and worker processes. The output of the work done by the workers is stored in the Broker. The main application can then access these results from the Broker.

Using Celery to set background tasks in your application is as simple as follows –

def background_task(*args, **kwargs):
    # do stuff
    # more stuff

Now the function background_task becomes function-able as a background task. To execute it as a background task, run –

task = background_task.delay(*args, **kwargs)
print task.state  # task current state (PENDING, SUCCESS, FAILURE)

Till now this may look nice and easy but it can cause lots of problems. This is because the background tasks run in different processes than the main application. So the state of the worker application differs from the real application.

One common problem because of this is the lack of request context. Since a celery task runs in a different process, so the request context is not available. Therefore the request headers, cookies and everything else is not available when the task actually runs. I too faced this problem and solved it using an excellent snippet I found on the Internet.

Celery task wrapper to set request context vars and global
vars when a task is executed
Based on
from celery import Task
from flask import has_request_context, make_response, request, g

from app import app  # the flask app

__all__ = ['RequestContextTask']

class RequestContextTask(Task):
    """Base class for tasks that originate from Flask request handlers
    and carry over most of the request context data.
    This has an advantage of being able to access all the usual information
    that the HTTP request has and use them within the task. Pontential
    use cases include e.g. formatting URLs for external use in emails sent
    by tasks.
    abstract = True

    #: Name of the additional parameter passed to tasks
    #: that contains information about the original Flask request context.
    CONTEXT_ARG_NAME = '_flask_request_context'
    GLOBALS_ARG_NAME = '_flask_global_proxy'
    GLOBAL_KEYS = ['user']

    def __call__(self, *args, **kwargs):
        """Execute task code with given arguments."""
        call = lambda: super(RequestContextTask, self).__call__(*args, **kwargs)

        # set context
        context = kwargs.pop(self.CONTEXT_ARG_NAME, None)
        gl = kwargs.pop(self.GLOBALS_ARG_NAME, {})

        if context is None or has_request_context():
            return call()

        with app.test_request_context(**context):
            # set globals
            for i in gl:
                setattr(g, i, gl[i])
            # call
            result = call()
            # process a fake "Response" so that
            # ``@after_request`` hooks are executed
            # app.process_response(make_response(result or ''))

        return result

    def apply_async(self, args=None, kwargs=None, **rest):
        return super(RequestContextTask, self).apply_async(args, kwargs, **rest)

    def apply(self, args=None, kwargs=None, **rest):
        return super(RequestContextTask, self).apply(args, kwargs, **rest)

    def retry(self, args=None, kwargs=None, **rest):
        return super(RequestContextTask, self).retry(args, kwargs, **rest)

    def _include_request_context(self, kwargs):
        """Includes all the information about current Flask request context
        as an additional argument to the task.
        if not has_request_context():

        # keys correspond to arguments of :meth:`Flask.test_request_context`
        context = {
            'path': request.path,
            'base_url': request.url_root,
            'method': request.method,
            'headers': dict(request.headers),
        if '?' in request.url:
            context['query_string'] = request.url[(request.url.find('?') + 1):]

        kwargs[self.CONTEXT_ARG_NAME] = context

    def _include_global(self, kwargs):
        d = {}
        for z in self.GLOBAL_KEYS:
            if hasattr(g, z):
                d[z] = getattr(g, z)
        kwargs[self.GLOBALS_ARG_NAME] = d

To run a task in Request context mode, do –

@celery.task(base=RequestContextTask, bind=True)
def background_task(self, *args, **kwargs):
    # do stuff
    # more stuff

If you are wondering what the RequestContextTask class does, it simply stores all request context vars and global vars when a background task is called (task.delay()) and then unpacks those values to their proper places when the task is about to be run. The above snippet can be easily extended to store any value.

Another challenge that some people may face is the occasional Parsing/Serialization error. This happens because the data being sent to/from a function that is to be background executed is too complex.

Serialization is the process of converting complex data structures and objects into a plain string. Serialization of data is necessary because the background tasks and the main thread run in different processes. Now think how will the main thread communicate the celery thread to do some task. This is done using serialization of the concerned data. So to avoid serialization errors, it is recommended that you make background tasks such that they require only simple arguments to run and they return only simple data.

So basically keeping small and simple tasks is recommended when using Celery. Follow this golden rule and you will not run into any problems.


{{ Repost from my personal blog | }}