Open Event Server: Creating/Rebuilding Elasticsearch Index From Existing Data In a PostgreSQL DB Using Python
Due to limited resources, the Elasticsearch instance in the current Open Event Server deployment is used only to store events and to search through them.
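For context, a search against this index with the official Python client looks roughly like the sketch below. The client setup and the query shape are illustrative only, not the server's actual code; the server configures its own client instance, referred to as es_store later in this post.

from elasticsearch import Elasticsearch

# Illustrative client setup; the server configures its own instance (es_store below).
es_store = Elasticsearch(['http://localhost:9200'])

# A simple multi-field search over the indexed event attributes (query shape is illustrative).
results = es_store.search(index='events', body={
    'query': {
        'multi_match': {
            'query': 'open source summit',
            'fields': ['name', 'description', 'searchable_location_name', 'organizer_name']
        }
    }
})

for hit in results['hits']['hits']:
    print(hit['_id'], hit['_source']['name'])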
The project uses a PostgreSQL database, so this post will focus on setting up a job that creates the events index if it does not exist. If the index already exists, the job deletes all the previous data and rebuilds the events index.
Although the project uses the Flask framework, the job is written in pure Python so that it can run properly in the background while the application continues its work. Celery is used for queueing up these jobs. The first step in building the job is to connect to our database:
from config import Config
import psycopg2

conn = psycopg2.connect(Config.SQLALCHEMY_DATABASE_URI)
cur = conn.cursor()
The next step is to fetch all the events from the database. We index only those attributes of an event that are useful in search; the rest are not stored in the index. The code below fetches a collection of tuples containing the attributes mentioned in the query:
cur.execute(
    "SELECT id, name, description, searchable_location_name, organizer_name, organizer_description FROM events WHERE state = 'published' and deleted_at is NULL ;")
events = cur.fetchall()
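As a small aside, each row in events is a plain tuple in the order of the SELECT columns, which is how the generator expression in the next step addresses the fields. The unpacking below is only for illustration and is not part of the job itself:

# Each row is a tuple ordered like the SELECT columns; unpacking one row for illustration.
# (The generator expression in the next step uses positional indices instead.)
event_id, name, description, location_name, organizer_name, organizer_description = events[0]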
We will be using the bulk API, which is significantly faster than adding events one by one via the API. elasticsearch-py, the official Python client for Elasticsearch, provides the necessary functionality to work with the bulk API. The helpers included in the client let us use generator expressions to insert the data via the bulk API. The generator expression for events is as follows:
event_data = ({'_type': 'event',
               '_index': 'events',
               '_id': event_[0],
               'name': event_[1],
               'description': event_[2] or None,
               'searchable_location_name': event_[3] or None,
               'organizer_name': event_[4] or None,
               'organizer_description': event_[5] or None}
              for event_ in events)
We now delete the events index if it exists and then recreate it. The generator expression obtained above is passed to the bulk API helper, which repopulates the events index. The complete code for the function is as follows:
# es_store (the application's Elasticsearch client) and helpers (from elasticsearch.helpers)
# are assumed to be available at module level, along with psycopg2, Config and the celery app.
@celery.task(name='rebuild.events.elasticsearch')
def cron_rebuild_events_elasticsearch():
    """
    Re-inserts all eligible events into elasticsearch
    :return:
    """
    conn = psycopg2.connect(Config.SQLALCHEMY_DATABASE_URI)
    cur = conn.cursor()
    cur.execute(
        "SELECT id, name, description, searchable_location_name, organizer_name, organizer_description FROM events WHERE state = 'published' and deleted_at is NULL ;")
    events = cur.fetchall()
    event_data = ({'_type': 'event',
                   '_index': 'events',
                   '_id': event_[0],
                   'name': event_[1],
                   'description': event_[2] or None,
                   'searchable_location_name': event_[3] or None,
                   'organizer_name': event_[4] or None,
                   'organizer_description': event_[5] or None}
                  for event_ in events)
    # Drop the old index, recreate it and bulk-insert the fresh event documents.
    es_store.indices.delete('events')
    es_store.indices.create('events')
    abc = helpers.bulk(es_store, event_data)
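Note that the snippet above deletes the index unconditionally, so it would raise a NotFoundError if the index is missing, for example on a very first run. A guarded variant, sketched below rather than taken from the repository, first checks whether the index exists:

# Sketch of a guarded rebuild; es_store is the application's Elasticsearch client
# and helpers comes from elasticsearch.helpers.
if es_store.indices.exists('events'):
    # Drop the stale index so the bulk insert starts from a clean slate.
    es_store.indices.delete('events')
es_store.indices.create('events')
helpers.bulk(es_store, event_data)

Alternatively, elasticsearch-py lets you pass ignore=[404] to the delete call so that a missing index is silently skipped.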
Currently we run this job every week and on each new deployment. Rebuilding the index is important because some records may not get indexed while the continuous sync is taking place.
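The weekly run can be wired up through Celery's beat scheduler. The entry below is a hypothetical configuration (the schedule and key names are illustrative, not the project's actual settings); it reuses the task name registered in the decorator above.

from celery.schedules import crontab

# Hypothetical beat schedule: run the rebuild task every Monday at midnight.
celery.conf.beat_schedule = {
    'rebuild-events-elasticsearch-weekly': {
        'task': 'rebuild.events.elasticsearch',
        'schedule': crontab(minute=0, hour=0, day_of_week='monday'),
    },
}

On a new deployment the task can also be queued directly with cron_rebuild_events_elasticsearch.delay().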
To know more about syncing PostgreSQL to Elasticsearch, please visit https://gocardless.com/blog/syncing-postgres-to-elasticsearch-lessons-learned/
Related links:
- Syncing Postgres to Elasticsearch, lessons learned: https://gocardless.com/blog/syncing-postgres-to-elasticsearch-lessons-learned/
- Elasticsearch Python Client: https://github.com/elastic/elasticsearch-py