Create Event by Importing JSON files in Open Event Server

Apart from the usual way of creating an event in FOSSASIA’s Orga Server project by sending POST requests to the Events API, you can also create an event by importing a zip file that contains multiple JSON files. This lets you create a large event like FOSSASIA, with lots of data related to sessions, speakers, microlocations and sponsors, just by uploading JSON files to the system. Sample JSON files can be found in the open-event project of FOSSASIA. The basic workflow of importing an event is as follows:

  • The first step is similar to uploading files to the server. We send a POST request with multipart form data containing the zipped archive of JSON files.
  • The POST request starts a celery task that imports data from the JSON files and stores it in the database.
  • The celery task URL is returned as the response to the POST request. You can poll this URL to get the status of the task. If the status is FAILURE, we get the error text along with it. If the status is SUCCESS, we get the resulting event data.
  • In the celery task, each JSON file is read separately and the data is stored in the database with the proper relations.
  • Sending a GET request to the above-mentioned task URL after the task has completed returns the event id along with the event URL.

Let’s see how each of these points works in the background.

Uploading ZIP containing JSON Files

To upload a zip archive, we send multipart form data in the POST request instead of JSON data. The multipart/form-data format allows an entire file to be sent as data in the POST request, along with the relevant file information.

An example cURL request looks something like this:

curl -H "Authorization: JWT <access token>" -X POST -F 'file=@event1.zip' http://localhost:5000/v1/events/import/json

The above cURL request uploads the file event1.zip from your current directory under the key ‘file’ to the endpoint /v1/events/import/json. The user uploading the file needs a JWT access token, in other words needs to be logged in to the system, as this is necessary to create an event.
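To make the multipart/form-data format more concrete, here is a minimal sketch of how such a request body could be assembled by hand. The helper name encode_multipart is hypothetical and only for illustration; in practice cURL or an HTTP client library builds this body for you.

```python
import uuid


def encode_multipart(field_name, filename, file_bytes):
    """Build a multipart/form-data body holding a single file part.

    Returns (content_type_header, body_bytes).
    """
    boundary = uuid.uuid4().hex  # any string that does not occur in the payload
    head = (
        '--{b}\r\n'
        'Content-Disposition: form-data; name="{n}"; filename="{f}"\r\n'
        'Content-Type: application/zip\r\n'
        '\r\n'
    ).format(b=boundary, n=field_name, f=filename).encode('utf-8')
    tail = '\r\n--{b}--\r\n'.format(b=boundary).encode('utf-8')
    content_type = 'multipart/form-data; boundary={b}'.format(b=boundary)
    return content_type, head + file_bytes + tail


# 22 bytes: the smallest valid (empty) zip archive, used here as dummy file content
empty_zip = b'PK\x05\x06' + b'\x00' * 18
content_type, body = encode_multipart('file', 'event1.zip', empty_zip)
```

The key ‘file’ ends up as the name attribute of the part, which is how the server can look the upload up under that key.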

@import_routes.route('/events/import/<string:source_type>', methods=['POST'])
@jwt_required()
def import_event(source_type):
    if source_type == 'json':
        file_path = get_file_from_request(['zip'])
    else:
        file_path = None
        abort(404)
    from helpers.tasks import import_event_task
    task = import_event_task.delay(email=current_identity.email, file=file_path,
                                   source_type=source_type, creator_id=current_identity.id)
    # create import job
    create_import_job(task.id)

    # if testing
    if current_app.config.get('CELERY_ALWAYS_EAGER'):
        TASK_RESULTS[task.id] = {
            'result': task.get(),
            'state': task.state
        }
    return jsonify(
        task_url=url_for('tasks.celery_task', task_id=task.id)
    )


After the request is received, we check whether a file exists under the key ‘file’ of the form data. If it does, we save the file and get the path to the saved file. Then we pass this path to the celery task and run the task with celery’s .delay() function. After the celery task is started, data about the import job is stored in the database for future debugging and logging purposes. Finally, we return the task URL for the celery task that we started.

Celery Task to Import Data

Just like exporting an event, importing is a time-consuming task, and we don’t want other application requests to be held up because of it. Hence, we use a celery queue to execute this task. Whenever an import task is started, it is added to the celery queue. When it reaches the front of the queue, it is executed.

For importing, we have created a celery task, import.event, which calls the import_event_task_base() function. That function uses the import helper functions to read the data from the JSON files and save it in the database. After the task is completed, we update the import job entry in the table with the status SUCCESS or FAILURE, depending on the outcome of the celery task.

The celery task returns the newly created event’s id and the frontend link at which the event can be visited. This, along with the status of the celery task, is returned as the response to a GET request on the task URL. If the celery task fails, the state is changed to FAILURE and the error that the task ran into is returned as the error message in the result key. We also print an error traceback in the celery worker.

@celery.task(base=RequestContextTask, name='import.event', bind=True, throws=(BaseError,))
def import_event_task(self, file, source_type, creator_id):
    """Import Event Task"""
    task_id = self.request.id.__str__()  # str(async result)
    try:
        result = import_event_task_base(self, file, source_type, creator_id)
        update_import_job(task_id, result['id'], 'SUCCESS')
        # return item
    except BaseError as e:
        print(traceback.format_exc())
        update_import_job(task_id, e.message, e.status if hasattr(e, 'status') else 'failure')
        result = {'__error': True, 'result': e.to_dict()}
    except Exception as e:
        print(traceback.format_exc())
        update_import_job(task_id, e.message, e.status if hasattr(e, 'status') else 'failure')
        result = {'__error': True, 'result': ServerError().to_dict()}
    # send email
    send_import_mail(task_id, result)
    # return result
    return result

 

Save Data from JSON

The import helpers contain the functions that perform the main work of reading the JSON files, creating sqlalchemy model objects from them and saving them in the database. A few global dictionaries maintain the order in which the files are imported and saved, as well as the mapping from files to models. The first JSON file to be imported is the event JSON file, since all the other tables to be imported are related to the event table. After that, the files are read in the following order:

  1. SocialLink
  2. CustomForms
  3. Microlocation
  4. Sponsor
  5. Speaker
  6. Track
  7. SessionType
  8. Session

This order helps maintain the foreign key constraints. To import data from these files we use the function create_service_from_json(). It sorts the elements in the data list by the key “id”. It then loops over all the elements (dictionaries) in the data list. In each iteration, it deletes the unnecessary key-value pairs from the dictionary and sets the event_id of that element to the id of the event newly created by the import, instead of the old id present in the data. After all this is done, it creates a model object from the dict data, using the model mapped to the filename, and saves that object to the database.

def create_service_from_json(task_handle, data, srv, event_id, service_ids=None):
    """
    Given :data as json, create the service on server
    :service_ids are the mapping of ids of already created services.
        Used for mapping old ids to new
    """
    if service_ids is None:
        service_ids = {}
    global CUR_ID
    # sort by id
    data.sort(key=lambda k: k['id'])
    ids = {}
    ct = 0
    total = len(data)
    # start creating
    for obj in data:
        # update status
        ct += 1
        update_state(task_handle, 'Importing %s (%d/%d)' % (srv[0], ct, total))
        # trim id field
        old_id, obj = _trim_id(obj)
        CUR_ID = old_id
        # delete not needed fields
        obj = _delete_fields(srv, obj)
        # related
        obj = _fix_related_fields(srv, obj, service_ids)
        obj['event_id'] = event_id
        # create object
        new_obj = srv[1](**obj)
        db.session.add(new_obj)
        db.session.commit()
        ids[old_id] = new_obj.id
        # add uploads to queue
        _upload_media_queue(srv, new_obj)

    return ids
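
The post does not show _fix_related_fields(), so the following is only a sketch of how old ids might be mapped to new ones between files. Everything in it (import_records, the in-memory rows, ids starting at 100) is a hypothetical stand-in for the real helpers and the database.

```python
def import_records(records, related_fields, known_ids):
    """Toy stand-in for the import loop: assigns new ids and rewrites references.

    records        -- list of dicts as read from one JSON file
    related_fields -- {field_name: service_name} for foreign keys in this file
    known_ids      -- {service_name: {old_id: new_id}} from earlier imports
    Returns (saved_rows, {old_id: new_id}).
    """
    saved, ids = [], {}
    next_id = 100  # pretend the database hands out ids starting here
    for rec in sorted(records, key=lambda r: r['id']):
        row = dict(rec)
        old_id = row.pop('id')
        # point foreign keys at the ids created by earlier imports
        for field, service in related_fields.items():
            if row.get(field) is not None:
                row[field] = known_ids[service][row[field]]
        row['id'] = next_id
        ids[old_id] = next_id
        saved.append(row)
        next_id += 1
    return saved, ids


# Tracks are imported before sessions, so sessions can be re-pointed at them.
tracks, track_ids = import_records(
    [{'id': 7, 'name': 'Cloud'}, {'id': 3, 'name': 'AI'}], {}, {})
sessions, session_ids = import_records(
    [{'id': 1, 'title': 'Keynote', 'track_id': 7}],
    {'track_id': 'tracks'}, {'tracks': track_ids})
```

This is why the import order matters: the id mapping returned for one file has to exist before any file that refers to it is processed.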


After the data has been saved, the next thing to do is upload all the media files to the server. We do this using the _upload_media_queue() function. It takes the paths to upload the files to from the storage.py helper file for the APIs. Then it uploads the files, using the various helper functions, to static data storage services like AWS S3, Google storage, etc.

Other than this, the import helpers also contain the function that creates an import job, which keeps a record of all the imports along with the task URL and the user id of the user who started the import task. It also stores the status of the task. Then there is the get_file_from_request() function, which saves the file uploaded through the POST request and returns the path to that file.

Get Response about Event Imported

The POST request returns a task URL of the form /v1/tasks/ebe07632-392b-4ae9-8501-87ac27258ce5. To get the final result, you need to keep polling this URL. To know more about polling, read my previous blog post about exporting an event. When the task is completed, you get a “result” key along with the state. The state can be either SUCCESS or FAILURE. If it is FAILURE, you get the error message describing why the celery task failed. If it is SUCCESS, you get data about the event that was created by the import: the event id, the event name and the event URL, which you can use to visit the event from the frontend. This data is also sent to the user as an email and a notification.

An example response looks something like this:

{
    "result": {
        "event_name": "FOSSASIA 2016",
        "id": "24",
        "url": "https://eventyay.com/events/ab3de6"
    },
    "state": "SUCCESS"
}

The event name and the URL are also sent to the user who started the import task. On the frontend, one can use the result object to show the name of the imported event along with its URL. Since the id and identifier are both present in the result, one can also use them to send GET, PATCH and other API requests to the events/ endpoint and follow the relationship URLs it returns to query the other APIs. Thus, all the imported data becomes available to the frontend as well.
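As a rough illustration of how a client might consume such a response, the sketch below turns the response body into a one-line summary. The function name summarize_import is hypothetical, and the handling of non-final states is an assumption, since the post only describes SUCCESS and FAILURE.

```python
import json


def summarize_import(payload_text):
    """Turn a task response body into a one-line summary."""
    payload = json.loads(payload_text)
    state = payload.get('state')
    result = payload.get('result')
    if state == 'SUCCESS':
        return 'Imported "{}" (id {}): {}'.format(
            result['event_name'], result['id'], result['url'])
    if state == 'FAILURE':
        return 'Import failed: {}'.format(result)
    # any other state is treated as "not finished yet" (assumption)
    return 'Import still running (state: {})'.format(state)


sample = '''{
    "result": {
        "event_name": "FOSSASIA 2016",
        "id": "24",
        "url": "https://eventyay.com/events/ab3de6"
    },
    "state": "SUCCESS"
}'''
summary = summarize_import(sample)
```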

 


Export an Event using APIs of Open Event Server

In FOSSASIA’s Open Event Server project, we allow the organizer, co-organizers and admins to export all the data related to an event as an archive of JSON files. This way the data can be reused elsewhere for various purposes. The basic workflow is something like this:

  • Send a POST request to /events/{event_id}/export/json with a payload stating which media files you require.
  • The POST request starts a celery task in the background that extracts the data related to the event and converts it to JSON.
  • The celery task URL is returned as the response. Sending a GET request to this URL gives the status of the task. If the status is FAILED, the response carries the error message; if it is SUCCESS, it carries the result.
  • Separate JSON files for events, speakers, sessions, micro-locations, tracks, session types and custom forms are created.
  • All these files are then archived, and the zip is served on the endpoint /events/{event_id}/exports/{path}.
  • Sending a GET request to the above-mentioned endpoint downloads a zip containing all the data related to the event.

Let’s dive into each of these points one by one.

POST request (/events/{event_id}/export/json)

To make the POST request you first need JWT authentication, as with most of the other API endpoints. You need to send a payload with settings that say whether the media files related to the event should be downloaded along with the JSON files. An example payload looks like this:

{
   "image": true,
   "video": true,
   "document": true,
   "audio": true
 }

def export_event(event_id):
    from helpers.tasks import export_event_task

    settings = EXPORT_SETTING
    settings['image'] = request.json.get('image', False)
    settings['video'] = request.json.get('video', False)
    settings['document'] = request.json.get('document', False)
    settings['audio'] = request.json.get('audio', False)
    # queue task
    task = export_event_task.delay(
        current_identity.email, event_id, settings)
    # create Job
    create_export_job(task.id, event_id)

    # in case of testing
    if current_app.config.get('CELERY_ALWAYS_EAGER'):
        # send_export_mail(event_id, task.get())
        TASK_RESULTS[task.id] = {
            'result': task.get(),
            'state': task.state
        }
    return jsonify(
        task_url=url_for('tasks.celery_task', task_id=task.id)
    )


We pass the media settings and the event id as parameters to the export event celery task and queue the task. We then create an entry in the database with the task URL, the event id and the user who triggered the export, to keep a record of the activity. After that we return the URL of the celery task to the user as the response.
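One detail worth noting about the listing above: settings = EXPORT_SETTING binds the same dictionary object, so the assignments that follow also change the module-level defaults. A possible variation, sketched below, copies the defaults first. The contents of EXPORT_SETTING and the helper name build_export_settings are assumptions made for this illustration.

```python
# Assumed defaults; the real EXPORT_SETTING is not shown in this post.
EXPORT_SETTING = {'image': False, 'video': False, 'document': False, 'audio': False}


def build_export_settings(payload):
    """Return a fresh settings dict; missing keys default to False."""
    settings = dict(EXPORT_SETTING)  # copy, so the shared defaults stay untouched
    for key in settings:
        settings[key] = bool((payload or {}).get(key, False))
    return settings


settings = build_export_settings({'image': True, 'audio': True, 'extra': 1})
```

Keys that are not part of the defaults (like 'extra' here) are simply ignored.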

If the celery task is still underway, it shows a response with ‘state’: ‘WAITING’. Once the task is completed, the value of ‘state’ is either ‘FAILED’ or ‘SUCCESS’. If it is SUCCESS, it returns the result of the task, in this case the download URL for the zip.

Celery Task to Export Event

Exporting an event is a very time-consuming process, and we don’t want it to get in the way of user interaction with other services. So we needed a queueing system that would queue the tasks and execute them in the background without keeping the main worker from serving other user requests. We use celery for this.

We have created a celery task named “export.event”, which calls event_export_task_base(), which in turn calls export_event_json(), where all the JSON conversion is carried out. To start the celery task, all we do is call export_event_task.delay(email, event_id, settings). It returns a celery task object with a task id that can be used to check the status of the task.

@celery.task(base=RequestContextTask, name='export.event', bind=True)
def export_event_task(self, email, event_id, settings):
    event = safe_query(db, Event, 'id', event_id, 'event_id')
    try:
        logging.info('Exporting started')
        path = event_export_task_base(event_id, settings)
        # task_id = self.request.id.__str__()  # str(async result)
        download_url = path

        result = {
            'download_url': download_url
        }
        logging.info('Exporting done.. sending email')
        send_export_mail(email=email, event_name=event.name, download_url=download_url)
    except Exception as e:
        print(traceback.format_exc())
        result = {'__error': True, 'result': str(e)}
        logging.info('Error in exporting.. sending email')
        send_export_mail(email=email, event_name=event.name, error_text=str(e))

    return result


After exporting, a path to the exported zip is returned. We use it as the download URL and return it as the result of the celery task. In case there is an error in the celery task, we print the entire traceback in the celery worker and return the error as the result.

Make the Exported Zip Ready

We have a separate export_helpers.py file in the helpers module of the API for performing various tasks related to exporting all the data of the event. The most important function in this file is export_event_json(). This function accepts the event_id and the settings dictionary. In the export helpers we have global constant dictionaries which contain the order in which the fields are to appear in the JSON files created while exporting.

First, we create the directory that stores the exported JSON files and, finally, the archive of all of them. Then we have a global dictionary named EXPORTS, which contains all the tables and their corresponding models that we want to extract from the database and store as JSON. From the EXPORTS dict we get the models. We use these models to query the database with the given event_id and retrieve the data. After retrieving the data, we use another helper function named _order_json, which converts the sqlalchemy data to JSON in the order given in the dictionary. After this we download the media data, i.e. the slides, images, videos etc. related to that particular model, depending on the settings.

def export_event_json(event_id, settings):
    """
    Exports the event as a zip on the server and return its path
    """
    # make directory
    exports_dir = app.config['BASE_DIR'] + '/static/uploads/exports/'
    if not os.path.isdir(exports_dir):
        os.mkdir(exports_dir)
    dir_path = exports_dir + 'event%d' % int(event_id)
    if os.path.isdir(dir_path):
        shutil.rmtree(dir_path, ignore_errors=True)
    os.mkdir(dir_path)
    # save to directory
    for e in EXPORTS:
        if e[0] == 'event':
            query_obj = db.session.query(e[1]).filter(
                e[1].id == event_id).first()
            data = _order_json(dict(query_obj.__dict__), e)
            _download_media(data, 'event', dir_path, settings)
        else:
            query_objs = db.session.query(e[1]).filter(
                e[1].event_id == event_id).all()
            data = [_order_json(dict(query_obj.__dict__), e) for query_obj in query_objs]
            for count in range(len(data)):
                data[count] = _order_json(data[count], e)
                _download_media(data[count], e[0], dir_path, settings)
        data_str = json.dumps(data, indent=4, ensure_ascii=False).encode('utf-8')
        fp = open(dir_path + '/' + e[0], 'w')
        fp.write(data_str)
        fp.close()
    # add meta
    data_str = json.dumps(
        _generate_meta(), sort_keys=True,
        indent=4, ensure_ascii=False
    ).encode('utf-8')
    fp = open(dir_path + '/meta', 'w')
    fp.write(data_str)
    fp.close()
    # make zip
    shutil.make_archive(dir_path, 'zip', dir_path)
    dir_path = dir_path + ".zip"

    storage_path = UPLOAD_PATHS['exports']['zip'].format(
        event_id=event_id
    )
    uploaded_file = UploadedFile(dir_path, dir_path.rsplit('/', 1)[1])
    storage_url = upload(uploaded_file, storage_path)

    return storage_url


After we receive the json data from the _order_json() function, we create a dump of the json using json.dumps with an indentation of 4 spaces and utf-8 encoding. Then we save this dump in a file named according to the model from which the data was retrieved. This process is repeated for all the models that are mentioned in the EXPORTS dictionary. After all the JSON files are created and all the media is downloaded, we make a zip of the folder.

To do this we use shutil.make_archive, which creates the zip. The zip is then uploaded with the upload() helper to the storage service used by the server, such as S3 or Google storage, and the URL through which it can be accessed is returned.
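The dump-and-archive step can be tried out in isolation. The following is a minimal sketch that writes a couple of JSON files to a temporary directory and zips them with shutil.make_archive. The function name export_to_zip and the sample data are made up for the example, and the upload to a storage service is left out.

```python
import json
import os
import shutil
import tempfile
import zipfile


def export_to_zip(tables, base_dir, name):
    """Write each table as a JSON file under base_dir/name, then zip the folder.

    tables -- {file_name: json_serializable_data}
    Returns the path of the created zip.
    """
    dir_path = os.path.join(base_dir, name)
    os.makedirs(dir_path)
    for file_name, data in tables.items():
        with open(os.path.join(dir_path, file_name), 'w', encoding='utf-8') as fp:
            json.dump(data, fp, indent=4, ensure_ascii=False)
    # make_archive appends ".zip" itself and returns the archive path
    return shutil.make_archive(dir_path, 'zip', dir_path)


base = tempfile.mkdtemp()
zip_path = export_to_zip(
    {'event': {'id': 1, 'name': 'FOSSASIA 2016'},
     'speakers': [{'id': 1, 'name': 'Ada'}]},
    base, 'event1')
with zipfile.ZipFile(zip_path) as zf:
    names = zf.namelist()
shutil.rmtree(base)  # clean up the temporary export directory
```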

Apart from this function, the other major function in this file creates an export job entry in the database, so that we can keep track of which user started a task for which event. This helps us with debugging and security.

Downloading the Zip File

After the exporting is completed, if you send a GET request to the task url, you get a response similar to this:

{
   "result": {
     "download_url": "http://localhost:5000/static/media/exports/1/zip/OGpMM0w2RH/event1.zip"
   },
   "state": "SUCCESS"
 }

So on opening the download url in the browser or using any other tool, you can download the zip file.

One big question remains: the workflow is fine, but how do you know, after sending the POST request, that the task is completed and the zip is ready to be downloaded? One way of solving this problem is a technique known as polling. In polling, we send a GET request repeatedly at a fixed interval of time. From the POST request we get the URL of the export task. You keep polling this task URL until the state is either “FAILED” or “SUCCESS”. If it is SUCCESS, you put the download URL somewhere on your website, where it can be clicked to download the archived export of the event.
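A polling loop along these lines could look like the sketch below. To keep it runnable without a server, the GET request is replaced by a callable (fetch_state) that returns the decoded JSON body; the function name poll_task, the interval and the attempt limit are all assumptions for illustration. FAILURE is accepted as a final state as well, because the import post uses that name.

```python
import time


def poll_task(fetch_state, interval=2.0, max_attempts=30, sleep=time.sleep):
    """Call fetch_state() until it reports a final state or attempts run out.

    fetch_state -- callable returning the decoded JSON body of the task URL
    Returns the last response dict; raises RuntimeError if it never finishes.
    """
    for _ in range(max_attempts):
        response = fetch_state()
        if response.get('state') in ('SUCCESS', 'FAILED', 'FAILURE'):
            return response
        sleep(interval)  # wait before asking again
    raise RuntimeError('task did not finish after %d attempts' % max_attempts)


# Stand-in for GET <task_url>: two "WAITING" answers, then the final one.
_answers = iter([
    {'state': 'WAITING'},
    {'state': 'WAITING'},
    {'state': 'SUCCESS', 'result': {
        'download_url':
            'http://localhost:5000/static/media/exports/1/zip/OGpMM0w2RH/event1.zip'}},
])
final = poll_task(lambda: next(_answers), interval=0, sleep=lambda seconds: None)
```

Capping the number of attempts keeps a client from polling forever if the task never reaches a final state.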

 


DetachedInstanceError: Dealing with Celery, Flask’s app context and SQLAlchemy in the Open Event Server

In the Open Event Server project, we chose celery for async background tasks. From the official website:

What is celery?

Celery is an asynchronous task queue/job queue based on distributed message passing.

What are tasks?

The execution units, called tasks, are executed concurrently on a single or more worker servers using multiprocessing.

After the tasks had been set up, an error came up whenever a task was called.

The error was:

DetachedInstanceError: Instance <User at 0x7f358a4e9550> is not bound to a Session; attribute refresh operation cannot proceed

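The above error usually occurs when you access an attribute of an object that is no longer attached to a session, for example after the session has been closed with an explicit session.close() call. Since session.commit() expires loaded attributes by default, the next attribute access after a commit needs a live session to refresh from.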

The celery tasks in question were performing some database operations, so the first thought was that these operations might be causing the error. To test this theory, the celery task was changed to:

@celery.task(name='lorem.ipsum')
def lorem_ipsum():
    pass

But sadly, the error remained. This showed that the body of the celery task was not the cause, and that the session was being closed whenever a celery task was called. The celery task was being called in the following way:

def restore_session(session_id):
    session = DataGetter.get_session(session_id)
    session.deleted_at = None
    lorem_ipsum.delay()
    save_to_db(session, "Session restored from Trash")
    update_version(session.event_id, False, 'sessions_ver')


In our app, the app_context was not being passed whenever a celery task was initiated. Thus, whenever the celery task was called, it closed the previous app_context and with it the session. The solution to this error is to follow the pattern suggested at http://flask.pocoo.org/docs/0.12/patterns/celery/.

def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    task_base = celery.Task

    class ContextTask(task_base):
        abstract = True

        def __call__(self, *args, **kwargs):
            if current_app.config['TESTING']:
                with app.test_request_context():
                    return task_base.__call__(self, *args, **kwargs)
            with app.app_context():
                return task_base.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

celery = make_celery(current_app)


The __call__ method ensures that the celery task is provided with a proper app context to work with.
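To see the mechanism without Celery or Flask installed, here is a toy version of the same pattern. The Task class and app_context() below are simplified stand-ins written for this sketch, not the real Celery or Flask objects; only the idea of wrapping __call__ in a context manager carries over.

```python
from contextlib import contextmanager

active_contexts = []  # stands in for Flask's app context stack


@contextmanager
def app_context():
    """Push a fake context for the duration of the with-block."""
    active_contexts.append('app')
    try:
        yield
    finally:
        active_contexts.pop()


class Task(object):
    """Toy base task: calling the task runs its body."""
    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)


class ContextTask(Task):
    """Wraps every call in app_context(), like the make_celery() pattern."""
    def __call__(self, *args, **kwargs):
        with app_context():
            return super(ContextTask, self).__call__(*args, **kwargs)


class ReportContext(ContextTask):
    def run(self):
        # record which contexts are active while the task body runs
        return list(active_contexts)


seen_inside = ReportContext()()
```

The task body sees the context while it runs, and the context is popped again afterwards, even if the body raises.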

 

Setting up Celery with Flask

In this article, I will explain how to use Celery with a Flask application. Celery requires a broker to run. One of the most popular brokers is Redis. So to start using Celery with Flask, we first have to set up the Redis broker.

Redis can be downloaded from their site http://redis.io. I wrote a script that simplifies downloading, building and running the redis server.

#!/bin/bash
# This script downloads and runs redis-server.
# If redis has been already downloaded, it just runs it
if [ ! -d redis-3.2.1/src ]; then
    wget http://download.redis.io/releases/redis-3.2.1.tar.gz
    tar xzf redis-3.2.1.tar.gz
    rm redis-3.2.1.tar.gz
    cd redis-3.2.1
    make
else
    cd redis-3.2.1
fi
src/redis-server

When the above script is run for the first time, the redis folder doesn’t exist, so the script downloads redis, builds it and then runs it. In subsequent runs, it skips the downloading and building part and just runs the server.

Now that the redis server is running, we will have to install its Python counterpart.

pip install redis

After the redis broker is set up, it’s time to set up the celery extension. First install celery by using pip install celery. Then we need to set up celery in the flask app definition.

# in app.py
from os import environ

from celery import Celery

def make_celery(app):
    # set redis url vars
    app.config['CELERY_BROKER_URL'] = environ.get('REDIS_URL', 'redis://localhost:6379/0')
    app.config['CELERY_RESULT_BACKEND'] = app.config['CELERY_BROKER_URL']
    # create context tasks in celery
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            # run every task inside the Flask app context
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

celery = make_celery(current_app)

Now that Celery is setup on our project, let’s define a sample task.

@app.route('/task')
def view():
    # queue the task; pass the task's arguments to delay() here
    background_task.delay()
    return 'OK'

@celery.task
def background_task(*args, **kwargs):
    # code
    # more code
    pass

Now to run the celery workers, execute

celery worker -A app.celery

That should be all. Now to run our little project, we can execute the following script.

bash run_redis.sh &  # to run redis
celery worker -A app.celery &  # to run celery workers
python app.py

If you are wondering how to run the same on Heroku, just use the free heroku-redis extension. It will start the redis server on heroku. Then to run the workers and app, set the Procfile as –

web: sh heroku.sh

Then set the heroku.sh as –

#!/bin/bash
celery worker -A app.celery &
gunicorn app:app

That’s a basic guide on how to run a Flask app with Celery and Redis. If you want more information on this topic, please see my post Ideas on Using Celery in Flask for background tasks.

Ideas on using Celery with Flask for background tasks

Simply put, Celery is a background task runner. It can run time-intensive tasks in the background so that your application can focus on the stuff that matters the most. In context of a Flask application, the stuff that matters the most is listening to HTTP requests and returning response.

At the time of writing, Flask’s built-in server handles requests on a single thread by default. If a request takes several seconds to run, it blocks all other incoming requests. This is a very bad experience for the users of the product. So here we can use Celery to move the time-hogging part of that request to the background.

Note that by “background”, Celery means another process. Celery starts worker processes for the running application, and these workers receive work from the main application. Celery requires a broker, which passes messages (the tasks to run) between the main process and the worker processes. The output of the work done by the workers is stored in a result backend, which can be the same service as the broker, for example Redis. The main application can then read these results from there.

Using Celery to set background tasks in your application is as simple as follows –

@celery.task
def background_task(*args, **kwargs):
    # do stuff
    # more stuff
    pass

Now the function background_task can be used as a background task. To execute it as a background task, run –

task = background_task.delay(*args, **kwargs)
print(task.state)  # task current state (PENDING, SUCCESS, FAILURE)

Till now this may look nice and easy but it can cause lots of problems. This is because the background tasks run in different processes than the main application. So the state of the worker application differs from the real application.

One common problem caused by this is the lack of a request context. Since a celery task runs in a different process, the request context is not available. Therefore the request headers, cookies and everything else are not available when the task actually runs. I faced this problem too and solved it using an excellent snippet I found on the Internet.

"""
Celery task wrapper to set request context vars and global
vars when a task is executed
Based on http://xion.io/post/code/celery-include-flask-request-context.html
"""
from celery import Task
from flask import has_request_context, make_response, request, g

from app import app  # the flask app


__all__ = ['RequestContextTask']


class RequestContextTask(Task):
    """Base class for tasks that originate from Flask request handlers
    and carry over most of the request context data.
    This has an advantage of being able to access all the usual information
    that the HTTP request has and use them within the task. Potential
    use cases include e.g. formatting URLs for external use in emails sent
    by tasks.
    """
    abstract = True

    #: Name of the additional parameter passed to tasks
    #: that contains information about the original Flask request context.
    CONTEXT_ARG_NAME = '_flask_request_context'
    GLOBALS_ARG_NAME = '_flask_global_proxy'
    GLOBAL_KEYS = ['user']

    def __call__(self, *args, **kwargs):
        """Execute task code with given arguments."""
        call = lambda: super(RequestContextTask, self).__call__(*args, **kwargs)

        # set context
        context = kwargs.pop(self.CONTEXT_ARG_NAME, None)
        gl = kwargs.pop(self.GLOBALS_ARG_NAME, {})

        if context is None or has_request_context():
            return call()

        with app.test_request_context(**context):
            # set globals
            for i in gl:
                setattr(g, i, gl[i])
            # call
            result = call()
            # process a fake "Response" so that
            # ``@after_request`` hooks are executed
            # app.process_response(make_response(result or ''))

        return result

    def apply_async(self, args=None, kwargs=None, **rest):
        self._include_request_context(kwargs)
        self._include_global(kwargs)
        return super(RequestContextTask, self).apply_async(args, kwargs, **rest)

    def apply(self, args=None, kwargs=None, **rest):
        self._include_request_context(kwargs)
        self._include_global(kwargs)
        return super(RequestContextTask, self).apply(args, kwargs, **rest)

    def retry(self, args=None, kwargs=None, **rest):
        self._include_request_context(kwargs)
        self._include_global(kwargs)
        return super(RequestContextTask, self).retry(args, kwargs, **rest)

    def _include_request_context(self, kwargs):
        """Includes all the information about current Flask request context
        as an additional argument to the task.
        """
        if not has_request_context():
            return

        # keys correspond to arguments of :meth:`Flask.test_request_context`
        context = {
            'path': request.path,
            'base_url': request.url_root,
            'method': request.method,
            'headers': dict(request.headers),
        }
        if '?' in request.url:
            context['query_string'] = request.url[(request.url.find('?') + 1):]

        kwargs[self.CONTEXT_ARG_NAME] = context

    def _include_global(self, kwargs):
        d = {}
        for z in self.GLOBAL_KEYS:
            if hasattr(g, z):
                d[z] = getattr(g, z)
        kwargs[self.GLOBALS_ARG_NAME] = d

To run a task in Request context mode, do –

@celery.task(base=RequestContextTask, bind=True)
def background_task(self, *args, **kwargs):
    # do stuff
    # more stuff
    pass

If you are wondering what the RequestContextTask class does, it simply stores all request context vars and global vars when a background task is called (task.delay()) and then unpacks those values to their proper places when the task is about to be run. The above snippet can be easily extended to store any value.

Another challenge that some people may face is the occasional Parsing/Serialization error. This happens because the data being sent to/from a function that is to be background executed is too complex.

Serialization is the process of converting complex data structures and objects into a plain string. Serialization of data is necessary because the background tasks and the main application run in different processes. Now think about how the main process tells a celery worker process to do some task: it does so by serializing the data concerned. So to avoid serialization errors, it is recommended that you write background tasks so that they require only simple arguments to run and return only simple data.
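A small experiment shows the difference between a complex and a simple argument. The sketch assumes a JSON serializer (the default in Celery since version 4.0) and uses a made-up Session class in place of a real model object.

```python
import json


class Session(object):
    """Hypothetical model object; not JSON serializable as-is."""
    def __init__(self, id, title):
        self.id = id
        self.title = title


session = Session(42, 'Keynote')

try:
    json.dumps({'session': session})
    object_ok = True
except TypeError:
    object_ok = False  # arbitrary objects cannot be turned into JSON

# Passing only the primary key keeps the message simple; the task can
# load the row again from the database on the worker side.
message = json.dumps({'session_id': session.id})
```

Passing the id instead of the object also means the worker reads fresh data from the database rather than a copy made when the task was queued.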

So basically, keeping tasks small and simple is recommended when using Celery. Follow this golden rule and you will avoid most of these problems.

 

{{ Repost from my personal blog | http://aviaryan.in/blog/gsoc/celery-flask-good-ideas.html }}