Implementing Fee Structure for Ticketing

After implementing the ticketing system in Open Event, we needed a way to control the fees charged for each currency. So I added an Admin page where the service fee percentage and the maximum allowed fee can be set per currency:

[Screenshot 1: admin page for setting the service fee and maximum fee per currency]

When the system is loaded for the first time, the service fees and maximum fees are all 0. The Super Admin then sets the values. If the maximum fee for a currency is left unset (still 0), it defaults to the service fee.

[Screenshot 2]

The backend code for this is as follows:

if 'service_fee' in kwargs:
    ticket_service_fees = kwargs.get('service_fee')
    ticket_maximum_fees = kwargs.get('maximum_fee')
    from app.helpers.data_getter import DataGetter
    from app.helpers.data import save_to_db
    currencies = DataGetter.get_payment_currencies()
    ticket_fees = DataGetter.get_fee_settings()
    if not ticket_fees:
        for i, (currency, has_paypal, has_stripe) in enumerate(currencies):
            currency = currency.split(' ')[0]
            if float(ticket_maximum_fees[i]) == 0.0:
                ticket_maximum_fees[i] = ticket_service_fees[i]
            ticket_fee = TicketFees(currency=currency,
                                    service_fee=ticket_service_fees[i],
                                    maximum_fee=ticket_maximum_fees[i])
            save_to_db(ticket_fee, "Ticket Fees settings saved")
    else:
        for i, fee in enumerate(ticket_fees):
            if float(ticket_maximum_fees[i]) == 0.0:
                ticket_maximum_fees[i] = ticket_service_fees[i]
            fee.service_fee = ticket_service_fees[i]
            fee.maximum_fee = ticket_maximum_fees[i]
            save_to_db(fee, "Fee Options Updated")

The code first checks whether ‘service_fee’ is present in the request. If it is, the service fees are stored in one list and the corresponding maximum fees in another. It then checks whether fee settings are already stored in the database:

ticket_fees = DataGetter.get_fee_settings()

If the Super Admin is setting the fees for the first time, this returns nothing, and a new TicketFees entry is created for each currency. If fee settings already exist, they are simply updated. In both cases, a maximum fee of 0 is replaced with the service fee.
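The fallback rule can be tried out on its own. The following is only a minimal sketch in Python 3, not the project's code: apply_fee_defaults is a hypothetical helper, and unlike the view above it also treats a blank field as 0.

```python
def apply_fee_defaults(service_fees, maximum_fees):
    """Return (service_fee, maximum_fee) pairs, one per currency.

    A maximum fee of 0 (or a blank field) falls back to the service fee.
    """
    pairs = []
    for service, maximum in zip(service_fees, maximum_fees):
        # Form values arrive as strings; '' is treated as 0 (an assumption)
        service = float(service or 0)
        maximum = float(maximum or 0)
        if maximum == 0.0:
            maximum = service
        pairs.append((service, maximum))
    return pairs


print(apply_fee_defaults(['2.5', '3'], ['0', '10']))
```

The first currency has no maximum fee set, so its maximum becomes 2.5; the second keeps its explicit maximum of 10.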

Thus the fee system is implemented for the tickets.

Implementing Payment and Tax System for Open-Event

I implemented the payment and tax systems for the payment part of the ticketing system. The first step was to define the countries and currencies that the system supports, so I added the following two sets:

PAYMENT_COUNTRIES = {
    'United States',
    'Argentina',
    'Australia',
    'Austria',
    'Belgium',
    'Brazil',
    'Canada',
    'Cyprus',
    'Czech Republic',
    'Denmark',
    'Estonia',
    'Finland',
    'France',
    'Germany',
    'Greece',
    'Hong Kong',
    'Hungary',
    'Ireland',
    'Israel',
    'Italy',
    'Japan',
    'Latvia',
    'Lithuania',
    'Luxembourg',
    'Malaysia',
    'Malta',
    'Mexico',
    'Netherlands',
    'New Zealand',
    'Norway',
    'Philippines',
    'Poland',
    'Portugal',
    'Singapore',
    'Slovakia',
    'Slovenia',
    'Spain',
    'Sweden',
    'Switzerland',
    'Taiwan',
    'United Kingdom',
}

PAYMENT_CURRENCIES = {
    'ARS Argentine Peso $',
    'AUD Australian Dollars A$',
    'BRL Brazilian Real R$',
    'CAD Canadian Dollars C$',
    'CZK Czech Koruna Kč',
    'DKK Danish Krone Dkr',
    'EUR Euros €',
    'HKD Hong Kong Dollar HK$',
    'HUF Hungarian Forint Ft',
    'ILS Israeli Shekels ₪',
    'JPY Japanese Yen ¥',
    'MYR Malaysian Ringgits RM',
    'MXN Mexican Pesos Mex$',
    'NZD New Zealand Dollar NZ$',
    'NOK Norwegian Krone Nkr',
    'PHP Philippine Pesos ₱',
    'PLN Polish Zloty zł',
    'GBP Pounds Sterling £',
    'SGD Singapore Dollar SG$',
    'SEK Swedish Krona Skr',
    'CHF Swiss Franc Fr',
    'TWD Taiwan New Dollars NT$',
    'THB Thai baht ฿',
    'USD U.S. Dollars $',
}

These are the currently supported countries and currencies. The user chooses from them on the first page of event creation:

[Screenshot 1: country and currency selection on the first page of event creation]
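Each currency entry packs a code, a name and a symbol into one string, and the fee code shown earlier takes the code with split(' ')[0]. As a small sketch in Python 3 (split_currency is a hypothetical helper, and it assumes every entry follows the "CODE Name Symbol" pattern), all three parts can be separated like this:

```python
def split_currency(entry):
    """Split 'USD U.S. Dollars $' into ('USD', 'U.S. Dollars', '$')."""
    # The code is the first word, the symbol the last; the name is everything between
    code, rest = entry.split(' ', 1)
    name, symbol = rest.rsplit(' ', 1)
    return code, name, symbol


for entry in ('USD U.S. Dollars $', 'CZK Czech Koruna Kč', 'EUR Euros €'):
    print(split_currency(entry))
```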

If the user chooses a Paid ticket or a Donation ticket, he/she must choose a country and a currency for the event. Next comes the payment system, that is, the way in which the organizer wants payments to be made for his/her event. The options are Online Payments and Offline Payments, and the organizer has to tick the checkbox to enable a payment option. On enabling the PayPal and Stripe checkboxes, he/she is presented with the following:

[Screenshot 2: PayPal and Stripe payment options]

On enabling the PayPal option, the organizer has to enter a PayPal email address; for Stripe, he/she has to connect a Stripe account.

And the final step is the addition of tax for the event. The organizer can choose whether he/she wants to enable tax for the event or not:

[Screenshot 3: option to enable tax for the event]

On choosing Yes, the organizer is presented with the tax form:

[Screenshot 4: tax form]

If the organizer wants to send invoices, enabling invoices displays another form for details such as Business Address, Registered Name, etc.

[Screenshot 5: invoice details form]

Finally, there are two options for how the tax is shown: as a separate fee, or included in the price of the tickets.
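The difference between the two tax options is easiest to see with numbers. The sketch below is only an illustration in Python 3: price_breakdown is a hypothetical helper, and rounding to two decimal places with ROUND_HALF_UP is an assumption, not something taken from Open-Event.

```python
from decimal import Decimal, ROUND_HALF_UP

CENT = Decimal('0.01')


def price_breakdown(ticket_price, tax_rate_percent, tax_included):
    """Return the net price, tax amount and total for one ticket."""
    price = Decimal(str(ticket_price))
    rate = Decimal(str(tax_rate_percent)) / Decimal('100')
    if tax_included:
        # The ticket price already contains the tax: extract it
        total = price
        net = (price / (1 + rate)).quantize(CENT, rounding=ROUND_HALF_UP)
        tax = total - net
    else:
        # The tax is shown as a separate fee on top of the ticket price
        net = price
        tax = (price * rate).quantize(CENT, rounding=ROUND_HALF_UP)
        total = net + tax
    return {'net': net, 'tax': tax, 'total': total}


print(price_breakdown(100, 10, tax_included=False))
print(price_breakdown(100, 10, tax_included=True))
```

With a ticket price of 100 and a 10% tax, the separate-fee option charges 110 in total, while the included option still charges 100, of which 9.09 is tax.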

Ticketing System in Open-Event

We implemented a ticketing system in Open-Event. The user has two options: add his/her own ticket URL or use our ticketing system. If the ticketing module is turned off, there is no choice and the user has to add a ticket URL.

[Screenshot 1]

[Screenshot 2]

Thus only Add Ticket URL is shown if the ticketing switch is turned OFF. If the switch is turned ON, we also offer our own ticketing system and let the user choose between the two.

[Screenshot 3: ticket options with the ticketing switch turned ON]

A ticket can be Free, Paid or by Donation. For a free ticket, the user just enters the normal ticketing details. If the Paid option is selected, a payment section is displayed where he/she has to choose the country and the currency in which payments will be made.

[Screenshot 4: payment section for paid tickets]

The user can pay through PayPal, and we can also decide whether to add tax to the event.

Implementing Module system in Open-Event

We had to implement the following modules in our system:

  • Ticketing
  • Payments
  • Donations

However, we wanted the super admin to be able to enable or disable these modules. So we implemented a module system in which all three can be switched ON or OFF. The following screenshot shows the settings page:

[Screenshot: modules settings page with a switch for each module]

There is a switch for each of the three modules. The payment and donation systems are visible only if ticketing is enabled, because both are part of the ticketing system. I created a module table in the database to store the values. To capture the switch states I wrote the following JavaScript code:

<script type="text/javascript">

    var modulesForm = [{}];

    Array.prototype.setIncluded = function (field, state) {
        this[0][field].include = state ? 1 : 0;
    };


    function includeClick(button) {
        var $row = $(button).closest("tr");
        var $button = $(button);

        if ($button.data('group') == 'modules') {
            modulesForm.setIncluded($row.data('identifier'), button.checked);
        }
        persistData();
    }

    $(function () {
        $.each($(".modules-options-table").find('tr[data-identifier]'), function (key, row) {
            var $row = $(row);
            modulesForm[0][$row.data('identifier')] = {
                include: $row.find('.include-switch')[0].checked ? 1 : 0
            }
        });

        $('[data-toggle="tooltip"]').tooltip();

        persistData();
    });

    function persistData() {
        $("#modules-value-form").attr('value', JSON.stringify(modulesForm[0]));
    }


</script>

If a module is enabled, its “include switch” is checked and its entry in modulesForm gets include: 1; otherwise it gets include: 0. An entry is added in the same way for every switch. The object that is serialized into the form field therefore holds one value per module, in the form:

{"ticketing":{"include":1},"payments":{"include":1},"donations":{"include":0}}

Now the only thing left to do is to read this value on the server and check whether each module is included. Here is the code that does it:

class SuperAdminModulesView(SuperAdminBaseView):

    @expose('/')
    def index_view(self):
        module = DataGetter.get_module()
        include_settings = []

        if module:
            if module.ticket_include:
                include_settings.append('ticketing')
            if module.payment_include:
                include_settings.append('payments')
            if module.donation_include:
                include_settings.append('donations')

        return self.render('/gentelella/admin/super_admin/modules/modules.html', include_settings=include_settings)

    @expose('/save', methods=['GET', 'POST'])
    def modules_save_view(self):
        create_modules(request.form)

        include_settings = []
        settings = request.form.getlist('modules_form[value]')

        if settings[0][24] == '1':
            include_settings.append('ticketing')
        if settings[0][49] == '1':
            include_settings.append('payments')
        if settings[0][75] == '1':
            include_settings.append('donations')

        return self.render('/gentelella/admin/super_admin/modules/modules.html', include_settings=include_settings)

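“settings” is the list of values submitted for modules_form[value]; its first element is the JSON string produced by the modules page. Because the code indexes into that string, “settings[0][24]” is the character holding the include value of ticketing, “settings[0][49]” the one for payments and “settings[0][75]” the one for donations. Depending on whether each is 1 or 0, we add the strings ‘ticketing’, ‘payments’ and ‘donations’ to include_settings. These offsets only work as long as the module names and their order in the serialized value stay exactly the same. Similarly, create_modules(form) stores the values in the database: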

def create_modules(form):
    modules_form_value = form.getlist('modules_form[value]')
    module = DataGetter.get_module()

    if module is None:
        module = Module()

    # Each offset is the position of an include flag ('1' or '0') in the JSON string
    module.ticket_include = str(modules_form_value[0][24]) == '1'
    module.payment_include = str(modules_form_value[0][49]) == '1'
    module.donation_include = str(modules_form_value[0][75]) == '1'

    save_to_db(module, "Module settings saved")
    events = DataGetter.get_all_events()

    if module.ticket_include:
        for event in events:
            event.ticket_include = True
            save_to_db(event, "Event updated")
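The character offsets 24, 49 and 75 depend on the exact text of the serialized value. A less fragile alternative, shown here only as a sketch in Python 3 and not as what Open-Event does, is to parse the JSON and look the modules up by name. The helper included_modules is hypothetical, and the sample string is built by hand to mimic what JSON.stringify would send.

```python
import json


def included_modules(raw_value):
    """Return the names of the modules whose include flag is 1."""
    form = json.loads(raw_value)
    names = ('ticketing', 'payments', 'donations')
    return [name for name in names if form.get(name, {}).get('include') == 1]


# Compact separators give the same layout as JSON.stringify (no spaces);
# dict order is preserved on Python 3.7+
raw = json.dumps(
    {'ticketing': {'include': 1},
     'payments': {'include': 1},
     'donations': {'include': 0}},
    separators=(',', ':'))

print(raw)
print(raw[24], raw[49], raw[75])   # the three characters the view code reads
print(included_modules(raw))
```

Parsing by name keeps working if a module is renamed, added or reordered, whereas the fixed offsets would silently point at the wrong characters.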

 

Using Cron Scheduling to automatically run background jobs

Cron scheduling is nothing new. It is a mechanism by which the system runs a piece of code at fixed times or intervals, for example every few seconds, minutes or hours. Many large applications need it to automate parts of their work. I had to use it for two purposes in our Open-Event application:

  • To automatically delete the items in the trash after a period of 30 days.
  • To automatically send after-event mails to the speakers and organizers when an event is completed.

1. Delete items in Trash system

Deleted items are first moved to the admin's trash. If they are neither restored nor deleted from there, they should be removed automatically once they have been in the trash for 30 days. I used the apscheduler (Advanced Python Scheduler) library for this. The code looks like this:

from datetime import datetime, timedelta

from apscheduler.schedulers.background import BackgroundScheduler
from pytz import utc  # assumed source of the utc object used below

def empty_trash():
    with app.app_context():
        # Fetch everything that is currently in the trash
        events = Event.query.filter_by(in_trash=True)
        users = User.query.filter_by(in_trash=True)
        sessions = Session.query.filter_by(in_trash=True)
        for event in events:
            if datetime.now() - event.trash_date >= timedelta(days=30):
                DataManager.delete_event(event.id)

        for user in users:
            if datetime.now() - user.trash_date >= timedelta(days=30):
                # Remove the user's version history before deleting the user
                transaction = transaction_class(Event)
                transaction.query.filter_by(user_id=user.id).delete()
                delete_from_db(user, "User deleted permanently")

        for session in sessions:
            if datetime.now() - session.trash_date >= timedelta(days=30):
                delete_from_db(session, "Session deleted permanently")


trash_sched = BackgroundScheduler(timezone=utc)
trash_sched.add_job(empty_trash, 'cron', day_of_week='mon-fri', hour=5,
                    minute=30)
trash_sched.start()

trash_sched is initialized first. It is an instance of BackgroundScheduler, which means it runs in the background for as long as the application server is running.

The second line adds the job and defines its trigger, that is, when the scheduler should execute the function.

trash_sched.add_job(empty_trash, 'cron', day_of_week='mon-fri', hour=5, 
                    minute=30)

Here the scheduler adds the function to its job list. The function is empty_trash and the trigger type is ‘cron’. The following arguments:

day_of_week='mon-fri', hour=5, minute=30

set the time at which the scheduler executes the job: every day from Monday to Friday at 5:30 AM (UTC, since the scheduler was created with timezone=utc). Now coming to the function which is to be executed:

def empty_trash():
    with app.app_context():
        events = Event.query.filter_by(in_trash=True)
        users = User.query.filter_by(in_trash=True)
        sessions = Session.query.filter_by(in_trash=True)
        for event in events:
            if datetime.now() - event.trash_date >= timedelta(days=30):
                DataManager.delete_event(event.id)

        for user in users:
            if datetime.now() - user.trash_date >= timedelta(days=30):
                transaction = transaction_class(Event)
                transaction.query.filter_by(user_id=user.id).delete()
                delete_from_db(user, "User deleted permanently")

        for session in sessions:
            if datetime.now() - session.trash_date >= timedelta(days=30):
                delete_from_db(session, "Session deleted permanently")

There are three kinds of items in the trash: events, users and sessions. We fetch the trashed items of each kind with filter_by(in_trash=True). Each model (Event, User and Session) has a column

trash_date = db.Column(db.DateTime)

The date on which the item is moved to the trash is stored in the trash_date column. And the following line:

 if datetime.now() - event.trash_date >= timedelta(days=30)

checks whether the item has been in the trash for 30 days or more. If so, it is deleted automatically. The function is run by apscheduler according to the schedule we gave it, so we do not need to empty the trash manually after 30 days.
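The two conditions involved, the 30-day retention check and the weekday schedule, can be sketched without apscheduler. This is an illustration in Python 3 only; is_expired and runs_at are hypothetical helpers that mirror the comparison in empty_trash and the 'mon-fri', hour=5, minute=30 trigger.

```python
from datetime import datetime, timedelta

RETENTION = timedelta(days=30)


def is_expired(trash_date, now, retention=RETENTION):
    """True once an item has been in the trash for the full retention period."""
    return now - trash_date >= retention


def runs_at(moment):
    """True if a 'mon-fri, 05:30' cron trigger would fire in this minute."""
    # weekday(): Monday is 0, Sunday is 6
    return moment.weekday() < 5 and moment.hour == 5 and moment.minute == 30


now = datetime(2016, 7, 1, 5, 30)
print(is_expired(datetime(2016, 6, 1, 5, 30), now))   # trashed exactly 30 days ago
print(is_expired(datetime(2016, 6, 2, 5, 30), now))   # trashed 29 days ago
print(runs_at(datetime(2016, 7, 4, 5, 30)))           # a Monday
print(runs_at(datetime(2016, 7, 9, 5, 30)))           # a Saturday
```

Because the job runs only on weekdays, an item that reaches 30 days on a Saturday is removed on the following Monday.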

2. Send After Event Mails

This is similar to the trash emptying function.

from apscheduler.schedulers.background import BackgroundScheduler

def send_after_event_mail():
    with app.app_context():
        events = Event.query.all()
        for event in events:
            upcoming_events = DataGetter.get_upcoming_events(event.id)
            organizers = DataGetter.get_user_event_roles_by_role_name(event.id, 'organizer')
            speakers = DataGetter.get_user_event_roles_by_role_name(event.id, 'speaker')
            if datetime.now() > event.end_time:
                for speaker in speakers:
                    send_after_event(speaker.user.email, event.id, upcoming_events)
                for organizer in organizers:
                    send_after_event(organizer.user.email, event.id, upcoming_events)

#logging.basicConfig()
sched = BackgroundScheduler(timezone=utc)
sched.add_job(send_after_event_mail, 'cron', day_of_week='mon-fri', hour=5, minute=30)
#sched.start()

The scheduler settings are the same as for the trash scheduler. The function fetches all the events and checks whether each event has already ended, by comparing its end_time with the current time:

if datetime.now() > event.end_time

If so, after-event mails are sent automatically to the speakers and organizers of that event.
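The selection step can be sketched with plain data. This is only an illustration in Python 3: the events are hypothetical dicts rather than Event models, after_event_recipients is a made-up helper, and, unlike the job above, it lists each address once even if a person is both speaker and organizer.

```python
from datetime import datetime


def after_event_recipients(events, now):
    """Map event id -> sorted unique emails, for events that have ended."""
    recipients = {}
    for event in events:
        if now > event['end_time']:
            emails = set(event['speakers']) | set(event['organizers'])
            recipients[event['id']] = sorted(emails)
    return recipients


events = [
    {'id': 1, 'end_time': datetime(2016, 7, 4, 22, 0),
     'speakers': ['a@example.com', 'b@example.com'],
     'organizers': ['b@example.com']},
    {'id': 2, 'end_time': datetime(2016, 8, 1, 22, 0),
     'speakers': ['c@example.com'], 'organizers': []},
]

print(after_event_recipients(events, now=datetime(2016, 7, 5, 5, 30)))
```

Only the first event has ended at the chosen time, so only its speakers and organizers are returned.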

In this way the whole system is automated. 🙂

Adding Client Side validation to Login and Registration Forms

It's very important to have client-side validation in addition to server-side validation. Server-side validation protects the application, but it does not give users immediate feedback while they fill in a form. So it was necessary to add client-side validation to the Login page and its related forms to make them more comfortable to use.

I had never done this before and was looking at how to achieve it using jQuery or JavaScript when I came to know that we were already using an amazing validation tool: Bootstrap Validator.

It just involves marking up the form in the HTML for the validator plugin, and all the checks are then carried out automatically.

 

[Screenshot 1.png: HTML of the Create New Password form]

As you can see in the above image, we have just added a data-toggle="validator" attribute to the form, which automatically attaches the plugin to it. The form shown is the Create New Password form, which checks whether the new password and the re-entered password match. The attribute,

data-match="#new_password"

compares the field with the password in the new_password field and, if they differ, shows the client the error message defined by,

data-error="Passwords do not match"

Thus adding such checks to a form becomes much simpler than writing them by hand in jQuery. Similarly, it was important to add validation to the Register page.

 <input type="email" name="email" class="form-control" id="email" 
 placeholder="Email" data-remote="{{ url_for('admin.check_duplicate_email') }}"
 data-remote-error="Email Address already exists" required="">

This is the validation for the email field. Apart from checking whether the text entered by the user is a valid email address, it was also important to check whether that email already exists in the database.

data-remote="{{ url_for('admin.check_duplicate_email') }}"

This attribute calls a view function that checks whether the email entered by the user is a duplicate or unique. Here is the function:

[Screenshot 2.png: the check_duplicate_email view function]

This function takes the email value from request.args and then performs a simple lookup in the database to check for a duplicate email. If no user with that email exists, the validator receives a plain “200 OK” response and the field is treated as valid. If the response is a 404 instead, the validator shows the error defined by,

data-remote-error="Email Address already exists"

However, this message applies only to the data-remote check. Other validation errors are displayed in the element marked with,

class="help-block with-errors"

Thus, without the hassle of writing Ajax and jQuery by hand, we can easily add validation to forms using Bootstrap Validator.
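The view function itself was shown as an image. As a rough sketch of the logic described above, in Python 3 and with a hypothetical in-memory set of emails standing in for the real database query, the check could look like this. The function returns a (body, status) pair; trimming and lower-casing the input is an added assumption.

```python
def check_duplicate_email(email, existing_emails):
    """Return (body, status) the way a remote validation endpoint might."""
    normalized = email.strip().lower()
    if normalized in existing_emails:
        # A 4xx status makes the validator show the data-remote-error message
        return 'Email already exists', 404
    return '200 OK', 200


registered = {'admin@example.com'}
print(check_duplicate_email('Admin@example.com', registered))
print(check_duplicate_email('new.user@example.com', registered))
```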

Writing tests for Open-Event

As our application and code base grew, it became necessary to write tests for each piece of functionality. Earlier we had tests only for basic functionality such as creating and editing an event, but it is important and beneficial to have tests for every small feature as well. Hence we started writing proper tests. We divided the tests into three folders:

  • API
  • Functionality
  • Views

All the API-related tests are in the first folder, and the tests for basic functionality are in the second. The last folder is further divided into three parts:

  • Admin Tests
  • Super-Admin Tests
  • Guest Pages

We had to test each and every functionality. For example let us look at the test file for the events. It looks like this:

class TestEvents(OpenEventViewTestCase):
    def test_events_list(self):
        with app.test_request_context():
            url = url_for('events.index_view')
            rv = self.app.get(url, follow_redirects=True)

            self.assertTrue("Manage Events" in rv.data, msg=rv.data)

    def test_events_create(self):
        with app.test_request_context():
            url = url_for('events.create_view')
            rv = self.app.get(url, follow_redirects=True)
            self.assertTrue("Create Event" in rv.data, msg=rv.data)

    def test_events_create_post(self):
        with app.test_request_context():
            custom_forms = ObjectMother.get_custom_form()
            url = url_for('events.create_view')
            data = POST_EVENT_DATA.copy()
            del data['copyright']
            data['start_date'] = '07/04/2016'
            data['start_time'] = '19:00'
            data['end_date'] = '07/04/2016'
            data['end_time'] = '22:00'
            data['custom_form[name]'] = ['session_form', 'speaker_form']
            data['custom_form[value]'] = [custom_forms.session_form, custom_forms.speaker_form]
            rv = self.app.post(url, follow_redirects=True, buffered=True, content_type='multipart/form-data',
                               data=data)
            self.assertTrue(POST_EVENT_DATA['name'] in rv.data, msg=rv.data)

    def test_events_create_post_publish(self):
        with app.test_request_context():
            url = url_for('events.create_view')
            data = POST_EVENT_DATA.copy()
            del data['copyright']
            data['start_date'] = '07/04/2016'
            data['start_time'] = '19:00'
            data['end_date'] = '07/04/2016'
            data['end_time'] = '22:00'
            data['state'] = 'Published'
            rv = self.app.post(url, follow_redirects=True, buffered=True, content_type='multipart/form-data',
                               data=data)
            self.assertTrue('unpublish' in rv.data, msg=rv.data)

    def test_events_create_post_publish_without_location_attempt(self):
        with app.test_request_context():
            custom_forms = ObjectMother.get_custom_form()
            url = url_for('events.create_view')
            data = POST_EVENT_DATA.copy()
            del data['copyright']
            data['start_date'] = '07/04/2016'
            data['start_time'] = '19:00'
            data['end_date'] = '07/04/2016'
            data['end_time'] = '22:00'
            data['location_name'] = ''
            data['state'] = u'Published'
            data['custom_form[name]'] = ['session_form', 'speaker_form']
            data['custom_form[value]'] = [custom_forms.session_form, custom_forms.speaker_form]
            rv = self.app.post(url, follow_redirects=True, buffered=True, content_type='multipart/form-data',
                               data=data)
            self.assertTrue('To publish your event please review the highlighted fields below' in rv.data, msg=rv.data)

    def test_events_edit(self):
        with app.test_request_context():
            event = ObjectMother.get_event()
            save_to_db(event, "Event saved")
            custom_forms = ObjectMother.get_custom_form(event.id)
            save_to_db(custom_forms, "Custom forms saved")
            url = url_for('events.edit_view', event_id=event.id)
            data = POST_EVENT_DATA.copy()
            del data['copyright']
            data['name'] = 'EditTestName'
            data['start_date'] = '07/04/2016'
            data['start_time'] = '19:00'
            data['end_date'] = '07/04/2016'
            data['end_time'] = '22:00'
            data['custom_form[name]'] = ['session_form', 'speaker_form']
            data['custom_form[value]'] = [custom_forms.session_form, custom_forms.speaker_form]
            rv = self.app.post(url, follow_redirects=True, buffered=True, content_type='multipart/form-data',
                               data=data)
            self.assertTrue('EditTestName' in rv.data, msg=rv.data)

    def test_event_view(self):
        with app.test_request_context():
            event = ObjectMother.get_event()
            save_to_db(event, "Event saved")
            url = url_for('events.details_view', event_id=event.id)
            rv = self.app.get(url, follow_redirects=True)
            self.assertTrue("event1" in rv.data, msg=rv.data)
            microlocation = ObjectMother.get_microlocation(event_id=event.id)
            track = ObjectMother.get_track(event_id=event.id)
            cfs = ObjectMother.get_cfs(event_id=event.id)
            save_to_db(track, "Track saved")
            save_to_db(microlocation, "Microlocation saved")
            save_to_db(cfs, "Call for speakers saved")
            rv = self.app.get(url, follow_redirects=True)
            self.assertTrue("event1" in rv.data, msg=rv.data)

    def test_event_publish(self):
        with app.test_request_context():
            event = ObjectMother.get_event()
            save_to_db(event, "Event saved")
            url = url_for('events.publish_event', event_id=event.id)
            rv = self.app.get(url, follow_redirects=True)
            event = DataGetter.get_event(event.id)
            self.assertEqual("Published", event.state, msg=event.state)

    def test_event_unpublish(self):
        with app.test_request_context():
            event = ObjectMother.get_event()
            event.state = "Published"
            save_to_db(event, "Event saved")
            url = url_for('events.unpublish_event', event_id=event.id)
            rv = self.app.get(url, follow_redirects=True)
            event = DataGetter.get_event(event.id)
            self.assertEqual("Draft", event.state, msg=event.state)

    def test_event_delete(self):
        with app.test_request_context():
            event = ObjectMother.get_event()
            save_to_db(event, "Event saved")
            url = url_for('events.trash_view', event_id=event.id)
            rv = self.app.get(url, follow_redirects=True)
            self.assertTrue("Your event has been deleted" in rv.data, msg=rv.data)

    def test_event_copy(self):
        with app.test_request_context():
            event = ObjectMother.get_event()
            save_to_db(event, "Event saved")
            url = url_for('events.copy_event', event_id=event.id)
            rv = self.app.get(url, follow_redirects=True)
            self.assertTrue("Copy of event1" in rv.data, msg=rv.data)

if __name__ == '__main__':
    unittest.main()

So this is the test file for the event part. As you can see, we have tests for each and every small functionality, for example:

  1. test_events_list: Tests the list of events
  2. test_events_create: Tests whether the event creation page is displayed
  3. test_events_create_post: Tests whether the event is created on doing a POST
  4. test_events_create_post_publish: Tests whether the event is published on doing a POST through the Publish button
  5. test_event_copy: Tests whether the event is copied properly or not

Thus each functionality related to an event is tested properly. The same is done for the other services, such as sessions:

import unittest

from tests.api.utils_post_data import POST_SESSION_DATA, POST_SPEAKER_DATA
from tests.object_mother import ObjectMother
from open_event import current_app as app
from open_event.helpers.data import save_to_db
from flask import url_for

from tests.views.view_test_case import OpenEventViewTestCase


class TestSessionApi(OpenEventViewTestCase):

    def test_sessions_list(self):
        with app.test_request_context():
            event = ObjectMother.get_event()
            save_to_db(event, "Event saved")
            session = ObjectMother.get_session(event.id)
            save_to_db(session, "Session Saved")
            url = url_for('event_sessions.index_view', event_id=event.id, session_id=session.id)
            rv = self.app.get(url, follow_redirects=True)
            self.assertTrue("Sessions" in rv.data, msg=rv.data)
            self.assertTrue("test" in rv.data, msg=rv.data)

    def test_session_create(self):
        with app.test_request_context():
            event = ObjectMother.get_event()
            save_to_db(event, "Event saved")
            custom_form = ObjectMother.get_custom_form(event.id)
            save_to_db(custom_form, "Custom form saved")
            url = url_for('event_sessions.create_view', event_id=event.id)
            rv = self.app.get(url, follow_redirects=True)
            self.assertTrue("Create Session" in rv.data, msg=rv.data)

    def test_session_create_post(self):
        with app.test_request_context():
            event = ObjectMother.get_event()
            save_to_db(event, "Event saved")
            custom_form = ObjectMother.get_custom_form(event.id)
            save_to_db(custom_form, "Custom form saved")
            data = POST_SESSION_DATA.copy()
            data.update(POST_SPEAKER_DATA)
            url = url_for('event_sessions.create_view', event_id=event.id)
            rv = self.app.post(url, follow_redirects=True, buffered=True, content_type='multipart/form-data', data=data)
            self.assertTrue(data['title'] in rv.data, msg=rv.data)

    def test_session_edit(self):
        with app.test_request_context():
            event = ObjectMother.get_event()
            save_to_db(event, "Event saved")
            custom_form = ObjectMother.get_custom_form(event.id)
            save_to_db(custom_form, "Custom form saved")
            session = ObjectMother.get_session(event.id)
            save_to_db(session, "Session saved")
            url = url_for('event_sessions.edit_view', event_id=event.id, session_id=session.id)
            rv = self.app.get(url, follow_redirects=True)
            self.assertTrue("Edit Session" in rv.data, msg=rv.data)

    def test_session_edit_post(self):
        with app.test_request_context():
            event = ObjectMother.get_event()
            save_to_db(event, "Event saved")
            custom_form = ObjectMother.get_custom_form(event.id)
            save_to_db(custom_form, "Custom form saved")
            session = ObjectMother.get_session(event.id)
            save_to_db(session, "Session saved")
            data = POST_SESSION_DATA.copy()
            data['title'] = 'TestSession2'
            url = url_for('event_sessions.edit_view', event_id=event.id, session_id=session.id)
            rv = self.app.post(url, follow_redirects=True, buffered=True, content_type='multipart/form-data', data=data)
            self.assertTrue("TestSession2" in rv.data, msg=rv.data)

    def test_session_accept(self):
        with app.test_request_context():
            session = ObjectMother.get_session()
            save_to_db(session, "Session Saved")
            url = url_for('event_sessions.accept_session', event_id=1, session_id=session.id)
            rv = self.app.get(url, follow_redirects=True)
            self.assertTrue("The session has been accepted" in rv.data, msg=rv.data)

    def test_session_reject(self):
        with app.test_request_context():
            session = ObjectMother.get_session()
            save_to_db(session, "Session Saved")
            url = url_for('event_sessions.reject_session', event_id=1, session_id=session.id)
            rv = self.app.get(url, follow_redirects=True)
            self.assertTrue("The session has been rejected" in rv.data, msg=rv.data)

    def test_session_delete(self):
        with app.test_request_context():
            session = ObjectMother.get_session()
            save_to_db(session, "Session Saved")
            url = url_for('event_sessions.delete_session', event_id=1, session_id=session.id)
            rv = self.app.get(url, follow_redirects=True)
            self.assertTrue("deleted" in rv.data, msg=rv.data)

    def test_session_view(self):
        with app.test_request_context():
            event = ObjectMother.get_event()
            save_to_db(event)
            session = ObjectMother.get_session()
            session.event_id = event.id
            save_to_db(session, "Session Saved")
            url = url_for('event_sessions.session_display_view', event_id=event.id, session_id=session.id)
            rv = self.app.get(url, follow_redirects=True)
            self.assertTrue("Short abstract" in rv.data, msg=rv.data)

    def test_wrong_form_config(self):
        with app.test_request_context():
            event = ObjectMother.get_event()
            save_to_db(event, "Event saved")
            url = url_for('event_sessions.create_view', event_id=event.id)
            rv = self.app.get(url, follow_redirects=True)
            self.assertFalse("incorrectly configured" in rv.data, msg=rv.data)

if __name__ == '__main__':
    unittest.main()

We see that there are tests for each piece of session functionality. These tests were simple to write. One aspect was harder, though: in the event creation wizard there are steps where sponsors, tracks and rooms are added to the event dynamically. How should we test them? I wrote the following test for the creation of sponsors in step 2:

def test_events_create_post(self):
    with app.test_request_context():
        custom_forms = ObjectMother.get_custom_form()
        url = url_for('events.create_view')
        data = POST_EVENT_DATA.copy()
        del data['copyright']
        data['sponsors[name]'] = ['Sponsor 1', 'Sponsor 2']
        data['sponsors[type]'] = ['Gold', 'Silver']
        data['sponsors[url]'] = ["", ""]
        data['sponsors[description]'] = ["", ""]
        data['sponsors[level]'] = ["", ""]
        data['start_date'] = '07/04/2016'
        data['start_time'] = '19:00'
        data['end_date'] = '07/04/2016'
        data['end_time'] = '22:00'
        data['custom_form[name]'] = ['session_form', 'speaker_form']
        data['custom_form[value]'] = [custom_forms.session_form, custom_forms.speaker_form]
        data = ImmutableMultiDict(data)
        rv = self.app.post(url, follow_redirects=True, buffered=True, content_type='multipart/form-data',
                           data=data)
        self.assertTrue(POST_EVENT_DATA['name'] in rv.data, msg=rv.data)

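        rv2 = self.app.get(url_for('events.details_view', event_id=1))
        # Indexing a MultiDict returns only the first value, so check every sponsor name
        for sponsor_name in data.getlist('sponsors[name]'):
            self.assertTrue(sponsor_name in rv2.data, msg=rv2.data)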

Here, after copying the data dict, I dynamically add two sponsors to it. I then convert the dict to an ImmutableMultiDict so that a single key such as sponsors[name] can carry multiple values. Then I pass this dict to the event creation view via a POST request and check whether the two sponsors are present on the details page.
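To make the role of the ImmutableMultiDict concrete, here is a minimal sketch, assuming werkzeug is installed. The field names mirror the test above; the 'name' value is a made-up placeholder.

```python
from werkzeug.datastructures import ImmutableMultiDict

# A plain dict whose list values stand for repeated form fields.
data = {
    'name': 'Demo Event',  # placeholder value, not taken from POST_EVENT_DATA
    'sponsors[name]': ['Sponsor 1', 'Sponsor 2'],
    'sponsors[type]': ['Gold', 'Silver'],
}

# Each list is expanded into several values stored under the same key.
form = ImmutableMultiDict(data)

first_sponsor = form['sponsors[name]']         # indexing returns the first value only
all_sponsors = form.getlist('sponsors[name]')  # getlist returns every value for the key
```

Because indexing returns only the first value, a test that wants to verify all sponsors has to iterate over getlist().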

Thus our test system is in place and improving. As we develop more functionality we will write more tests 🙂

 

Implementing Admin Trash in Open Event

Last week I had the task of implementing a trash system for the Admin. It was observed that sometimes a user may delete an item and then realize that the item needs to be restored. A trash system works well in this case. Presently the items that are moved to the trash are:

  • Deleted Users
  • Deleted Events
  • Deleted Sessions

It works like this: I added a column in_trash to the tables User, Event and Sessions to mark whether the item is in the trash or not:

in_trash = db.Column(db.Boolean, default=False)

Depending on whether the value is True or False, the item is or is not in the Admin's trash. When a normal user deletes an event, user or session, a message flashes that the item is deleted and the item is no longer shown in the user's table list. However, it is not deleted from the database.

[Images: trash4.png, trash5.png]

Thus for the user the item is deleted. The item’s in_trash property is set to True and it gets moved to the trash. The items are displayed in the “Deleted Items” section of the Admin panel.

[Images: trash1, trash2, trash3]

The deleted items are displayed in the trash, and as soon as they are deleted from the trash they are removed from the database permanently. A message flashes for the Admin when an item is deleted.

[Images: trash11, trash10.png]

Thus the trash is implemented. 🙂
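The flow described above can be sketched in a few lines. This is only an illustration using the standard library's sqlite3 instead of the project's SQLAlchemy models; the table layout and the function names are assumptions made for the example.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute(
    'CREATE TABLE events ('
    'id INTEGER PRIMARY KEY, name TEXT, in_trash INTEGER NOT NULL DEFAULT 0)'
)
conn.executemany('INSERT INTO events (name) VALUES (?)',
                 [('PyCon',), ('FOSSASIA Summit',)])


def move_to_trash(conn, event_id):
    """User-facing 'delete': only flips the flag, the row stays in the table."""
    conn.execute('UPDATE events SET in_trash = 1 WHERE id = ?', (event_id,))


def visible_events(conn):
    """What a normal user sees in the event list."""
    rows = conn.execute('SELECT name FROM events WHERE in_trash = 0 ORDER BY id')
    return [row[0] for row in rows]


def trashed_events(conn):
    """What the Admin sees in the 'Deleted Items' section."""
    rows = conn.execute('SELECT name FROM events WHERE in_trash = 1 ORDER BY id')
    return [row[0] for row in rows]


def delete_permanently(conn, event_id):
    """Admin deletes an item from the trash: the row is removed for good."""
    conn.execute('DELETE FROM events WHERE id = ? AND in_trash = 1', (event_id,))


move_to_trash(conn, 1)  # the user "deletes" the first event
```

After move_to_trash the event disappears from visible_events() but still shows up in trashed_events() until the Admin calls delete_permanently().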

Two more things are left:

  • To restore items from trash
  • To automatically delete the items in trash after an inactivity of 30 days

This will soon be implemented 🙂
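One possible shape for these two remaining pieces is sketched below. It is not the project's code: the TrashableItem class and the trashed_at timestamp are hypothetical, and "inactivity of 30 days" is read here as "30 days or more spent in the trash".

```python
from datetime import datetime, timedelta

TRASH_RETENTION = timedelta(days=30)


class TrashableItem(object):
    """Stand-in for a User, Event or Session row (hypothetical class)."""

    def __init__(self, name):
        self.name = name
        self.in_trash = False
        self.trashed_at = None  # hypothetical column, not part of the original schema


def move_to_trash(item, now):
    """Mark the item as trashed and remember when that happened."""
    item.in_trash = True
    item.trashed_at = now


def restore(item):
    """Bring the item back from the trash."""
    item.in_trash = False
    item.trashed_at = None


def purge_expired(items, now):
    """Return the items to keep: everything except trash that is 30 or more days old."""
    return [
        item for item in items
        if not (item.in_trash and now - item.trashed_at >= TRASH_RETENTION)
    ]
```

In a real deployment purge_expired would run as a periodic job and issue database deletes rather than filter a list.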

Open-Event Permissions System and integrating it with decorators

Most large-scale applications require a permissions system, so we implemented one in our open-event organization server. It consists of certain predefined roles:

  1. Super-Admin
  2. Admin
  3. Organizer
  4. Co-organizer
  5. Track organizer
  6. Anonymous user

Next we had to decide which permissions each role would have. We wrote documentation describing which URLs can be accessed by each role, and drew up a list of services that the roles could use their permissions to access:

  1. Tracks
  2. Microlocations
  3. Speakers
  4. Sessions
  5. Sponsors

The final step was to apply the permissions system to the appropriate views or URLs. Here comes the power of decorators in Flask. I created individual decorators @is_organizer, @is_admin, @is_super_admin etc. to check the respective roles, and one main decorator @can_access to check whether the role can access the particular URL or view function.

[Image: decorator]

In the above decorator I simply take the URL and check whether it contains the words ‘create’, ‘edit’ or ‘delete’. Depending on that, control goes into the corresponding if branch. Once it is decided which operation is being performed, it checks which service is being accessed by the user. For example, if the operation is edit then it checks whether the service being edited is an event, session, sponsor etc.

Similar checks are performed for each operation. The request.url is checked to see whether the string for that service is present in it. Once it knows which service is being accessed, it's just a matter of using the CRUD functions of the user table to check whether the role accessing the resource has the requested permission:

  1. user.can_create()
  2. user.can_read()
  3. user.can_update()
  4. user.can_delete()
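The checks described above can be sketched roughly as follows. This is a framework-free illustration, not the original decorator: the Forbidden exception, the DemoUser class, the one-argument form of the can_* methods and the get_user / get_url callables are all assumptions made for the example. In the Flask app the URL would come from request.url and the user from the login session.

```python
from functools import wraps

SERVICES = ('track', 'microlocation', 'speaker', 'session', 'sponsor')
# Word looked for in the URL, and the user method that guards that operation.
OPERATION_CHECKS = (('create', 'can_create'),
                    ('edit', 'can_update'),
                    ('delete', 'can_delete'))


class Forbidden(Exception):
    """Raised when the current user may not access the URL (hypothetical)."""


def is_allowed(user, url):
    """Work out operation and service from the URL, then ask the user object."""
    service = next((s for s in SERVICES if s in url), None)
    if service is None:
        return True  # assumption: URLs outside the listed services are not restricted here
    check = next((name for word, name in OPERATION_CHECKS if word in url), 'can_read')
    return getattr(user, check)(service)


def can_access(get_user, get_url):
    """Decorator factory: run the permission check before calling the view."""
    def decorator(view):
        @wraps(view)
        def wrapper(*args, **kwargs):
            if not is_allowed(get_user(), get_url()):
                raise Forbidden(get_url())
            return view(*args, **kwargs)
        return wrapper
    return decorator


class DemoUser(object):
    """Stand-in user holding a set of (operation, service) pairs."""

    def __init__(self, permissions):
        self.permissions = permissions

    def can_create(self, service):
        return ('create', service) in self.permissions

    def can_read(self, service):
        return ('read', service) in self.permissions

    def can_update(self, service):
        return ('update', service) in self.permissions

    def can_delete(self, service):
        return ('delete', service) in self.permissions
```

Matching on substrings keeps the decorator short, but it also means a URL that happens to contain one of these words elsewhere would be classified by it, so the URL scheme has to be kept consistent.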

After this it's just a matter of adding the decorator to each of the view functions and the system is implemented. 🙂

File Uploading in Flask

Last week I took up an issue of adding upload functionality to the open-event server. I had to implement uploads in three places: in the sponsor table to upload images, in the user profile page, and in the session form to upload slides. The basic function behind all of them is the same. File upload makes use of werkzeug's built-in FileStorage class. A code snippet will help understand how it works:

[Image: Capture.PNG]

When a file is selected and the form is submitted, the file becomes available in the request.files object, where werkzeug temporarily holds it as a FileStorage instance. We access the file using request.files['files'] and then call the FileStorage object's built-in save() method to save it to the folder we specify. There are some extensions available for file uploading in Flask, but all of this can be done with what Flask and werkzeug already provide, so there is no real need for them.

However, instead of saving the file under the name supplied by the client, we pass the name through werkzeug's secure_filename first.

[Image: Capture2.PNG]

Thus secure_filename returns the name in the format shown in the image: a sanitized file name, with path separators and other unsafe characters removed, that is safe to use when saving the upload.
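Putting the pieces together, an upload view might look roughly like this. It is a minimal sketch assuming Flask and werkzeug are installed; the /upload route, the temporary upload folder and the error messages are made up for the example, while the 'files' field name follows the text above.

```python
import os
import tempfile

from flask import Flask, request
from werkzeug.utils import secure_filename

app = Flask(__name__)
# Assumption for the sketch: uploads go to a throwaway temporary directory.
app.config['UPLOAD_FOLDER'] = tempfile.mkdtemp()


@app.route('/upload', methods=['POST'])
def upload():
    # request.files maps form field names to werkzeug FileStorage objects.
    uploaded = request.files.get('files')
    if uploaded is None or uploaded.filename == '':
        return 'No file selected', 400

    # Never trust the client-supplied name; sanitize it before touching the disk.
    filename = secure_filename(uploaded.filename)
    if not filename:
        return 'Invalid file name', 400

    # FileStorage.save() writes the uploaded stream to the given path.
    uploaded.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
    return filename
```

With this view, an upload named "My cool movie.mov" ends up on disk as "My_cool_movie.mov". A production view would also restrict file types and sizes.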