Open Event Server: Creating/Rebuilding Elasticsearch Index From Existing Data In a PostgreSQL DB Using Python

Due to limited resources, the Elasticsearch instance in the current Open Event Server deployment is used only to store events and search through them.

The project uses a PostgreSQL database. This blog post focuses on setting up a job that creates the events index if it does not exist. If the index already exists, the job deletes all the previous data and rebuilds the events index.

Although the project uses the Flask framework, the job is written in pure Python so that it can run properly in the background while the application continues its work. Celery is used for queueing up these jobs. The first step in building the job is to connect to our database:

from config import Config
import psycopg2
conn = psycopg2.connect(Config.SQLALCHEMY_DATABASE_URI)
cur = conn.cursor()

 

The next step is to fetch all the events from the database. We only index the event attributes that are useful in search; the rest are not stored in the index. The code below fetches a list of tuples containing the attributes named in the query:

cur.execute(
    "SELECT id, name, description, searchable_location_name, organizer_name, organizer_description FROM events WHERE state = 'published' and deleted_at is NULL ;")
events = cur.fetchall()

 

We will be using the bulk API, which is significantly faster than adding events one by one via the regular API. Elasticsearch-py, the official Python client for Elasticsearch, provides the necessary functionality to work with the Elasticsearch bulk API. Its helpers accept generator expressions, so the data can be streamed into the bulk API. The generator expression for events is as follows:

event_data = ({'_type': 'event',
                  '_index': 'events',
                  '_id': event_[0],
                  'name': event_[1],
                  'description': event_[2] or None,
                  'searchable_location_name': event_[3] or None,
                  'organizer_name': event_[4] or None,
                  'organizer_description': event_[5] or None}
                 for event_ in events)
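To make the row-to-document mapping easier to test in isolation, it could be pulled out into a small generator function. The sketch below is hypothetical: the name `event_actions` and the sample rows are illustrative, and it simply assumes the same column order as the SELECT query above.

```python
from typing import Any, Dict, Iterable, Iterator, Optional, Tuple

# Column order must match the SELECT query above:
# id, name, description, searchable_location_name, organizer_name, organizer_description
EventRow = Tuple[int, str, Optional[str], Optional[str], Optional[str], Optional[str]]


def event_actions(rows: Iterable[EventRow], index: str = 'events') -> Iterator[Dict[str, Any]]:
    """Yield one bulk-API action per event row (hypothetical helper)."""
    for event_id, name, description, location, organizer, organizer_desc in rows:
        yield {
            '_type': 'event',
            '_index': index,
            '_id': event_id,
            'name': name,
            # `or None` turns empty strings into None, as in the generator above
            'description': description or None,
            'searchable_location_name': location or None,
            'organizer_name': organizer or None,
            'organizer_description': organizer_desc or None,
        }


# Illustrative rows, shaped like the output of cur.fetchall()
rows = [
    (1, 'Sample Event', 'A sample description', 'Berlin', 'Sample Org', ''),
    (2, 'Another Event', None, 'Singapore', 'Another Org', None),
]
actions = list(event_actions(rows))
print(actions[0]['_id'], actions[0]['organizer_description'])  # 1 None
```

Because it is a generator, it could be passed straight to `helpers.bulk(es_store, event_actions(events))` without materialising every action in memory first.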

 

We will now delete the events index if it exists and then recreate it. The generator expression obtained above is passed to the bulk API helper, which repopulates the events index. The complete code for the function is as follows:

 

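from elasticsearch import helpers
import psycopg2

from config import Config
# `celery` (the Celery app) and `es_store` (an Elasticsearch client) are assumed to be defined elsewhere in the app


@celery.task(name='rebuild.events.elasticsearch')
def cron_rebuild_events_elasticsearch():
    """
    Re-inserts all eligible events into Elasticsearch
    """
    conn = psycopg2.connect(Config.SQLALCHEMY_DATABASE_URI)
    cur = conn.cursor()
    cur.execute(
        "SELECT id, name, description, searchable_location_name, organizer_name, organizer_description FROM events WHERE state = 'published' and deleted_at is NULL ;")
    events = cur.fetchall()
    cur.close()
    conn.close()

    event_data = ({'_type': 'event',
                   '_index': 'events',
                   '_id': event_[0],
                   'name': event_[1],
                   'description': event_[2] or None,
                   'searchable_location_name': event_[3] or None,
                   'organizer_name': event_[4] or None,
                   'organizer_description': event_[5] or None}
                  for event_ in events)

    # Delete the old index only if it exists (deleting a missing index raises an error)
    if es_store.indices.exists(index='events'):
        es_store.indices.delete(index='events')
    es_store.indices.create(index='events')
    # Stream all the events into the fresh index through the bulk API
    helpers.bulk(es_store, event_data)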

 

Currently, we run this job every week and on each new deployment. Rebuilding the index is important because some records may be missed while the continuous sync is taking place.

To learn more about this, please visit https://gocardless.com/blog/syncing-postgres-to-elasticsearch-lessons-learned/


Open Event API Server: Implementing FAQ Types

In the Open Event Server, there was a long-standing request from users to let event organisers create an FAQ section.

The FAQ API was implemented subsequently. It allowed the user to send the following request schema:

{
 "data": {
   "type": "faq",
   "relationships": {
     "event": {
       "data": {
         "type": "event",
         "id": "1"
       }
     }
   },
   "attributes": {
     "question": "Sample Question",
     "answer": "Sample Answer"
   }
 }
}

 

But what if the user wanted to group certain questions under a specific category? The FAQ API had no solution for that, so a new API, FAQ-Types, was created.

Why make a separate API for it?

Another question that arose while designing the FAQ-Types API was whether a separate API was necessary at all. Suppose a type attribute had simply been added to the FAQ API itself. The client would then have to specify the type every time a new FAQ record is created, which means trusting the user to always spell the type the same way for questions that belong together. The user cannot be trusted on this front. A separate API ensures that the types remain controlled and that there are no duplicate entries for the same type.

It helps in handling a large number of records:

Another concern was what would happen if there were a large number of FAQ records under the same type. Entering the type for each of those questions would be cumbersome for the user. The FAQ-Types API overcomes this problem as well.

Following is the request schema for the FAQ-Types API:

{
 "data": {
   "attributes": {
     "name": "abc"
   },
   "type": "faq-type",
   "relationships": {
     "event": {
       "data": {
         "id": "1",
         "type": "event"
       }
     }
   }
 }
}

 

Additionally:

  • FAQ to FAQ-type is a many-to-one relation.
  • A single FAQ can only belong to one type.
  • The FAQ-type relationship is optional: if the user wants different sections, they can add them; if not, the FAQs can be left without a type.
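Since the relationship is many-to-one and optional, it maps naturally onto a nullable foreign key on the FAQ table. The sketch below uses the standard library's sqlite3 module purely for illustration; the table and column names are hypothetical and this is not the project's actual SQLAlchemy model.

```python
import sqlite3

# In-memory database for illustration only; names are hypothetical
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA foreign_keys = ON')  # SQLite enforces foreign keys only when enabled
conn.executescript('''
    CREATE TABLE faq_types (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL
    );
    CREATE TABLE faqs (
        id INTEGER PRIMARY KEY,
        question TEXT NOT NULL,
        answer TEXT NOT NULL,
        -- nullable foreign key: many FAQs -> at most one FAQ type
        faq_type_id INTEGER REFERENCES faq_types(id)
    );
''')

conn.execute("INSERT INTO faq_types (id, name) VALUES (1, 'Tickets')")
conn.executemany(
    'INSERT INTO faqs (question, answer, faq_type_id) VALUES (?, ?, ?)',
    [
        ('How do I get a refund?', 'Contact the organiser.', 1),
        ('Can I transfer my ticket?', 'Yes, before the event.', 1),
        ('Is there parking?', 'Limited parking is available.', None),  # no type
    ],
)

# Count FAQs per type; FAQs without a type are grouped under NULL
grouped = conn.execute('''
    SELECT t.name, COUNT(f.id)
    FROM faqs f LEFT JOIN faq_types t ON f.faq_type_id = t.id
    GROUP BY t.name
''').fetchall()
print(grouped)
```

Each type name is stored once in `faq_types`, so FAQs that belong together always reference the same row instead of repeating a free-text label.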


Open Event Server: Getting The Identity From The Expired JWT Token In Flask-JWT

The Open Event Server uses JWT-based authentication, where JWT stands for JSON Web Token. JSON Web Tokens are an open, industry-standard (RFC 7519) method for representing claims securely between two parties. [source: https://jwt.io/]

Flask-JWT is used for the JWT-based authentication in the project. Flask-JWT makes it easy to use JWT-based authentication in Flask, while at its core it still uses PyJWT.

To get the identity when a JWT token is present in the request's Authorization header, the current_identity proxy of Flask-JWT can be used as follows:

from flask_jwt import current_identity, jwt_required


@app.route('/example')
@jwt_required()
def example():
    return '%s' % current_identity

 

Note that current_identity is only set in the context of a function decorated with jwt_required(). The problem with using current_identity together with jwt_required is that the token has to be valid: the identity of an expired token cannot be fetched this way.

So why not write our own function to do this? A JSON Web Token consists of three parts separated by dots (.), which are:

  • Header
  • Payload
  • Signature

The first step is to get the payload, which can be done as follows:

token_second_segment = _default_request_handler().split('.')[1]

 

The payload obtained above is still base64url-encoded JSON. It can be decoded and converted into a dict as follows:

payload = json.loads(base64.urlsafe_b64decode(token_second_segment))

 

The identity can now be found in the payload as payload['identity']. We can get the actual user from the payload as follows:

def jwt_identity(payload):
   """
   Jwt helper function
   :param payload:
   :return:
   """
   return User.query.get(payload['identity'])

 

Our final function will now be something like:

import base64
import json

from flask_jwt import _default_request_handler


def get_identity():
    """
    To be used only if identity for expired tokens is required, otherwise use current_identity from flask_jwt
    :return:
    """
    token_second_segment = _default_request_handler().split('.')[1]
    payload = json.loads(base64.urlsafe_b64decode(token_second_segment))
    user = jwt_identity(payload)
    return user

 

But after using this function for some time, you will notice that for certain tokens the system raises an error about incorrect padding. The JWT payload is base64url encoded with its trailing padding stripped, whereas base64 decoding requires the length of the string to be a multiple of four. If it is not, the string has to be padded with extra = (equals) signs, and since Python's base64 decoding does not do that by default, we can accomplish it as follows:

missing_padding = len(token_second_segment) % 4

# ensures the string is correctly padded to be a multiple of 4
if missing_padding != 0:
    token_second_segment += '=' * (4 - missing_padding)

 


Open Event Server: Forming jsonapi Compatible Error Responses In flask-rest-jsonapi Decorators

From the jsonapi documentation:

Error objects provide additional information about problems encountered while performing an operation. Error objects MUST be returned as an array keyed by errors in the top level of a JSON API document.

To return jsonapi-compatible error objects in flask-rest-jsonapi, one must raise an exception; an appropriate error message is then returned depending on the type of the exception raised.

Following is an example of how to raise a jsonapi-compatible error:

try:
    self.session.query(Person).filter_by(id=view_kwargs['id']).one()
except NoResultFound:
    raise ObjectNotFound({'parameter': 'id'}, "Person: {} not found".format(view_kwargs['id']))

 

But this way of raising an exception fails when working with decorators in flask-rest-jsonapi. Taking inspiration from the JsonApiException class of flask-rest-jsonapi itself, we'll build a custom class that formulates jsonapi-compatible error messages, which we can then simply return using make_response from Flask.

In our basic class definition, we'll define a default title and status code in case neither is provided by the user. The default status code is 500. Following is the code:

class ErrorResponse:
   """
   Parent ErrorResponse class for handling json-api compliant errors.
   Inspired by the JsonApiException class of `flask-rest-jsonapi` itself
   """
   title = 'Unknown error'
   status = 500
   headers = {'Content-Type': 'application/vnd.api+json'}


We will be accepting the following four parameters for the initialization of an object of the said class:

  • source: the source of the error in the request document
  • detail: any details about the error
  • title: the title of the error
  • status: the HTTP status code for the error

Following is the initialisation function for the same:

    def __init__(self, source, detail, title=None, status=None):
        """Initialize a jsonapi ErrorResponse Object

        :param dict source: the source of the error
        :param str detail: the detail of the error
        """
        self.source = source
        self.detail = detail
        if title is not None:
            self.title = title
        if status is not None:
            self.status = status

 

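We'll use the jsonapi_errors function of flask-rest-jsonapi to wrap these parameters into a jsonapi errors document. The respond method below relies on a to_dict method (not shown) that returns the error attributes as a dict: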

    def respond(self):
        """
        :return: a jsonapi compliant response object
        """
        dict_ = self.to_dict()
        return make_response(json.dumps(jsonapi_errors([dict_])), self.status, self.headers)
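To see the whole class in one place without Flask, here is a stdlib-only sketch. It is an assumption-laden illustration, not the project's class: `to_dict` is one possible implementation of the method `respond` calls, `body` builds the `errors` document by hand instead of using `jsonapi_errors`, and `ForbiddenError` is a hypothetical subclass showing how the class-level defaults can be overridden. The status is serialised as a string because the JSON:API spec expresses it that way.

```python
import json


class ErrorResponse:
    """Stdlib-only sketch of a json-api error builder (hypothetical)."""
    title = 'Unknown error'
    status = 500
    headers = {'Content-Type': 'application/vnd.api+json'}

    def __init__(self, source, detail, title=None, status=None):
        self.source = source
        self.detail = detail
        if title is not None:
            self.title = title
        if status is not None:
            self.status = status

    def to_dict(self):
        # One json-api error object; JSON:API expresses status as a string
        return {'status': str(self.status), 'source': self.source,
                'title': self.title, 'detail': self.detail}

    def body(self):
        # Errors MUST be returned as an array keyed by "errors"
        return json.dumps({'errors': [self.to_dict()]})


class ForbiddenError(ErrorResponse):
    """Hypothetical subclass that overrides the defaults."""
    title = 'Access Forbidden'
    status = 403


err = ForbiddenError({'pointer': '/data/attributes/name'}, 'Only organizers can edit this field')
print(err.status, err.body())
```

In the Flask version, `respond` would pass the same dict to `jsonapi_errors` and hand the result to `make_response` together with `self.status` and `self.headers`.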

 


Checking For Multiple Migration Heads In Open Event server

While working on the Open Event Server, a lot of database refactoring was done in the first phase, which meant multiple contributors were working on the refactor of the same database.

The Open Event Server uses SQLAlchemy as its ORM. SQLAlchemy is the Python SQL toolkit and Object Relational Mapper that gives application developers the full power and flexibility of SQL. It provides a full suite of well known enterprise-level persistence patterns, designed for efficient and high-performing database access, adapted into a simple and Pythonic domain language.

For database migrations, the server uses Alembic. Alembic is a lightweight database migration tool for usage with the SQLAlchemy Database Toolkit for Python. Alembic provides for the creation, management, and invocation of change management scripts for a relational database, using SQLAlchemy as the underlying engine.

The issue the project collaborators faced most when handling database migrations was when multiple PRs were merged with migration files pointing to different heads. In such cases, Alembic would raise a multiple heads error only after the deployment had been made, and there were no tests to ensure that this didn't happen.

The number of migration heads can be found with the following command:

python manage.py db heads

 

The above command prints the identifiers of the migration heads, one per line. We can count the number of lines in its output with the help of wc (word count) and store the result in a variable as follows:

lines=$(python manage.py db heads | wc -l)

 

If the number of lines is one, there is only one head and the test should pass; otherwise, the test should fail. Following is the test script to do that:

lines=$(python manage.py db heads | wc -l)

# exactly one migration head is allowed
if [ "$lines" -ne 1 ]
then
   exit 1
else
   exit 0
fi
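The same check could also be written in Python, for example to run it from a test suite instead of a shell script. This is a sketch under the same assumption as the script, that the heads command prints one head per line; the function names and the sample outputs are hypothetical, and blank lines are ignored rather than counted.

```python
import subprocess


def count_heads(heads_output):
    """Count migration heads, assuming one head per non-empty line of output."""
    return sum(1 for line in heads_output.splitlines() if line.strip())


def has_single_head(cmd=('python', 'manage.py', 'db', 'heads')):
    """Run the heads command and report whether exactly one head exists."""
    output = subprocess.check_output(list(cmd)).decode('utf-8')
    return count_heads(output) == 1


# Illustrative outputs; the revision identifiers are made up
single = 'aaaa1111bbbb (head)\n'
multiple = 'aaaa1111bbbb (head)\ncccc2222dddd (head)\n'
print(count_heads(single), count_heads(multiple))  # 1 2
```

In a CI step, `sys.exit(0 if has_single_head() else 1)` would reproduce the script's exit codes.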

 


Checking Whether Migrations Are Up To Date With The Sqlalchemy Models In The Open Event Server

In Open Event Server pull requests that change a SQLAlchemy model, the corresponding migrations are sometimes missed.

The first approach to checking whether the database was up to date with the migrations was the following health check function:

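from subprocess import check_output


def health_check_migrations():
    """
    Checks whether database is up to date with migrations, assumes there is a single migration head
    :return:
    """
    # head revision according to the migration files (first token of the output)
    head = check_output(["python", "manage.py", "db", "heads"]).decode('utf-8').split(" ")[0]
    # revision the database was actually upgraded to
    version_num = (db.session.execute('SELECT version_num from alembic_version').fetchone())['version_num']

    if head == version_num:
        return True, 'database up to date with migrations'
    return False, 'database out of date with migrations'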

 

In the above function, we get the head according to the migration files as follows:

head = check_output(["python", "manage.py", "db", "heads"]).decode('utf-8').split(" ")[0]


The table alembic_version contains the latest alembic revision to which the database was actually upgraded. We can get this revision from the following line:

version_num = (db.session.execute('SELECT version_num from alembic_version').fetchone())['version_num']

 

Then we compare the two revisions and return a tuple based on the result. While this method was pretty fast, it had a drawback: if the user forgets to generate the migration files for changes made to the SQLAlchemy models, both revisions still match, so this approach fails to report a failure status in the health check.

To overcome this drawback, all the SQLAlchemy models were fetched automatically and a simple SQLAlchemy select query was run on each of them to check whether the migrations were up to date.

Remember that a raw SQL query will not serve our purpose here, as you'd have to specify the columns explicitly in the query. A SQLAlchemy query, on the other hand, generates SQL based on the fields defined in the db model, so if the migrations for a model change are missing, the query raises an error.

We can accomplish this with the following function:

def health_check_migrations():
    """
    Checks whether database is up to date with migrations by performing a select query on each model
    :return:
    """
    # Get all the models in the db, all models should have an explicit __tablename__
    classes, models, table_names = [], [], []
    # noinspection PyProtectedMember
    for class_ in db.Model._decl_class_registry.values():
        try:
            table_names.append(class_.__tablename__)
            classes.append(class_)
        except AttributeError:
            # registry entries such as module markers have no __tablename__
            pass
    for table_name in db.metadata.tables:
        if table_name in table_names:
            models.append(classes[table_names.index(table_name)])

    for model in models:
        try:
            db.session.query(model).first()
        except Exception:
            # reset the failed transaction before reporting the error
            db.session.rollback()
            return False, '{} model out of date with migrations'.format(model)
    return True, 'database up to date with migrations'

 

In the above code, we automatically collect all the models and the tables they map to. Then, for each model, we run a simple SELECT query that returns the first row found. If this raises an error, the tuple (False, '{} model out of date with migrations'.format(model)) is returned, so that the health check reports a failure status.
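The core idea, that a query naming every column a model expects fails as soon as the database lacks one of them, can be reproduced with the standard library alone. In the sketch below, the `EXPECTED_COLUMNS` dict is a hypothetical stand-in for the SQLAlchemy models, and an in-memory SQLite database plays the role of a schema whose latest migration was never applied.

```python
import sqlite3

# Columns each "model" expects; a stand-in for the SQLAlchemy model definitions
EXPECTED_COLUMNS = {
    'events': ['id', 'name', 'searchable_location_name'],
    'faqs': ['id', 'question', 'answer'],
}


def check_schema(conn, expected):
    """Return (bool, str) in the same style as health_check_migrations."""
    for table, columns in expected.items():
        # Naming the columns explicitly mirrors the SELECT the ORM generates
        query = 'SELECT {} FROM {} LIMIT 1'.format(', '.join(columns), table)
        try:
            conn.execute(query).fetchone()
        except sqlite3.OperationalError:
            return False, '{} model out of date with migrations'.format(table)
    return True, 'database up to date with migrations'


conn = sqlite3.connect(':memory:')
# The migration adding `searchable_location_name` was never applied
conn.execute('CREATE TABLE events (id INTEGER PRIMARY KEY, name TEXT)')
conn.execute('CREATE TABLE faqs (id INTEGER PRIMARY KEY, question TEXT, answer TEXT)')
result = check_schema(conn, EXPECTED_COLUMNS)
print(result)
```

Once the missing column is added (the equivalent of running the migration), the same check passes.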


Implementing Health Check Endpoint in Open Event Server

A health check endpoint was required in the Open Event Server, to be used by Kubernetes to know when the web instance is ready to receive requests.

The health checks focused primarily on the following:

  • Connection to the database.
  • Ensuring the SQLAlchemy models are in line with the migrations.
  • Connection to celery workers.
  • Connection to redis instance.

Runscope/healthcheck seemed like the way to go. Healthcheck wraps a Flask app object and adds a way to write simple health-check functions that can be used to monitor your application. It's useful for asserting that your dependencies are up and running and your application can respond to HTTP requests. The health check functions are exposed via a user-defined Flask route, so you can use an external monitoring application (monit, nagios, Runscope, etc.) to check the status and uptime of your application.

The health check endpoint was implemented at /health-check as follows:

from healthcheck import HealthCheck
health = HealthCheck(current_app, "/health-check")

 

Following is the function for checking the connection to the database:

def health_check_db():
    """
    Check health status of db
    :return:
    """
    try:
        db.session.execute('SELECT 1')
        return True, 'database ok'
    except Exception:
        # report the full exception to Sentry before failing the check
        sentry.captureException()
        return False, 'Error connecting to database'

 

Check functions take no arguments and should return a tuple of (bool, str). The boolean indicates whether the check passed, and the string is any message or output that should be rendered for the check, which is useful for error messages and debugging.

The above function executes a query on the database to check whether it is connected properly. If the query runs successfully, it returns the tuple (True, 'database ok'). If there is an error connecting to the database, an exception is raised and caught: sentry.captureException() makes sure that the Sentry instance receives a proper exception event with all the information about it, and the function returns the tuple (False, 'Error connecting to database').

Finally, to add this check to the endpoint:

health.add_check(health_check_db)

Following is the response for a successful health check:

{
   "status": "success",
   "timestamp": 1500915121.52474,
   "hostname": "shubham",
   "results": [
       {
           "output": "database ok",
           "checker": "health_check_db",
           "expires": 1500915148.524729,
           "passed": true,
           "timestamp": 1500915121.524729
       }
   ]
}

If the database is not connected, the results entry for this check looks like this:

{
    "output": "Error connecting to database",
    "checker": "health_check_db",
    "expires": 1500965798.307425,
    "passed": false,
    "timestamp": 1500965789.307425
}
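The check-function contract described above, no arguments in and a (bool, str) tuple out, can be tried without Flask or a running PostgreSQL server. In the sketch below, `make_db_check`, `run_checks` and `broken_connect` are hypothetical names, SQLite stands in for the real database, and `run_checks` only approximates the aggregated response shape shown above; it is not Runscope/healthcheck's implementation.

```python
import sqlite3
import time


def make_db_check(connect):
    """Build a zero-argument check; `connect` returns a DB-API connection."""
    def health_check_db():
        try:
            conn = connect()
            try:
                conn.cursor().execute('SELECT 1')
            finally:
                conn.close()
            return True, 'database ok'
        except Exception:
            return False, 'Error connecting to database'
    return health_check_db


def run_checks(checks):
    """Run each check and aggregate the (bool, str) results (hypothetical runner)."""
    results = []
    for check in checks:
        passed, output = check()
        results.append({'checker': check.__name__, 'passed': passed,
                        'output': output, 'timestamp': time.time()})
    status = 'success' if all(r['passed'] for r in results) else 'failure'
    return {'status': status, 'results': results}


def broken_connect():
    # Simulates a database that cannot be reached
    raise sqlite3.OperationalError('unable to open database file')


ok_check = make_db_check(lambda: sqlite3.connect(':memory:'))
bad_check = make_db_check(broken_connect)
print(run_checks([ok_check])['status'])             # success
print(run_checks([ok_check, bad_check])['status'])  # failure
```

Keeping each check a plain zero-argument function makes it easy to unit-test on its own before registering it with `health.add_check`.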


Implementing ETags Support In flask-rest-jsonapi For Open Event Server

In the Open Event Server project, the client apps required ETag support so that they could consume the API efficiently.

What is an ETag?

An entity tag (ETag) is an HTTP header used for Web cache validation and conditional requests from browsers for resources.

What is the need for an ETag?

  • Clients can use it to request the complete data only if the data has changed, and use their local cache otherwise.
  • It can be used for optimistic concurrency control, preventing lost updates when multiple clients try to modify the same data at the same time.

How to implement ETags in the API framework?

To implement ETags in the API framework, changes need to be done in the dispatch_request function of Resource class located at resource.py at the root of the framework.

A config variable is also added to turn ETags on and off. You can name it anything you want; we went with just ETAG. The first thing to do is to calculate the ETag hash from the original response. The response variable is available in dispatch_request, and hashing can be performed on it as follows:

etag = hashlib.sha1(resp.get_data()).hexdigest()
resp.headers['ETag'] = etag # return ETag in response headers

Why did we use SHA-1?

In the lines of code above, you will notice that we are using SHA-1 for hashing. SHA-1 is known to have collisions, so why use it? For ETags, the hashes are not stored anywhere: they are returned directly in the response header and only compared against what the client sends back. The probability of a harmful collision is therefore very low even if we used MD5, so using SHA-1 won't hurt much 😉

So far, the code above only returns an ETag, which is of no use unless we also support the If-Match and If-None-Match request headers. Both headers can be obtained from the request as follows:

if_match = request.headers.get('If-Match')
if_none_match = request.headers.get('If-None-Match')

 

For both the If-Match and If-None-Match request headers, the system accepts a comma-separated list of ETags, which can be parsed as follows:

etag_list = [tag.strip() for tag in if_match.split(',')]

 

For If-Match, the response is returned only if the ETag of the current response matches any of the comma-separated ETags in the If-Match header. If none of the given ETags match, a 412 Precondition Failed status code will be returned. This can be implemented with a check as follows:

if if_match:
   etag_list = [tag.strip() for tag in if_match.split(',')]
   if etag not in etag_list and '*' not in etag_list:
       exc = PreconditionFailed({'pointer': ''}, 'Precondition failed')
       return make_response(json.dumps(jsonapi_errors([exc.to_dict()])),
                            exc.status,
                            headers)

 

For If-None-Match, the response is returned only if the ETag of the current response does not match any of the comma-separated ETags in the If-None-Match header. If any of the given ETags matches (or * is given), a 304 Not Modified status code is returned instead:

elif if_none_match:
   etag_list = [tag.strip() for tag in if_none_match.split(',')]
   if etag in etag_list or '*' in etag_list:
       exc = NotModified({'pointer': ''}, 'Resource not modified')
       return make_response(json.dumps(jsonapi_errors([exc.to_dict()])),
                            exc.status,
                            headers)
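The header handling can be exercised without Flask by pulling it into pure functions. The sketch below is hypothetical: the function names are not part of flask-rest-jsonapi, it returns a status code (or None) instead of building a response, and, like the snippets above, it compares unquoted ETag values.

```python
import hashlib


def compute_etag(body):
    """SHA-1 hex digest of the response body, as in the snippet above."""
    return hashlib.sha1(body).hexdigest()


def parse_etag_list(header_value):
    """Split a comma-separated If-Match / If-None-Match value into tags."""
    return [tag.strip() for tag in header_value.split(',')]


def precondition_status(etag, if_match=None, if_none_match=None):
    """Return 412, 304, or None (serve the response normally)."""
    if if_match:
        tags = parse_etag_list(if_match)
        if etag not in tags and '*' not in tags:
            return 412  # Precondition Failed
    elif if_none_match:
        tags = parse_etag_list(if_none_match)
        if etag in tags or '*' in tags:
            return 304  # Not Modified
    return None


body = b'{"data": {"type": "event", "id": "1"}}'
etag = compute_etag(body)
print(precondition_status(etag, if_none_match=etag))       # 304
print(precondition_status(etag, if_match='abc, def'))      # 412
print(precondition_status(etag, if_match='abc, ' + etag))  # None
```

As in the original if/elif chain, If-Match takes precedence: If-None-Match is only evaluated when no If-Match header was sent.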

 


Supporting Dasherized Attributes and Query Params in flask-rest jsonapi for Open Event Server

In the Open Event API Server project, the attributes of the API are dasherized.

What was the need for dasherizing the attributes in the API?

All the attributes in our database models are separated by underscores, i.e. first name is stored as first_name. But most API client implementations support dasherized attributes by default. The primary reason behind this decision was to make the API easy to set up for third-party client implementations and thereby attract them in the future. Also, to quote the official json-api spec recommendation on this:

Member names SHOULD contain only the characters “a-z” (U+0061 to U+007A), “0-9” (U+0030 to U+0039), and the hyphen minus (U+002D HYPHEN-MINUS, “-”) as separator between multiple words.

Note: The dasherized version for first_name will be first-name.

flask-rest-jsonapi is the API framework used by the project. We were able to dasherize the API responses and requests by adding inflect=dasherize to each API schema, where dasherize is the following function:

def dasherize(text):
   return text.replace('_', '-')

 

flask-rest-jsonapi also provides powerful features through query params, such as sparse fieldsets, sorting and including related objects.

But we observed that the query params were not being dasherized, which rendered these awesome features useless 🙁. The reason was that flask-rest-jsonapi took the query params as-is and searched for them in the API schema. As Python variable names cannot contain a dash, naming the attributes with dashes in the internal API schema was out of the question.

Adding dasherizing support to the query params requires changes to the QueryStringManager in querystring.py at the root of the framework. A config variable named DASHERIZE_API was added to turn this feature on and off.

Following are the changes required for dasherizing query params:

For sparse fieldsets, in the fields function, replace the following line:

result[key] = [value]

with

if current_app.config['DASHERIZE_API'] is True:
    result[key] = [value.replace('-', '_')]
else:
    result[key] = [value]

 

For sorting, in the sorting function, replace the following line:

field = sort_field.replace('-', '')

with

if current_app.config['DASHERIZE_API'] is True:
   field = sort_field[0].replace('-', '') + sort_field[1:].replace('-', '_')
else:
   field = sort_field[0].replace('-', '') + sort_field[1:]

 

For including related objects, in the include function, replace the following line:

return include_param.split(',') if include_param else []

with

if include_param:
   param_results = []
   for param in include_param.split(','):
       if current_app.config['DASHERIZE_API'] is True:
           param = param.replace('-', '_')
       param_results.append(param)
   return param_results
return []
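The three replacements can be checked in isolation with plain functions. The sketch below is hypothetical: the function names are illustrative and not part of flask-rest-jsonapi, and the `dasherize` flag stands in for `current_app.config['DASHERIZE_API']`. Note that `sort_field_name` only returns the field name; the sort direction (a leading `-` means descending) still has to be read from the original parameter.

```python
def sparse_fields(value, dasherize=True):
    """fields[...] value, e.g. 'first-name,last-name' -> ['first_name,last_name']."""
    return [value.replace('-', '_')] if dasherize else [value]


def sort_field_name(sort_field, dasherize=True):
    """'-starts-at' -> 'starts_at'; a leading '-' (descending order) is dropped."""
    first = sort_field[0].replace('-', '')
    rest = sort_field[1:].replace('-', '_') if dasherize else sort_field[1:]
    return first + rest


def include_list(include_param, dasherize=True):
    """'event-topic,sessions' -> ['event_topic', 'sessions']."""
    if not include_param:
        return []
    params = include_param.split(',')
    return [p.replace('-', '_') for p in params] if dasherize else params


print(sort_field_name('-starts-at'), include_list('event-topic,sessions'))
# starts_at ['event_topic', 'sessions']
```

Treating only the first character of a sort field specially is what keeps the descending-order marker from being turned into an underscore.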


Modifying flask-rest-jsonapi Exception Handling in Open Event Server to Enable Support for Sentry

In the Open Event Server project, Sentry support was enabled. So, first of all:

What is Sentry?

Sentry provides open source error tracking that shows you every crash in your stack as it happens, with the details needed to prioritize, identify, reproduce, and fix each issue.

Basic error tracking can be enabled with the following two simple lines:

from raven.contrib.flask import Sentry
sentry = Sentry(app, dsn='https://<key>:<secret>@sentry.io/<project>')

 

But after some time, it was noticed that app-related errors were not being caught by Sentry, while migration-related errors were. This meant that Sentry was functioning properly in the app, but it was not seeing the uncaught exceptions.

After a lot of digging, it turned out that the API framework, flask-rest-jsonapi, caught all unknown exceptions while dispatching the request and returned a jsonapi error with status 500 instead. Following is the code responsible for that:

except Exception as e:
    if current_app.config['DEBUG'] is True:
        raise e
    exc = JsonApiException('', 'Unknown error')
    return make_response(json.dumps(jsonapi_errors([exc.to_dict()])),
                         exc.status,
                         headers)

 

We now had to let these exceptions propagate, which required modifying the API framework. Its exception handling was changed as shown below:

except Exception as e:
    if current_app.config.get('API_PROPOGATE_UNCAUGHT_EXCEPTIONS') is True:
        raise
    if current_app.config['DEBUG'] is True:
        raise e
    exc = JsonApiException({'pointer': ''}, 'Unknown error')
    return make_response(json.dumps(jsonapi_errors([exc.to_dict()])),
                         exc.status,
                         headers)

A config parameter named API_PROPOGATE_UNCAUGHT_EXCEPTIONS was added to the config to turn this feature on or off.
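The effect of the flag can be illustrated with a small stdlib-only sketch. Here `dispatch`, `view` and the plain `config` dict are hypothetical stand-ins for the framework's dispatch code, the resource method and `current_app.config`; the config key keeps the exact spelling used by the project.

```python
import json


def dispatch(view, config):
    """Call `view`; on an unknown error either re-raise or return a json-api 500."""
    try:
        return view()
    except Exception:
        if config.get('API_PROPOGATE_UNCAUGHT_EXCEPTIONS') is True:
            raise  # let the app's own error handling (and Sentry) see the exception
        if config.get('DEBUG') is True:
            raise
        error = {'status': '500', 'source': {'pointer': ''}, 'title': 'Unknown error'}
        return json.dumps({'errors': [error]}), 500


def failing_view():
    raise ValueError('boom')


body, status = dispatch(failing_view, {})
print(status)  # 500
try:
    dispatch(failing_view, {'API_PROPOGATE_UNCAUGHT_EXCEPTIONS': True})
except ValueError as exc:
    print('propagated:', exc)  # propagated: boom
```

With the flag off, the original exception never leaves the dispatcher, which is why Sentry could not see it.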

With all this done, Sentry was able to catch and report uncaught exceptions, but json-api spec compliant error messages were no longer returned for unknown errors. So it was decided to handle these uncaught exceptions in the app itself, using Flask's error handlers:

@app.errorhandler(500)
def internal_server_error(error):
   exc = JsonApiException({'pointer': ''}, 'Unknown error')
   return make_response(json.dumps(jsonapi_errors([exc.to_dict()])), exc.status,
                        {'Content-Type': 'application/vnd.api+json'})

Thus, all uncaught exceptions were now returning a proper json-api spec compliant error response.
