Dynamically marshalling output in Flask Restplus

Do you use Flask-Restplus? Have you ever felt the need to dynamically modify the API output according to some condition? If yes, then this post is for you.

In this post, I will show how to use decorators to restrict GET API output. So let’s start.

This is the basic code to create an API. Here we have created a get_speaker API to get a single item from the Speaker model.

from flask_restplus import Resource, Model, fields, Namespace
from models import Speaker

api = Namespace('speakers', description='Speakers', path='/')

SPEAKER = Model('Name', {
    'id': fields.Integer(),
    'name': fields.String(),
    'phone': fields.String()
})

class DAO:
    @staticmethod
    def get(speaker_id):
        return Speaker.query.get(speaker_id)

@api.route('/speakers/<int:speaker_id>')
class SpeakerResource(Resource):  # named so it doesn't shadow the Speaker model
    @api.doc('get_speaker')
    @api.marshal_with(SPEAKER)
    def get(self, speaker_id):
        """Fetch a speaker given its id"""
        return DAO.get(speaker_id)

Now our need is to change the returned API data according to some condition. For example, return the phone field of the SPEAKER model only if the user is authenticated. One way to do this is to write conditional statements in the get method that marshal the output according to the situation. But if there are lots of methods which require this, then this is not a good way.

So let’s create a decorator which can change the marshal decorator at runtime. It will accept as parameters the models to marshal with in the authenticated and non-authenticated cases.

from functools import wraps

from flask_login import current_user
from flask_restplus import marshal_with

def selective_marshal_with(fields, fields_private):
    """
    Selective response marshalling. Doesn't update apidoc.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if current_user.is_authenticated:
                model = fields          # full model for authenticated users
            else:
                model = fields_private  # restricted model otherwise
            func2 = marshal_with(model)(func)
            return func2(*args, **kwargs)
        return wrapper
    return decorator

The above code adds a wrapper over the API method which checks if the user is authenticated. If the user is authenticated, the fields model is used for marshalling; otherwise fields_private is used.

So let’s create the private model for SPEAKER. We will call it SPEAKER_PRIVATE.

from flask_restplus import Model, fields

SPEAKER_PRIVATE = Model('NamePrivate', {
	'id': fields.Integer(),
	'name': fields.String()
})

The final step is attaching the selective_marshal_with decorator to the get() method.

@api.route('/speakers/<int:speaker_id>')
class SpeakerResource(Resource):
    @api.doc('get_speaker', model=SPEAKER)
    @selective_marshal_with(SPEAKER, SPEAKER_PRIVATE)
    def get(self, speaker_id):
        """Fetch a speaker given its id"""
        return DAO.get(speaker_id)

You will notice that I removed @api.marshal_with(SPEAKER). This was to disable automatic marshalling of the output by flask-restplus. To compensate for this, I added model=SPEAKER in api.doc. It will not auto-marshal the output but will still show the swagger documentation.

That concludes this. The get method will now switch the marshal fields with respect to the authentication level of the user. As you may notice, the selective_marshal_with decorator is generic and can be used with other models and APIs too.

 

{{ Repost from my personal blog http://aviaryan.in/blog/gsoc/dynamic-marshal-restplus.html }}


Flask-SocketIO Notifications

In the previous post I explained how to configure Flask-SocketIO, Nginx and Gunicorn. This post covers integrating the Flask-SocketIO library to display notifications to users in real time.

Flask Config

For development we use the default web server that ships with Flask. With it, Flask-SocketIO falls back to long-polling as its transport mechanism, instead of WebSockets. So to properly test SocketIO I wanted to work directly with Gunicorn (hence the previous post about configuring the development environment). Also, not everyone needs to be bothered with the changes required to run it.

class DevelopmentConfig(Config):
    DEVELOPMENT = True
    DEBUG = True

    # If Env Var `INTEGRATE_SOCKETIO` is set to 'true', then integrate SocketIO
    socketio_integration = os.environ.get('INTEGRATE_SOCKETIO')
    if socketio_integration == 'true':
        INTEGRATE_SOCKETIO = True
    else:
        INTEGRATE_SOCKETIO = False

    # Other stuff

SocketIO is integrated (in the development environment) only if the developer has set the INTEGRATE_SOCKETIO environment variable to “true”. In production, our application runs on Gunicorn, and SocketIO integration must always be there.
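As a rough sketch (the app/socketio names and the run logic here are illustrative, not the project’s exact code), the flag could be consumed like this:

from flask import Flask
from flask_socketio import SocketIO

app = Flask(__name__)
app.config.from_object(DevelopmentConfig)  # the config class shown above

# only wrap the app with SocketIO when integration is enabled
socketio = SocketIO(app) if app.config['INTEGRATE_SOCKETIO'] else None

if __name__ == '__main__':
    if app.config['INTEGRATE_SOCKETIO']:
        socketio.run(app)  # dev server with SocketIO support
    else:
        app.run()          # plain Flask dev server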

Flow

To send a message to a particular connection (or a set of connections), Flask-SocketIO provides Rooms. Connections are made to join a room and the message is sent to the room. So to send a message to a particular user we need him to join a room, and then send the message to that room. The room name needs to be unique and related to just one user, so the user database ids can be used. I decided to keep user_{id} as the room name for a user with id {id}. This room name is needed when making the user join a room, so I stored it for every user that logged in.

# inside the admin view class
@expose('/login/', methods=('GET', 'POST'))
def login_view(self):
    if request.method == 'GET':
        pass  # render the login template
    if request.method == 'POST':
        # Take email and password from the form and check if
        # the user exists. If he does, log him in.
        login.login_user(user)

        # Store user_id in session for socketio use
        session['user_id'] = login.current_user.id

        # Redirect

After the user logs in, a connection request from the client is sent to the server. On this connection request, the connection handler at the server makes the user join a room (based on the user_id stored previously).

from flask import session
from flask_login import current_user
from flask_socketio import emit, join_room

@socketio.on('connect', namespace='/notifs')
def connect_handler():
    if current_user.is_authenticated():
        user_room = 'user_{}'.format(session['user_id'])
        join_room(user_room)
        emit('response', {'meta': 'WS connected'})

The client side is somewhat similar to this:

<script src="{{ url_for('static', filename='path/to/socket.io-client/socket.io.js') }}"></script>
<script type="text/javascript">
$(document).ready(function() {
    var namespace = '/notifs';

    var socket = io.connect(location.protocol + "//" + location.host + namespace, {reconnection: false});

    socket.on('response', function(msg) {
        console.log(msg.meta);
        // If `msg` is a notification, display it to the user.
    });
});
</script>

Namespaces help when making multiple connections over the same socket.
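For instance, two handlers can listen on separate namespaces over the same physical connection (the /chat namespace below is hypothetical):

@socketio.on('connect', namespace='/notifs')
def connect_notifs():
    emit('response', {'meta': 'notifs connected'})

@socketio.on('connect', namespace='/chat')  # hypothetical second channel
def connect_chat():
    emit('response', {'meta': 'chat connected'})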

So now that the user has joined a room, we can send him notifications. The notification data sent to the client should be standardized, so the message always has the same format. I defined a get_unread_notifs method on the User class that fetches unread notifications.

class User(db.Model):
    # Other stuff

    def get_unread_notifs(self, reverse=False):
        """Get unread notifications with titles, humanized receiving time
        and Mark-as-read links.
        """
        notifs = []
        unread_notifs = Notification.query.filter_by(user=self, has_read=False)
        for notif in unread_notifs:
            notifs.append({
                'title': notif.title,
                'received_at': humanize.naturaltime(datetime.now() - notif.received_at),
                'mark_read': url_for('profile.mark_notification_as_read', notification_id=notif.id)
            })

        if reverse:
            return list(reversed(notifs))
        else:
            return notifs

This method is used when a notification is added to the database and has to be pushed into the user’s SocketIO room.

def create_user_notification(user, action, title, message):
    """
    Create a User Notification
    :param user: User object to send the notification to
    :param action: Action being performed
    :param title: The message title
    :param message: Message
    """
    notification = Notification(user=user,
                                action=action,
                                title=title,
                                message=message,
                                received_at=datetime.now())
    saved = save_to_db(notification, 'User notification saved')

    if saved:
        push_user_notification(user)

def push_user_notification(user):
    """
    Push user notification to user socket connection.
    """
    user_room = 'user_{}'.format(user.id)
    emit('response',
         {'meta': 'New notifications',
          'notif_count': user.get_unread_notif_count(),
          'notifs': user.get_unread_notifs()},
         room=user_room,
         namespace='/notifs')

Unit Testing

There are many stories about unit testing. Developers sometimes say that they don’t write tests because they write good quality code. Does that make sense, when no one is infallible?

At university only a few teachers talk about unit testing, and they only show basic examples. They require us to write a few tests to finish a final project, but nobody really teaches us the importance of unit testing.

I have also always wondered what benefits it can bring. As time is a really important factor in our work, it often happens that we simply drop this part of the development process to get “more time”, rather than spend it on writing seemingly pointless tests. But now I know that it is a vicious circle.

Customers’ requirements do not help us. They put high pressure on seeing visible results, not a few statistics about coverage status. None of them cares about some strange numbers. So, as I mentioned above, we usually focus on building new features and get rid of tests. It may seem to save time, but it doesn’t.

In reality tests save us a lot of time, because we can identify and fix bugs very quickly. If a bug occurs because of someone’s change, we don’t have to spend long hours trying to figure out what is going on. That’s why we need tests.

It is especially visible in huge open source projects. The FOSSASIA organization has about 200 contributors. In the OpenEvent project we have about 20 active developers, who generate many lines of code every single day. Many of them change over and over again, and interfere with each other.

Let me provide you with a simple example. In our team we have about 7 pull requests per day. As I mentioned above, we want to keep our code high quality and free of bugs, but without testing, identifying whether a pull request causes a bug is a very difficult task. Fortunately, Travis CI does this boring job for us. It is a great tool which runs our tests on every PR to check if bugs occur. It helps us to quickly notice bugs and maintain our project very well.

What is unit testing?

Unit testing is a software development method in which the smallest testable parts of an application are tested.

Why do we need to write unit tests?

Let me list the arguments why unit testing is really important while developing a project.

  • To prove that our code works properly

If a developer adds another condition, the tests check whether the method still returns correct results. You simply don’t need to wonder if something is wrong with your code.

  • To reduce amount of bugs

It lets you know what input parameters a function should receive and what results it should return. You simply don’t write unused code.

  • To save development time

Developers don’t waste time checking after every change whether the code still works correctly.

  • Unit tests help to understand software design
  • To provide quick feedback about method which you are testing
  • To help document a code

How to write unit test in Python

In my work I write tests in Python. I am going to share my sample code with you now:

  • Import module unittest
  • Choose function to test
  • Write unit test
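Before the OpenEvent example, here is a minimal generic sketch of those three steps (the add function is just a stand-in for whatever you want to test):

import unittest

def add(a, b):
    return a + b

class TestAdd(unittest.TestCase):
    def test_add(self):
        # the function under test should return the expected value
        self.assertEqual(add(2, 3), 5)

if __name__ == '__main__':
    unittest.main()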

Example OpenEvent test in Python

class TestPagesUrls(OpenEventTestCase):

    def setUp(self):
        self.app = Setup.create_app()

    def test_if_urls_exist(self):
        """Test all urls via GET method"""
        with app.test_request_context():
            for rule in app.url_map.iter_rules():
                if excluded_paths(rule):
                    status_code = self.app.get(
                        request.url[:-1] + str(rule).replace('//', '/'),
                        follow_redirects=True).status_code
                    self.assertTrue(status_code in [200, 302, 401])

I wanted to check that all views exist, but doing that one test at a time would require a lot of time. That’s why I wondered how to avoid writing many similar tests. Finally, based on our list of routes, I was able to write one test which checks the status code of every page.

If any response returns a status_code different from 200, 302 or 401, the test fails. Such a result means that something is wrong. Simple, isn’t it? Try to test it manually… This one short test covers about 40 use cases…

This example shows the incredible value of unit tests! If a developer introduces a bug in a response, he receives an error that something is wrong with a view. Travis CI allows us to reject all bad pull requests and merge only those which fulfill our quality requirements.

Fixing an error is one part, but finding the bug is the even harder task. The ability to detect bugs at an early stage of the development process reduces the cost of software.

 


Update Fields with Array Input


There are certain fields in a form where, instead of a single value, we need an array of values to be stored. These are fields under the same category but having multiple elements, e.g., Tracks, Session Types, Microlocations, Sponsors, Social Links and similar such fields. As we know, the way of doing this in plain HTML is to give the <input> tag the “name” property as “<field_name>[]”. So if we want to save the names of multiple tracks, we will have inputs like <input type="text" name="tracks[]">.
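On the server side, Flask can then read every value submitted under that name with getlist. A small sketch (the route and field name are illustrative):

from flask import request

@app.route('/event', methods=['POST'])
def save_event():
    # all inputs named 'tracks[]' arrive as a single list
    track_names = request.form.getlist('tracks[]')
    for name in track_names:
        pass  # create or update a Track for each submitted name
    return 'OK'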

But the problem begins when you want to update the name of a particular element (in this case a Track). How do we do it? Should we delete and create the entries again? That doesn’t sound too good, does it? So what should we do? Let’s see…


Setting up Celery with Flask

In this article, I will explain how to use Celery with a Flask application. Celery requires a broker to run, and one of the most popular brokers is Redis. So to start using Celery with Flask, first we have to set up the Redis broker.

Redis can be downloaded from their site http://redis.io. I wrote a script that simplifies downloading, building and running the redis server.

#!/bin/bash
# This script downloads and runs redis-server.
# If redis has been already downloaded, it just runs it
if [ ! -d redis-3.2.1/src ]; then
    wget http://download.redis.io/releases/redis-3.2.1.tar.gz
    tar xzf redis-3.2.1.tar.gz
    rm redis-3.2.1.tar.gz
    cd redis-3.2.1
    make
else
    cd redis-3.2.1
fi
src/redis-server

When the above script is run for the first time, the redis folder doesn’t exist, so the script downloads the release, builds it and then runs the server. In subsequent runs, it skips the downloading and building part and just runs the server.

Now that the redis server is running, we will have to install its Python counterpart.

pip install redis

After the redis broker is set up, it’s time to set up the celery extension. First install celery with pip install celery. Then we need to set up celery in the flask app definition.

# in app.py
from os import environ

from celery import Celery
from flask import current_app

def make_celery(app):
    # set redis url vars
    app.config['CELERY_BROKER_URL'] = environ.get('REDIS_URL', 'redis://localhost:6379/0')
    app.config['CELERY_RESULT_BACKEND'] = app.config['CELERY_BROKER_URL']
    # create context tasks in celery
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

celery = make_celery(current_app)

Now that Celery is setup on our project, let’s define a sample task.

@app.route('/task')
def view():
    background_task.delay(*args, **kwargs)
    return 'OK'

@celery.task
def background_task(*args, **kwargs):
    pass  # the time-consuming work goes here

Now to run the celery workers, execute

celery worker -A app.celery

That should be all. Now to run our little project, we can execute the following script.

bash run_redis.sh &  # to run redis
celery worker -A app.celery &  # to run celery workers
python app.py

If you are wondering how to run the same on Heroku, just use the free heroku-redis add-on. It will start the redis server on Heroku. Then, to run the workers and the app, set the Procfile as –

web: sh heroku.sh

Then set the heroku.sh as –

#!/bin/bash
celery worker -A app.celery &
gunicorn app:app

That’s a basic guide on how to run a Flask app with Celery and Redis. If you want more information on this topic, please see my post Ideas on Using Celery in Flask for background tasks.


Ideas on using Celery with Flask for background tasks

Simply put, Celery is a background task runner. It can run time-intensive tasks in the background so that your application can focus on the stuff that matters the most. In the context of a Flask application, the stuff that matters the most is listening to HTTP requests and returning responses.

By default, Flask runs on a single thread. Now if a request takes several seconds to complete, it will block all other incoming requests, since the app is single-threaded. This will be a very bad experience for the user of the product. So here we can use Celery to move the time-hogging part of that request to the background.

I would like to let you know that by “background”, Celery means another process. Celery starts worker processes for the running application and these workers receive work from the main application. Celery requires a broker to be used. The broker is nothing but a datastore that provides a shared interface between the main process and the worker processes: tasks are queued on it, and, since it doubles as the result backend here, the output of the work done by the workers is stored in it too. The main application can then access these results from the broker.

Using Celery to set background tasks in your application is as simple as follows –

@celery.task
def background_task(*args, **kwargs):
    pass  # do stuff, more stuff

Now the function background_task can run as a background task. To execute it as one, run –

task = background_task.delay(*args, **kwargs)
print task.state  # task current state (PENDING, SUCCESS, FAILURE)

Till now this may look nice and easy, but it can cause lots of problems. This is because the background tasks run in different processes than the main application, so the state of the worker application differs from that of the real application.

One common problem because of this is the lack of request context. Since a celery task runs in a different process, the request context is not available. Therefore the request headers, cookies and everything else are not available when the task actually runs. I too faced this problem and solved it using an excellent snippet I found on the Internet.

"""
Celery task wrapper to set request context vars and global
vars when a task is executed
Based on http://xion.io/post/code/celery-include-flask-request-context.html
"""
from celery import Task
from flask import has_request_context, make_response, request, g

from app import app  # the flask app


__all__ = ['RequestContextTask']


class RequestContextTask(Task):
    """Base class for tasks that originate from Flask request handlers
    and carry over most of the request context data.
    This has an advantage of being able to access all the usual information
    that the HTTP request has and use it within the task. Potential
    use cases include e.g. formatting URLs for external use in emails sent
    by tasks.
    """
    abstract = True

    #: Name of the additional parameter passed to tasks
    #: that contains information about the original Flask request context.
    CONTEXT_ARG_NAME = '_flask_request_context'
    GLOBALS_ARG_NAME = '_flask_global_proxy'
    GLOBAL_KEYS = ['user']

    def __call__(self, *args, **kwargs):
        """Execute task code with given arguments."""
        call = lambda: super(RequestContextTask, self).__call__(*args, **kwargs)

        # set context
        context = kwargs.pop(self.CONTEXT_ARG_NAME, None)
        gl = kwargs.pop(self.GLOBALS_ARG_NAME, {})

        if context is None or has_request_context():
            return call()

        with app.test_request_context(**context):
            # set globals
            for i in gl:
                setattr(g, i, gl[i])
            # call
            result = call()
            # process a fake "Response" so that
            # ``@after_request`` hooks are executed
            # app.process_response(make_response(result or ''))

        return result

    def apply_async(self, args=None, kwargs=None, **rest):
        self._include_request_context(kwargs)
        self._include_global(kwargs)
        return super(RequestContextTask, self).apply_async(args, kwargs, **rest)

    def apply(self, args=None, kwargs=None, **rest):
        self._include_request_context(kwargs)
        self._include_global(kwargs)
        return super(RequestContextTask, self).apply(args, kwargs, **rest)

    def retry(self, args=None, kwargs=None, **rest):
        self._include_request_context(kwargs)
        self._include_global(kwargs)
        return super(RequestContextTask, self).retry(args, kwargs, **rest)

    def _include_request_context(self, kwargs):
        """Includes all the information about current Flask request context
        as an additional argument to the task.
        """
        if not has_request_context():
            return

        # keys correspond to arguments of :meth:`Flask.test_request_context`
        context = {
            'path': request.path,
            'base_url': request.url_root,
            'method': request.method,
            'headers': dict(request.headers),
        }
        if '?' in request.url:
            context['query_string'] = request.url[(request.url.find('?') + 1):]

        kwargs[self.CONTEXT_ARG_NAME] = context

    def _include_global(self, kwargs):
        d = {}
        for z in self.GLOBAL_KEYS:
            if hasattr(g, z):
                d[z] = getattr(g, z)
        kwargs[self.GLOBALS_ARG_NAME] = d

To run a task in Request context mode, do –

@celery.task(base=RequestContextTask, bind=True)
def background_task(self, *args, **kwargs):
    pass  # do stuff, more stuff

If you are wondering what the RequestContextTask class does, it simply stores all request context vars and global vars when a background task is called (task.delay()) and then unpacks those values to their proper places when the task is about to be run. The above snippet can be easily extended to store any value.

Another challenge that some people may face is the occasional parsing/serialization error. This happens when the data being sent to or returned from a background-executed function is too complex.

Serialization is the process of converting complex data structures and objects into a plain string. It is necessary here because the background tasks and the main application run in different processes. Think about how the main process communicates with a celery worker to get some task done: it does so by serializing the concerned data. So to avoid serialization errors, it is recommended that you design background tasks to take only simple arguments and return only simple data.
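A sketch of this rule in practice: pass a primitive id and let the worker re-fetch the object (the task and model names are illustrative):

@celery.task
def send_reminder(user_id):
    # re-fetch inside the worker process instead of serializing the object
    user = User.query.get(user_id)
    # ... do the actual work with `user` ...

# caller side: a plain integer serializes trivially
send_reminder.delay(user.id)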

So basically, keeping tasks small and simple is recommended when using Celery. Follow this golden rule and you will avoid most such problems.

 

{{ Repost from my personal blog | http://aviaryan.in/blog/gsoc/celery-flask-good-ideas.html }}


Shell hacks

Custom Shell

Working with database models needs a lot of use of the flask shell. You can access it with:

python manage.py shell

The default shell is quite unintuitive. It doesn’t pretty-print the output and has no support for auto-completion. I installed IPython, which takes care of that. But still, working with models means writing a lot of import statements. Plus, if there was some change in the code related to the app, the shell had to be restarted for the changes to be loaded, meaning the import statements had to be written all over again.

We were using Flask-Script, and I wanted to run a custom shell that imports all the required modules, models and helper functions, so I wouldn’t have to write them over and over. Some of them were as long as:

from open_event.models.users_events_roles import UsersEventsRoles

So I created a custom shell command with a different context that overrides the default shell provided by the Flask-Script Manager. It was pretty easy with Flask-Script. One thing I had to keep in mind is that it needed to be in a different file than manage.py: since manage.py was committed to the source repo, changes to it would be tracked. So I needed a different file that could be excluded from the source repo. I created smg.py, which imports the Manager from the open_event module and overrides the shell command.

from flask_script import Shell

from open_event import manager
from open_event.models.user import User
from open_event.models.event import Event
from open_event.helpers.data import save_to_db, delete_from_db

def _make_context():
    return dict(
        uq=User.query,
        eq=Event.query,
        su=User.query.get(1),
        User=User,
        Event=Event,
        savetodb=save_to_db,
        deletefromdb=delete_from_db
    )


if __name__ == "__main__":
    manager.add_command('shell', Shell(make_context=_make_context))
    manager.run()

Place this smg.py file in the same directory as manage.py, so you can access it with python smg.py shell.

The code is pretty simple to understand. We import the Shell class from flask_script, create its object with our context and then add it to the manager as a command. _make_context contains what I usually like to have in my shell. It must always return a dictionary: the keys of this dictionary become names available inside the shell, bound to the values specified here.

This helps a lot. Most of the time I would be working with the super_admin user, and I would need its User object from time to time. The super_admin user is always going to be the first user (the User object with id 1). So instead of from open_event.models.user import User; su = User.query.get(1), I can just use the su variable. Models like User and Event are also readily available (as are their base queries). This is the default context that I always keep, but many times you need more models than the ones specified here, like when I was working with the permissions system:

from open_event.models.users_events_roles import UsersEventsRoles
from open_event.models.service import Service
from open_event.models.role import Role

def _make_context():
    return dict(
        # usual stuff
        UER=UsersEventsRoles,
        Service=Service,
        Role=Role
    )

You can even write a script that fetches all the database models (instances of the sqlalchemy Model class) and adds them to the _make_context dictionary, as sketched below. I like to keep the context minimal and the names distinct, so there are no class-name conflicts when I try to auto-complete.
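A rough sketch of such a script, relying on SQLAlchemy’s internal class registry (an implementation detail of SQLAlchemy versions of that era, so treat it as an assumption, along with the db import location):

from open_event import db  # assumed location of the shared SQLAlchemy object

def _make_context():
    ctx = {}
    for cls in db.Model._decl_class_registry.values():
        if hasattr(cls, '__tablename__'):  # skip non-model registry entries
            ctx[cls.__name__] = cls
    return ctx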

One more thing: you need to exclude the smg.py file so that git doesn’t track it. You can simply add it to the .git/info/exclude file.

I also wrote some useful one-liner bash commands.

Revision History

Every time someone updates the database models, he needs to migrate and provide the migration files to others by committing them to the source. These files help us upgrade our already-defined database. We work with Alembic. Regarding alembic revisions (migration files), you can keep two things in mind. One is the head, which is the latest revision, and the other is the current revision, which is what the tables in your database are based on. If your current revision is not a head, it means your database tables are not up-to-date and you need to upgrade (python manage.py db upgrade). There can be multiple heads in the revision history; python manage.py db heads displays all of them. The current revision can be fetched with python manage.py db current. I wanted something that automatically checks if current is at a head.

python manage.py db history | grep -E --color "$(python manage.py db current 2> /dev/null)|$(python manage.py db heads)|$"

You can define it as an alias in .bashrc. This command displays the revision history with the head and the current revision colored.


You can identify the head as it is always appended by “(head)”. The other colored revision would be the current.

You can also reverse the output, printing bottom to top. This is helpful when the revision history gets long and you have to scroll up to see the head.

alias rev='python manage.py db history | tac | grep -E --color "$(python manage.py db current 2> /dev/null)|$(python manage.py db heads)|$"'


Current Revision changer

The current revision is maintained in the database, in the one-column alembic_version table. If for some reason the migration file for the current revision no longer exists (like when changing to a git branch that doesn’t have the file), alembic will raise an error. So to change the revision stored in the database, I wrote a bash function.

change_db_current () {
    echo "Current Revision: "
    read current_rev
    echo "New Revision: "
    read new_rev

    # 'test' is the database name
    echo "update alembic_version set version_num='$new_rev' where version_num='$current_rev';" | sudo su - postgres -c "psql test"
}

Enter the current revision hash (displayed in the alembic error) and the revision you need to point the current to, and it will update the table with the new revision. Sick, right? It’s pretty stupid actually; it dates from before I knew how to stamp in alembic (python manage.py db stamp head is the proper way). Anyways, it was useful some time ago.


Implementing Admin Trash in Open Event

So last week I had the task of implementing a trash system for the Admin. It was observed that sometimes a user may delete an item and then realize that the item needs to be restored, and a trash system works well in this case. Presently the items that are being moved to the trash are:

  • Deleted Users
  • Deleted Events
  • Deleted Sessions

So it works like this: I added a column in_trash to the tables User, Event and Sessions to mark whether an item is in the trash or not.

in_trash = db.Column(db.Boolean, default=False)

So depending on whether the value is True or False, the item is or isn’t in the admin’s trash. For a normal user, on deleting an event, user or session, a message flashes that the item is deleted and the item is no longer shown in the user’s table list. However, it is not deleted from the database.
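A minimal sketch of this soft-delete flow (the function name is illustrative):

def trash_event(event_id):
    event = Event.query.get(event_id)
    event.in_trash = True  # mark as trashed instead of deleting the row
    db.session.commit()

# normal listings simply filter trashed rows out
visible_events = Event.query.filter_by(in_trash=False).all()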


Thus for the user the item is deleted. The item’s in_trash property is set to True and it gets moved to the trash. The items are displayed in the “Deleted Items” section of the Admin panel.


The deleted items are displayed in the trash, and as soon as they are deleted from the trash they are removed from the database permanently. A message flashes for the Admin when an item is deleted.


Thus the trash is implemented. 🙂

Two more things are left:

  • To restore items from trash
  • To automatically delete the items in trash after an inactivity of 30 days

This will soon be implemented 🙂


File Uploading in Flask

Last week I took up an issue of adding upload functionality to the open-event server. I had to implement the upload in 3 places – one in the sponsor table to upload images, another in the user profile page, and the third to upload slides in the session form. However, the basic function behind it remains the same. File upload makes use of werkzeug’s built-in FileStorage class. A code snippet will help understand how it works:

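(The original snippet was posted as an image; the following is a sketch of the pattern described below, with an illustrative route and upload folder.)

from flask import Flask, request

app = Flask(__name__)

@app.route('/upload', methods=['POST'])
def upload():
    file = request.files['files']                 # a werkzeug FileStorage object
    file.save('static/uploads/' + file.filename)  # save to the folder we specify
    return 'Uploaded'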

So on selecting the file, it gets stored in the request.files object, temporarily held in werkzeug’s FileStorage. We access the file using request.files['files'] and then, using FileStorage’s built-in save() function, it gets saved to the folder specified by us. There are some frameworks available for file uploading in Flask, but all this can be done using the standard libraries as well, so there is no real need for the frameworks.

However, instead of storing the file under its raw name, we make use of werkzeug’s secure_filename to sanitize the name before saving:
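Continuing the sketch above, the save then becomes:

import os
from werkzeug import secure_filename  # werkzeug.utils.secure_filename in newer releases

# e.g. secure_filename('../../../etc/passwd') -> 'etc_passwd'
filename = secure_filename(file.filename)
file.save(os.path.join('static/uploads', filename))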

Thus secure_filename converts a potentially unsafe filename into a safe, flattened one (as in the example above) and makes uploading easier by turning a long path-like name into a simple one.


Challenges with Migrations

So I am sitting in front of my laptop one day, working on some issue. I remember it to be adding a database table. Since this was the first time I was actually creating a table, I didn’t know how much this issue was going to haunt me! So I created a table SessionType just like other previously created tables, and I made a PR, happy to have solved an issue. But just when I thought everything was going smooth, this error came up…

“Table session_type does not exist”

I understood that I had to create the database again, so I ran python create_db.py. But I still got the error. I searched about it and found that every time I make changes to the database tables, I have to generate a new migration script. Thus I ran python manage.py db migrate and got THE error:

“Can’t locate revision identified by *some random revision number*”

I literally went mad trying to solve this issue. Moreover, whenever I would switch branches where the database tables were different, I would get this error. Finally I found a way to solve all the migration-related problems (most of them :p). I added a script to drop the entire database. Since right now we are in the development stage, it doesn’t matter if the data in the database is deleted. So you run:

  1. python drop_db.py
  2. python create_db.py
  3. Use super_admin password to create the database
  4. But no! Migration is a little mean bastard! It won’t leave you like that. You have to make the current revision and the head equal. Thus, if your current refers to some other revision ID, stamp the database table with the head.
  5. Run python manage.py db stamp head
  6. Now run python manage.py db migrate

 

Finally! The issue is solved. 🙂

 

 
