DetachedInstanceError: dealing with Celery, Flask’s app context and SQLAlchemy

In the Open Event server project, we chose Celery for asynchronous background tasks. From the official website:

What is celery?

Celery is an asynchronous task queue/job queue based on distributed message passing.

What are tasks?

The execution units, called tasks, are executed concurrently on a single or more worker servers using multiprocessing.

After the tasks had been set up, an error came up every time a task was called.

The error was:

DetachedInstanceError: Instance <User at 0x7f358a4e9550> is not bound to a Session; attribute refresh operation cannot proceed

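The above error usually occurs when you access an expired or unloaded attribute of an object whose session has already been closed. The session may have been closed by an explicit session.close() call. A session.commit() also plays a part: by default it expires the loaded attributes, so the next attribute access needs a live session to refresh them.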

The celery tasks in question were performing some database operations, so the first thought was that these operations might be causing the error. To test this theory, the celery task was changed to:

@celery.task(name='lorem.ipsum')
def lorem_ipsum():
    pass

But sadly, the error remained. This showed that the body of the celery task was not at fault; the session was being closed whenever a celery task was called. The function that called the celery task had the following form:

def restore_session(session_id):
    session = DataGetter.get_session(session_id)
    session.deleted_at = None
    lorem_ipsum.delay()
    save_to_db(session, "Session restored from Trash")
    update_version(session.event_id, False, 'sessions_ver')


In our app, the app_context was not being passed whenever a celery task was initiated. Thus the celery task, whenever called, closed the previous app_context, and the session was closed along with it. The solution is to follow the pattern suggested at http://flask.pocoo.org/docs/0.12/patterns/celery/.

from celery import Celery
from flask import current_app


def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    task_base = celery.Task

    class ContextTask(task_base):
        abstract = True

        def __call__(self, *args, **kwargs):
            # In testing mode, run the task inside a test request context
            if current_app.config['TESTING']:
                with app.test_request_context():
                    return task_base.__call__(self, *args, **kwargs)
            # Otherwise push a regular app context around the task body
            with app.app_context():
                return task_base.__call__(self, *args, **kwargs)

    # Every task created from this Celery instance now uses ContextTask
    celery.Task = ContextTask
    return celery

celery = make_celery(current_app)


The __call__ method ensures that every celery task runs with a proper app context.
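
To see why overriding __call__ is enough, here is a minimal, library-free sketch of the same pattern. It does not use Celery or Flask: Task, app_context and the _current_app variable are hypothetical stand-ins invented for this illustration, and the app name 'open_event' is made up.

```python
import contextlib
import contextvars

# Stand-in for Flask's "current app" lookup (hypothetical, not Flask's API).
_current_app = contextvars.ContextVar("current_app", default=None)


@contextlib.contextmanager
def app_context(app_name):
    """Make app_name the current app for the duration of the block."""
    token = _current_app.set(app_name)
    try:
        yield
    finally:
        _current_app.reset(token)


class Task:
    """Minimal stand-in for a task base class: calling it runs run()."""

    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)

    def run(self, *args, **kwargs):
        raise NotImplementedError


class ContextTask(Task):
    """Wraps every call in an app context, mirroring the pattern above."""

    app_name = "open_event"

    def __call__(self, *args, **kwargs):
        with app_context(self.app_name):
            return super().__call__(*args, **kwargs)


class WhoAmI(ContextTask):
    def run(self):
        # The task body can see the context pushed by __call__.
        return _current_app.get()


task = WhoAmI()
result_inside = task()               # runs inside the pushed context
result_outside = _current_app.get()  # the context is popped after the call
```

The task body never pushes a context itself; the base class does it around every call, which is why existing tasks need no changes once the base class is swapped.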

 

Event-driven programming in Flask with Blinker signals

Setting up blinker:

The Open Event Project offers event managers a platform to organize all kinds of events including concerts, conferences, summits and regular meetups. In the server part of the project, the issue at hand was to perform multiple tasks in background (we use celery for this) whenever some changes occurred within the event, or the speakers/sessions associated with the event.

The usual approach to this would be to add a function call after any relevant change is made. But the statements making these changes were spread all over the project. It would be cumbersome to add 3-4 function calls (which are irrelevant to the function they are executed in) in so many places. Moreover, the code would become unstructured and really hard to maintain over time.

That’s when signals came to our rescue. Flask has had integrated support for signalling since version 0.6; see http://flask.pocoo.org/docs/latest/signals/. The Blinker library is used here to implement signals. If you’re coming from some other language, signals are analogous to events.

Given below is the code to create named signals in a custom namespace:


from blinker import Namespace

event_signals = Namespace()
speakers_modified = event_signals.signal('event_json_modified')

If you want to emit a signal, you can do so by calling the send() method:


speakers_modified.send(current_app._get_current_object(), event_id=event.id, speaker_id=speaker.id)

From the user guide itself:

“Try to always pick a good sender. If you have a class that is emitting a signal, pass self as sender. If you are emitting a signal from a random function, you can pass current_app._get_current_object() as sender.”

To subscribe to a signal, blinker provides neat decorator-based signal subscriptions.


@speakers_modified.connect
def name_of_signal_handler(app, **kwargs):
    # handle the signal here; the data passed to send() arrives in kwargs
    pass

 

Some Design Decisions:

A signal may carry a lot of information, not all of which every subscriber wants, e.g. when multiple subscribers listen to the same signal. Some of the information sent by the signal may be of no use to your specific function. Thus we decided to enforce the pattern below to ensure flexibility throughout the project.


@speakers_modified.connect
def new_handler(app, **kwargs):
    # do whatever you want to do with kwargs['event_id']
    event_id = kwargs['event_id']

In this case, the function new_handler needs to perform some task based solely on the event_id. If the function were of the form def new_handler(app, event_id), a TypeError would be raised whenever the signal is sent with additional keyword arguments such as speaker_id. A big plus of this approach: if you later want to send more information with the signal, say speaker_name, no error is raised by any of the subscribers defined before the change was made.
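
The difference between the two handler signatures can be shown without Flask or blinker. The sketch below uses ToySignal, a hypothetical stand-in written for this illustration (it is not blinker's implementation), and made-up values for event_id and speaker_id.

```python
class ToySignal:
    """Tiny stand-in for a blinker signal (hypothetical, for illustration)."""

    def __init__(self):
        self.receivers = []

    def connect(self, receiver):
        self.receivers.append(receiver)
        return receiver  # returning it lets connect work as a decorator

    def send(self, sender, **kwargs):
        return [(r, r(sender, **kwargs)) for r in self.receivers]


flexible = ToySignal()
strict = ToySignal()


@flexible.connect
def flexible_handler(app, **kwargs):
    # Picks out only what it needs and ignores everything else.
    return kwargs['event_id']


@strict.connect
def strict_handler(app, event_id):
    # Accepts exactly one keyword; any extra one is an error.
    return event_id


flexible_result = flexible.send('app', event_id=1, speaker_id=7)

try:
    strict.send('app', event_id=1, speaker_id=7)
    strict_error = None
except TypeError as exc:
    strict_error = exc  # unexpected keyword argument 'speaker_id'
```

The flexible handler keeps working no matter how many extra keywords are added to the send() call later, while the strict one breaks as soon as the sender passes anything beyond event_id.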

When to use signals and when not?

The call that sends a signal will of course sit inside some other function. The signal's subscribers and that function should be independent of each other. If the work done by any of the subscribers even remotely affects the calling function, a signal shouldn’t be used; use a function call instead.

How to turn off signals while testing?

When in testing mode, signals may slow down your tests, because signal subscribers that are completely independent of the function being tested get executed numerous times. To turn off the signal subscribers, you have to make a small change to the send function of the blinker library.

Below is what we have done. The approach to turn it off may differ from project to project as the method of testing differs. Refer to https://github.com/jek/blinker/blob/master/blinker/base.py#L241 for the original function.


from blinker import Namespace, Signal

# `app` below is the Flask application object


def new_send(self, *sender, **kwargs):
    if len(sender) == 0:
        sender = None
    elif len(sender) > 1:
        raise TypeError('send() accepts only one positional argument, '
                        '%s given' % len(sender))
    else:
        sender = sender[0]
    # only this line was changed
    if not self.receivers or app.config['TESTING']:
        return []
    else:
        return [(receiver, receiver(sender, **kwargs))
                for receiver in self.receivers_for(sender)]


# replace blinker's send with the patched version
Signal.send = new_send

event_signals = Namespace()
# and so on ....
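
If replacing send for the whole process is too broad, one alternative is to silence it only for the duration of a test. The sketch below is an assumption about how that could look, not what the project does: it uses the standard library's unittest.mock.patch.object on ToySignal, a hypothetical stand-in class defined here so the example runs without blinker.

```python
from unittest import mock


class ToySignal:
    """Tiny stand-in for a blinker signal (hypothetical, for illustration)."""

    def __init__(self):
        self.receivers = []

    def connect(self, receiver):
        self.receivers.append(receiver)
        return receiver

    def send(self, sender, **kwargs):
        return [(r, r(sender, **kwargs)) for r in self.receivers]


calls = []
sig = ToySignal()


@sig.connect
def record(sender, **kwargs):
    calls.append(kwargs)


# Inside the block, send is replaced by a mock that returns [] and
# never runs the receivers.
with mock.patch.object(ToySignal, 'send', return_value=[]):
    muted_result = sig.send('app', event_id=1)

# Outside the block the original send is restored automatically.
live_result = sig.send('app', event_id=2)
```

Only the second send reaches the subscriber, so tests that do need a particular signal can still exercise it outside the patched block.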

That’s all for now. Have some fun signaling 😉 .

 

Set proper content type when uploading files on s3 with python-magic

In the open-event-orga-server project, we have been using Amazon S3 storage for a long time. After some time we ran into an issue: no matter what the file type was, the Content-Type returned when retrieving these files from the storage was application/octet-stream.

An example response when retrieving an image from s3 was as follows:


Accept-Ranges →bytes
Content-Disposition →attachment; filename=HansBakker_111.jpg
Content-Length →56060
Content-Type →application/octet-stream
Date →Fri, 09 Sep 2016 10:51:06 GMT
ETag →"964b1d839a9261fb0b159e960ceb4cf9"
Last-Modified →Tue, 06 Sep 2016 05:06:23 GMT
Server →AmazonS3
x-amz-id-2 →1GnO0Ta1e+qUE96Qgjm5ZyfyuhMetjc7vfX8UWEsE4fkZRBAuGx9gQwozidTroDVO/SU3BusCZs=
x-amz-request-id →ACF274542E950116

 

As seen above, instead of image/jpeg, the Content-Type is returned as application/octet-stream. While uploading the files, we were not providing the content type explicitly, which seemed to be the root of the problem.

It was decided that we would provide the content type explicitly, so it was time to choose an efficient library that determines the file type from the content of the file and not from the file extension. After researching the available libraries, python-magic seemed to be the obvious choice. python-magic is a Python interface to the libmagic file type identification library. libmagic identifies file types by checking their headers against a predefined list of file types.

Here is an example straight from python-magic's readme on its usage:


>>> import magic
>>> magic.from_file("testdata/test.pdf")
'PDF document, version 1.2'
>>> magic.from_buffer(open("testdata/test.pdf").read(1024))
'PDF document, version 1.2'
>>> magic.from_file("testdata/test.pdf", mime=True)
'application/pdf'
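
To make the difference between extension-based and content-based detection concrete, here is a small standard-library sketch. It is not how libmagic works internally: sniff_mime and the three-entry SIGNATURES table are hypothetical, written only for this illustration, while libmagic's real database covers far more formats. The file name and bytes are made up.

```python
import mimetypes

# A few well-known leading byte signatures (illustrative subset only).
SIGNATURES = [
    (b'\xff\xd8\xff', 'image/jpeg'),
    (b'\x89PNG\r\n\x1a\n', 'image/png'),
    (b'%PDF-', 'application/pdf'),
]


def sniff_mime(data, default='application/octet-stream'):
    """Guess a MIME type from the first bytes of the content."""
    for prefix, mime in SIGNATURES:
        if data.startswith(prefix):
            return mime
    return default


# JPEG content uploaded under a misleading file name.
jpeg_bytes = b'\xff\xd8\xff\xe0' + b'\x00' * 16

by_name, _ = mimetypes.guess_type('report.pdf')  # looks at the extension only
by_content = sniff_mime(jpeg_bytes)              # looks at the bytes only
```

Here the extension-based guess says application/pdf while the content says image/jpeg, which is the kind of mismatch a content-based library avoids.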

 

Given below is a code snippet for the s3 upload function in the project:


file_data = file.read()
file_mime = magic.from_buffer(file_data, mime=True)
size = len(file_data)
# k is defined as  k = Key(bucket) in previous code
sent = k.set_contents_from_string(
    file_data,
    headers={
        'Content-Disposition': 'attachment; filename=%s' % filename,
        'Content-Type': '%s' % file_mime
    }
)

 

One thing to note: python-magic uses libmagic-dev as a dependency, and many distros do not come with libmagic-dev pre-installed, so make sure you install it explicitly. (Installation instructions may vary per distro.)


sudo apt-get install libmagic-dev

Voila !! Now when retrieving each and every file you’ll get the proper content type.