Isolating each request in Dredd testing for Open Event Server

In the Open Event Server, we are using Api-blueprint along with aglio for API documentation in the project. The foremost concern with any API documentation is making sure it remains updated to the actual implementation of the API backend.

It was only a matter of days that we realized we needed some sort of testing mechanism for the API documentation. Dredd came to our rescue in that moment. Quoting the official documentation:

Dredd is a language-agnostic command-line tool for validating API description document against backend implementation of the API. Dredd reads your API description and step by step validates whether your API implementation replies with responses as they are described in the documentation. 

For loading DB fixtures, handling authentication, sessions etc. Dredd Hooks are used. Quoting their official documentation:

Similar to any other testing framework, Dredd supports executing code around each test step. Hooks are code blocks executed in defined stage of execution lifecycle. In the hooks code you have an access to compiled HTTP transaction object which you can modify.

Now after setting up dredd testing successfully, the challenge was to make every request remains isolated with respect to the database i.e making sure that database changes by one request do not affect the database used by another request, similar to what is done in case of unit tests ;).

In unit tests, you can just access the session of the tests after it is completed and gracefully rollback the changes made by the test. But as dredd is completely separated from the API backend being tested, accessing that session is not possible.

So the only choice that remains is purging the database after each request and recreating it with the required data before each request. To do the above, we initialize a vanilla flask app and attach a database to it:

@hooks.before_all
def before_all(transaction):
   app = Flask(__name__)
   app.config.from_object('config.TestingConfig')
   db.init_app(app)
   Migrate(app, db)
   stash['app'] = app
   stash['db'] = db

Before each request, the pre-existing database is purged to remove the changes from the previous request. After that a new db is created with necessary repopulation.

@hooks.before_each
def before_each(transaction):
   with stash['app'].app_context():
       db.engine.execute("drop schema if exists public cascade")
       db.engine.execute("create schema public")
       db.create_all()
       stamp()
       populate_db_with_required_static_values()

Database fixtures specific to the request can be loaded in the before hook in the following way:

@hooks.before("Users > Users Collection > List All Users")
def user_get_list(transaction):
   """
   GET /users
   :param transaction:
   :return:
   """
   with stash['app'].app_context():
       user = UserFactory()
       db.session.add(user)
       db.session.commit()

Relevant links:

Handling soft and hard deletes in the Open Event server API

Really, handling soft and hard deletes can be a mess, if you think of it.

Earlier in the Open Event server project, we had a Boolean field called is_trashed which was set to true if a record was soft-deleted. That worked just fine, until there came a requirement to get the time at which the record was deleted. So duh… we added another column called deleted_at which would store the time at which the record was soft-deleted. And it all started working fine again.

But, shortly we realised it was bad design to have a redundant Boolean field is_trashed. So it was decided to remove the is_trashed field and only keep the deleted_at column at all places. If the deleted_at field contained a date, it would mean that the record has been soft deleted at that point of time. If the field was still NULL, then the record has not been soft deleted. That ends up the database aspect of implementing soft-deletes. Let’s move on to the API part then.

We are currently in the process of decoupling our front-end and back-end. And the API server for the same is in active development. We’ve been using flask-rest-jsonapi for the same purpose. So, the first thing that popped up in our minds, when we got around handling soft-deletes was the following.

Should the API framework implement soft-deletes for each API by itself, or should the individual API logic take care of it ?

After some discussion, it was decided to let the framework handle it for each API, so that the implementation remains uniform and obviously a little less headache for the developers. In our custom copy of flask-rest-jsonapi, we also added an option to turn off the soft deletes across the whole API. Turning it off for each resource is also in our road map and would be soon implemented in the future.

Now talking about the API itself, for GET endpoints by default soft-deleted records should not be retrieved. Retrieving all the records irrespective of whether it is soft-deleted or not and letting client figure out which records are deleted is a sign of bad design. If the client wants to retrieve the deleted records, it can do so by passing a query parameter is_trashed set to true.

Following is the URL pattern followed for the same, for the sake of the example, assume that the event with id 1 is soft-deleted:

GET /events?with_trashed=true   # get all events including the soft-deleted events
GET /events/1                       # send a 404 exception
GET /events/1?with_trashed=true # retrieve relevant data 

For DELETE request:

DELETE /events/1                 # soft-delete the event
DELETE /events/1?permanent=true   # hard-delete the event

Relevant links:

DetachedInstanceError: Dealing with Celery, Flask’s app context and SQLAlchemy in the Open Event Server

In the open event server project, we had chosen to go with celery for async background tasks. From the official website,

What is celery?

Celery is an asynchronous task queue/job queue based on distributed message passing.

What are tasks?

The execution units, called tasks, are executed concurrently on a single or more worker servers using multiprocessing.

After the tasks had been set up, an error constantly came up whenever a task was called

The error was:

DetachedInstanceError: Instance <User at 0x7f358a4e9550> is not bound to a Session; attribute refresh operation cannot proceed

The above error usually occurs when you try to access the session object after it has been closed. It may have been closed by an explicit session.close() call or after committing the session with session.commit().

The celery tasks in question were performing some database operations. So the first thought was that maybe these operations might be causing the error. To test this theory, the celery task was changed to :

@celery.task(name='lorem.ipsum')
def lorem_ipsum():
    pass

But sadly, the error still remained. This proves that the celery task was just fine and the session was being closed whenever the celery task was called. The method in which the celery task was being called was of the following form:

def restore_session(session_id):
    session = DataGetter.get_session(session_id)
    session.deleted_at = None
    lorem_ipsum.delay()
    save_to_db(session, "Session restored from Trash")
    update_version(session.event_id, False, 'sessions_ver')


In our app, the app_context was not being passed whenever a celery task was initiated. Thus, the celery task, whenever called, closed the previous app_context eventually closing the session along with it. The solution to this error would be to follow the pattern as suggested on http://flask.pocoo.org/docs/0.12/patterns/celery/.

def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    task_base = celery.Task

    class ContextTask(task_base):
        abstract = True

        def __call__(self, *args, **kwargs):
            if current_app.config['TESTING']:
                with app.test_request_context():
                    return task_base.__call__(self, *args, **kwargs)
            with app.app_context():
                return task_base.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

celery = make_celery(current_app)


The __call__ method ensures that celery task is provided with proper app context to work with.

 

Event-driven programming in Flask with Blinker signals

Setting up blinker:

The Open Event Project offers event managers a platform to organize all kinds of events including concerts, conferences, summits and regular meetups. In the server part of the project, the issue at hand was to perform multiple tasks in background (we use celery for this) whenever some changes occurred within the event, or the speakers/sessions associated with the event.

The usual approach to this would be applying a function call after any relevant changes are made. But the statements making these changes were distributed all over the project at multiple places. It would be cumbersome to add 3-4 function calls (which are irrelevant to the function they are being executed) in so may places. Moreover, the code would get unstructured with this and it would be really hard to maintain this code over time.

That’s when signals came to our rescue. From Flask 0.6, there is integrated support for signalling in Flask, refer http://flask.pocoo.org/docs/latest/signals/ . The Blinker library is used here to implement signals. If you’re coming from some other language, signals are analogous to events.

Given below is the code to create named signals in a custom namespace:


from blinker import Namespace

event_signals = Namespace()
speakers_modified = event_signals.signal('event_json_modified')

If you want to emit a signal, you can do so by calling the send() method:


speakers_modified.send(current_app._get_current_object(), event_id=event.id, speaker_id=speaker.id)

From the user guide itself:

“ Try to always pick a good sender. If you have a class that is emitting a signal, pass self as sender. If you are emitting a signal from a random function, you can pass current_app._get_current_object() as sender. “

To subscribe to a signal, blinker provides neat decorator based signal subscriptions.


@speakers_modified.connect
def name_of_signal_handler(app, **kwargs):

 

Some Design Decisions:

When sending the signal, the signal may be sending lots of information, which your signal may or may not want. e.g when you have multiple subscribers listening to the same signal. Some of the information sent by the signal may not be of use to your specific function. Thus we decided to enforce the pattern below to ensure flexibility throughout the project.


@speakers_modified.connect
def new_handler(app, **kwargs):
# do whatever you want to do with kwargs['event_id']

In this case, the function new_handler needs to perform some task solely based on the event_id. If the function was of the form def new_handler(app, event_id), an error would be raised by the app. A big plus of this approach, if you want to send some more info with the signal, for the sake of example, if you also want to send speaker_name along with the signal, this pattern ensures that no error is raised by any of the subscribers defined before this change was made.

When to use signals and when not ?

The call to send a signal will of course be lying in another function itself. The signal and the function should be independent of each other. If the task done by any of the signal subscribers, even remotely affects your current function, a signal shouldn’t be used, use a function call instead.

How to turn off signals while testing?

When in testing mode, signals may slow down your testing as unnecessary signals subscribers which are completely independent from the function being tested will be executed numerous times. To turn off executing the signal subscribers, you have to make a small change in the send function of the blinker library.

Below is what we have done. The approach to turn it off may differ from project to project as the method of testing differs. Refer https://github.com/jek/blinker/blob/master/blinker/base.py#L241 for the original function.


def new_send(self, *sender, **kwargs):
    if len(sender) == 0:
        sender = None
    elif len(sender) > 1:
        raise TypeError('send() accepts only one positional argument, '
                        '%s given' % len(sender))
    else:
        sender = sender[0]
    # only this line was changed
    if not self.receivers or app.config['TESTING']:
        return []
    else:
        return [(receiver, receiver(sender, **kwargs))
                for receiver in self.receivers_for(sender)]
                
Signal.send = new_send

event_signals = Namespace
# and so on ....

That’s all for now. Have some fun signaling 😉 .

 

Set proper content type when uploading files on s3 with python-magic

In the open-event-orga-server project, we had been using Amazon s3 storage for a long time now. After some time we encountered an issue that no matter what the file type was, the Content-Type when retrieving this files from the storage solution was application/octet-stream.

An example response when retrieving an image from s3 was as follows:


Accept-Ranges →bytes
Content-Disposition →attachment; filename=HansBakker_111.jpg
Content-Length →56060
Content-Type →application/octet-stream
Date →Fri, 09 Sep 2016 10:51:06 GMT
ETag →"964b1d839a9261fb0b159e960ceb4cf9"
Last-Modified →Tue, 06 Sep 2016 05:06:23 GMT
Server →AmazonS3
x-amz-id-2 →1GnO0Ta1e+qUE96Qgjm5ZyfyuhMetjc7vfX8UWEsE4fkZRBAuGx9gQwozidTroDVO/SU3BusCZs=
x-amz-request-id →ACF274542E950116

 

As seen above instead of providing image/jpeg as the Content-Type, it provides the Content-Type as application/octet-stream.While uploading the files, we were not providing the content type explicitly, which seemed to be the root of the problem.

It was decided that we would be providing the content type explicitly, so it was time to choose an efficient library to determine the file type based on the content of the file and not the file extension. After researching through the available libraries python-magic seemed to be the obvious choice. python-magic is a python interface to the libmagic file type identification library. libmagic identifies file types by checking their headers according to a predefined list of file types.

Here is an example straight from python-magic‘s readme on its usage:


>>> import magic
>>> magic.from_file("testdata/test.pdf")
'PDF document, version 1.2'
>>> magic.from_buffer(open("testdata/test.pdf").read(1024))
'PDF document, version 1.2'
>>> magic.from_file("testdata/test.pdf", mime=True)
'application/pdf'

 

Given below is a code snippet for the s3 upload function in the project:


file_data = file.read()
    file_mime = magic.from_buffer(file_data, mime=True)
    size = len(file_data)
    # k is defined as  k = Key(bucket) in previous code
    sent = k.set_contents_from_string(
        file_data,
        headers={
            'Content-Disposition': 'attachment; filename=%s' % filename,
            'Content-Type': '%s' % file_mime
        }
    ) 

 

One thing to note that as python-magic uses libmagic-dev as a dependency and many of the distros do not come with libmagic-dev pre-installed, make sure you install libmagic-dev explicitly. (Installation instructions may vary per distro)


sudo apt-get install libmagic-dev

Voila !! Now when retrieving each and every file you’ll get the proper content type.