Discount Codes in Open Event Server

The Open Event System allows usage of discount codes with tickets and events. This blogpost describes what types of discount codes are present and what endpoints can be used to fetch and update details.

In Open Event API Server, each event can have two types of discount codes. One is ‘event’ discount code, while the other is ‘ticket’ discount code. As the name suggests, the event discount code is an event level discount code and the ticket discount code is ticket level.

Each event can have only one ‘event’ discount code, and it is accessible only to the server admin. The Open Event server admin can create, view and update the ‘event’ discount code for an event. The event discount code follows the DiscountCodeEvent schema. This schema inherits from the parent class DiscountCodeSchemaPublic. To save the unique discount code associated with an event, the event model’s discount_code_id field is used.

The ‘ticket’ discount is accessible by the event organizer and co-organizer. Each event can have any number of ‘ticket’ discount codes. This follows the DiscountCodeTicket schema, which also inherits from the same base class, DiscountCodeSchemaPublic. The schema to use is decided based on the value of the field ‘used_for’, which can be either ‘events’ or ‘tickets’. The two schemas have different relationships with events and marketer respectively.
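
The schema choice described above can be sketched as follows. This is only a minimal illustration and not the server's actual code: the classes are empty stand-ins for the schemas named in this post, and select_schema is a hypothetical helper.

```python
# Stand-in classes for illustration only; the real schemas live in the server code base.
class DiscountCodeSchemaPublic(object):
    """Base schema shared by both discount code types."""


class DiscountCodeEvent(DiscountCodeSchemaPublic):
    """Stand-in for the schema of the single event level discount code."""


class DiscountCodeTicket(DiscountCodeSchemaPublic):
    """Stand-in for the schema of ticket level discount codes."""


def select_schema(used_for):
    """Hypothetical helper: pick a schema class from the 'used_for' value."""
    schemas = {
        'events': DiscountCodeEvent,
        'tickets': DiscountCodeTicket,
    }
    if used_for not in schemas:
        raise ValueError("used_for must be 'events' or 'tickets'")
    return schemas[used_for]


print(select_schema('tickets').__name__)
```

Keeping the mapping in one place means an unexpected 'used_for' value fails loudly instead of silently falling back to one of the schemas.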

We have the following endpoints for Discount Code events and tickets:
‘/events/<int:event_id>/discount-code’
‘/events/<int:event_id>/discount-codes’

The first endpoint is based on the DiscountCodeDetail class. It returns the detail of one discount code which in this case is the event discount code associated with the event.

The second endpoint is based on the DiscountCodeList class which returns a list of discount codes associated with an event. Note that this list also includes the ‘event’ discount code, apart from all the ticket discount codes.

class DiscountCodeFactory(factory.alchemy.SQLAlchemyModelFactory):
   class Meta:
       model = DiscountCode
       sqlalchemy_session = db.session

   # The event is attached separately in each test, so it is not a related factory here
   event_id = None
   user = factory.RelatedFactory(UserFactory)
   user_id = 1


Since each discount code belongs to an event (either directly or through the ticket), the factory would normally have event as a related factory. But to test the /events/<int:event_id>/discount-code endpoint, we need an event whose discount_code_id is set to 1 for Dredd to check this. Hence, event is not included as a related factory, but added as a separate object every time a discount code object is to be used.

@hooks.before("Discount Codes > Get Discount Code Detail of an Event > Get Discount Code Detail of an Event")
def event_discount_code_get_detail(transaction):
   """
   GET /events/1/discount-code
   :param transaction:
   :return:
   """
   with stash['app'].app_context():
       discount_code = DiscountCodeFactory()
       db.session.add(discount_code)
       db.session.commit()
       event = EventFactoryBasic(discount_code_id=1)
       db.session.add(event)
       db.session.commit()


The other tests and extended documentation can be found here.

Open Event Server: Getting The Identity From The Expired JWT Token In Flask-JWT

The Open Event Server uses JWT based authentication, where JWT stands for JSON Web Token. JSON Web Tokens are an open industry standard RFC 7519 method for representing claims securely between two parties. [source: https://jwt.io/]

Flask-JWT is being used for the JWT-based authentication in the project. Flask-JWT makes it easy to use JWT based authentication in Flask, while at its core it still uses PyJWT.

To get the identity when a JWT token is present in the request’s Authorization header, the current_identity proxy of Flask-JWT can be used as follows:

@app.route('/example')
@jwt_required()
def example():
   return '%s' % current_identity

 

Note that it will only be set in the context of a function decorated by jwt_required(). The problem with the current_identity proxy when using jwt_required is that the token has to be active; the identity of an expired token cannot be fetched this way.

So why not write a function of our own to do the same? A JSON Web Token consists of three parts separated by dots (.), which are:

  • Header
  • Payload
  • Signature

The first step is to get the payload, which can be done as follows:

token_second_segment = _default_request_handler().split('.')[1]

 

The payload obtained above is still base64-encoded JSON; it can be decoded and converted into a dict as follows:

payload = json.loads(token_second_segment.decode('base64'))

 

The identity can now be found in the payload as payload['identity']. We can get the actual user from the payload as follows:

def jwt_identity(payload):
   """
   Jwt helper function
   :param payload:
   :return:
   """
   return User.query.get(payload['identity'])

 

Our final function will now be something like:

def get_identity():
   """
   To be used only if identity for expired tokens is required, otherwise use current_identity from flask_jwt
   :return:
   """
   token_second_segment = _default_request_handler().split('.')[1]
   payload = json.loads(token_second_segment.decode('base64'))
   user = jwt_identity(payload)
   return user

 

But after using this function for some time, you will notice that for certain tokens, the system raises an error saying that the JWT token is missing padding. The JWT payload is base64 encoded, and decoding requires the length of the payload string to be a multiple of four. If it is not, the string can be padded with extra = (equals) signs. Since Python 2.7’s .decode doesn’t do that by default, we can accomplish it as follows:

missing_padding = len(token_second_segment) % 4

# ensures the string is correctly padded to be a multiple of 4
if missing_padding != 0:
   token_second_segment += b'=' * (4 - missing_padding)

 

Introducing Stream Servlet in loklak Server

A major part of my GSoC proposal was adding stream API to loklak server. In a previous blog post, I discussed the addition of Mosquitto as a message broker for MQTT streaming. After testing this service for a few days and some minor improvements, I was in a position to expose the stream to outside users using a simple API.

In this blog post, I will be discussing the addition of /api/stream.json endpoint to loklak server.

HTTP Server-Sent Events

Server-sent events (SSE) is a technology where a browser receives automatic updates from a server via HTTP connection. The Server-Sent Events EventSource API is standardized as part of HTML5 by the W3C.

Wikipedia

This API is supported by all major browsers except Microsoft Edge. For loklak, the plan was to use this event system to send messages, as they arrive, to the connected users. Apart from browsers, the EventSource API can also be used with many other technologies.
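
On the wire, a server-sent event is plain text: every line of the payload is prefixed with "data: " and a blank line marks the end of the event. The following Python sketch shows that framing. The function name format_sse is an assumption made for this illustration and is not part of loklak or Jetty.

```python
def format_sse(data, event=None):
    """Frame a message as one server-sent event (text/event-stream format)."""
    lines = []
    if event is not None:
        # Optional event name, lets clients listen for specific event types
        lines.append('event: ' + event)
    for part in data.split('\n'):
        # Multi-line payloads are sent as several consecutive data fields
        lines.append('data: ' + part)
    # A blank line terminates the event
    return '\n'.join(lines) + '\n\n'


print(repr(format_sse('{"text": "hello"}')))
```

A client using EventSource receives the joined data lines as the message payload once the terminating blank line arrives.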

Jetty Eventsource Plugin

For Java, we can use Jetty’s EventSource plugin to send events to clients. It is similar to other Jetty servlets when it comes to processing the arguments, handling requests, etc. But it provides a simple interface to send events as they occur to connected users.

Adding Dependency

To use this plugin, we can add the following line to Gradle dependencies –

compile group: 'org.eclipse.jetty', name: 'jetty-eventsource-servlet', version: '1.0.0'

The Event Source

An EventSource is the object which is required for EventSourceServlet to send events. All the logic for emitting events needs to be defined in the related class. To link a servlet with an EventSource, we need to override the newEventSource method –

public class StreamServlet extends EventSourceServlet {
    @Override
    protected EventSource newEventSource(HttpServletRequest request) {
        String channel = request.getParameter("channel");
        if (channel == null) {
            return null;
        }
        if (channel.isEmpty()) {
            return null;
        }
        return new MqttEventSource(channel);
    }
}

If no channel is provided, the EventSource object will be null and the request will be rejected. Here, the MqttEventSource would be used to handle the stream of Tweets as they arrive from the Mosquitto message broker.

Cross Site Requests

Since the requests to this endpoint can’t be of JSONP type, it is necessary to allow cross site requests on this endpoint. This can be done by overriding the doGet method of the servlet –

@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
    response.setHeader("Access-Control-Allow-Origin", "*");
    super.doGet(request, response);
}

Adding MQTT Subscriber

When a request for events arrives, the constructor to MqttEventSource is called. At this stage, we need to connect to the stream from Mosquitto for the channel. To achieve this, we can set the class as MqttCallback using appropriate client configurations –

public class MqttEventSource implements MqttCallback {
    ...
    MqttEventSource(String channel) {
        this.channel = channel;
    }
    ...
    this.mqttClient = new MqttClient(address, "loklak_server_subscriber");
    this.mqttClient.connect();
    this.mqttClient.setCallback(this);
    this.mqttClient.subscribe(this.channel);
    ...
}

By setting the callback to this, we can override the messageArrived method to handle the arrival of a new message on the channel. Just to mention, the client library used here is Eclipse Paho.

Connecting MQTT Stream to SSE Stream

Now that we have subscribed to the channel we wish to send events from, we can use the Emitter to send events from our EventSource by implementing it –

public class MqttEventSource implements EventSource, MqttCallback {
    private Emitter emitter;


    @Override
    public void onOpen(Emitter emitter) throws IOException {
        this.emitter = emitter;
        ...
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        this.emitter.data(message.toString());
    }
}

Closing Stream on Disconnecting from User

When a client disconnects from the stream, it doesn’t make sense to stay connected to the server. We can use the onClose method to disconnect the subscriber from the MQTT broker –

@Override
public void onClose() {
    try {
        // Disconnect first: close() expects a client that is no longer connected
        this.mqttClient.disconnect();
        this.mqttClient.close();
    } catch (MqttException e) {
        // Log some warning
    }
}

Conclusion

In this blog post, I discussed connecting the MQTT stream to an SSE stream using Jetty’s EventSource plugin. Once in place, this event system would save us from making too many requests to collect and visualize data. Such a feature opens up a wide range of applications.

This feature can be seen in action at the World Mood Tracker app.

The changes were introduced in pull request loklak/loklak_server#1474 by @singhpratyush (me).

Optimising Docker Images for loklak Server

The loklak server is in the process of moving to Kubernetes. In order to do so, we needed to have different Docker images that suit these deployments. In this blog post, I will be discussing the process through which I optimised the size of the Docker image for these deployments.

Initial Image

The image that I started with used Ubuntu as base. It installed all the components needed and then modified the configurations as required –

FROM ubuntu:latest

# Env Vars
ENV LANG=en_US.UTF-8
ENV JAVA_TOOL_OPTIONS=-Dfile.encoding=UTF8
ENV DEBIAN_FRONTEND noninteractive

WORKDIR /loklak_server

RUN apt-get update
RUN apt-get upgrade -y
RUN apt-get install -y git openjdk-8-jdk
RUN git clone https://github.com/loklak/loklak_server.git /loklak_server
RUN git checkout development
RUN ./gradlew build -x test -x checkstyleTest -x checkstyleMain -x jacocoTestReport
RUN sed -i.bak 's/^\(port.http=\).*/\180/' conf/config.properties
... # More configurations
RUN echo "while true; do sleep 10;done" >> bin/start.sh

# Start
CMD ["bin/start.sh", "-Idn"]

The size of images built using this Dockerfile was huge –

REPOSITORY          TAG                 IMAGE ID            CREATED              SIZE
loklak_server       latest              a92f506b360d        About a minute ago   1.114 GB
ubuntu              latest              ccc7a11d65b1        3 days ago           120.1 MB

But since this size is not acceptable, we needed to reduce it.

Moving to Alpine

Alpine Linux is an extremely lightweight Linux distro, built mainly for the container environment. It is so tiny that it hardly has any impact on the overall size of images. So, I replaced Ubuntu with Alpine –

FROM alpine:latest

...
RUN apk update
RUN apk add git openjdk8 bash
...

And now we had much smaller images –

REPOSITORY          TAG                 IMAGE ID            CREATED             SIZE
loklak_server       latest              54b507ee9187        17 seconds ago      668.8 MB
alpine              latest              7328f6f8b418        6 weeks ago         3.966 MB

As we can see, thanks to the small size of Alpine and no caching, the image size is reduced to about 60% of the original.

Reducing Content Size

There are many things in a project which are no longer needed while running the project, like the .git folder (which is huge in case of loklak) –

$ du -sh loklak_server/.git
236M loklak_server/.git

We can remove such files from the Docker image and save a lot of space –

rm -rf .[^.] .??*

Optimizing Number of Layers

The number of layers also affects the size of the image. The more layers there are, the larger the image tends to be, because files removed in a later layer still take up space in the earlier layers. In the Dockerfile, we can club the RUN commands together to get fewer layers.

RUN apk update && apk add openjdk8 git bash && \
  git clone https://github.com/loklak/loklak_server.git /loklak_server && \
  ...

After this, the effective size is again reduced by a major factor –

REPOSITORY          TAG                 IMAGE ID            CREATED             SIZE
loklak_server       latest              54b507ee9187        17 seconds ago      422.3 MB
alpine              latest              7328f6f8b418        6 weeks ago         3.966 MB
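
To put the three listings side by side, the savings can be worked out with a few lines of Python. The sizes are taken from the docker images output shown in this post (treating 1.114 GB as 1114 MB); the helper name percent_saved is just for this sketch.

```python
def percent_saved(original_mb, new_mb):
    """Percentage by which new_mb is smaller than original_mb."""
    return 100.0 * (1 - new_mb / original_mb)


# Image sizes in MB, as listed in the three tables of this post
ubuntu_image = 1114.0
alpine_image = 668.8
final_image = 422.3

print('Alpine base: {:.1f}% smaller'.format(percent_saved(ubuntu_image, alpine_image)))
print('After cleanup and merged layers: {:.1f}% smaller'.format(percent_saved(ubuntu_image, final_image)))
```

With these numbers, switching the base image saves about 40% and the final image is roughly 62% smaller than the first one.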

Conclusion

In this blog post, I discussed the process of optimising the size of the Docker image for Kubernetes deployments of loklak server. Going by the listings above, the size was reduced from 1.114 GB to 422.3 MB, which provided much faster push/pull times for Docker images and, therefore, faster updates for Kubernetes deployments.

Persistently Storing loklak Server Dumps on Kubernetes

In an earlier blog post, I discussed loklak setup on Kubernetes. The deployment mentioned in the post was to test the development branch. Next, we needed to have a deployment where all the messages are collected and dumped in text files that can be reused.

In this blog post, I will be discussing the challenges with such deployment and the approach to tackle them.

Volatile Disk in Kubernetes

The pods that hold deployments in Kubernetes have disk storage. Any data written by the application stays only as long as the same version of the deployment is running. As soon as the deployment is updated or relocated, the data stored while the application was running is cleaned up.


Due to this, dumps are written when loklak is running but they get wiped out when the deployment image is updated. In other words, all dumps are lost when the image updates. We needed to find a solution to this as we needed a permanent storage when collecting dumps.

Persistent Disk

In order to have a storage which can hold data permanently, we can mount persistent disk(s) on a pod at the appropriate location. This ensures that the data that is important to us stays with us, even when the deployment goes down.


In order to add persistent disks, we first need to create one. On Google Cloud Platform, we can use the gcloud CLI to create disks in a given zone –

gcloud compute disks create --size=<required size> --zone=<same as cluster zone> <unique disk name>

After this, we can mount it on a Docker volume defined in Kubernetes configurations –

      ...
      volumeMounts:
        - mountPath: /path/to/mount
          name: volume-name
  volumes:
    - name: volume-name
      gcePersistentDisk:
        pdName: disk-name
        fsType: fileSystemType

But this setup can’t be used for storing loklak dumps. Let’s see “why” in the next section.

Rolling Updates and Persistent Disk

The Kubernetes deployment needs to be updated when the master branch of loklak server is updated. This update of master deployment would create a new pod and try to start loklak server on it. During all this, the older deployment would also be running and serving the requests.


The control will not be transferred to the newer pod until it is ready and all the probes are passing. The newer deployment will now try to mount the disk which is mentioned in the configuration, but it would fail to do so. This would happen because the older pod has already mounted the disk.


Therefore, all new deployments would simply fail to start due to insufficient resources. To overcome such issues, Kubernetes allows persistent volume claims. Let’s see how we used them for loklak deployment.

Persistent Volume Claims

Kubernetes provides Persistent Volume Claims which claim resources (storage) from a Persistent Volume (just like a pod does from a node). The higher level APIs are provided by Kubernetes (configurations and kubectl command line). In the loklak deployment, the persistent volume is a Google Compute Engine disk –

apiVersion: v1
kind: PersistentVolume
metadata:
  name: dump
  namespace: web
spec:
  capacity:
    storage: 100Gi
  accessModes:
    - ReadWriteOnce
  persistentVolumeReclaimPolicy: Retain
  storageClassName: slow
  gcePersistentDisk:
    pdName: "data-dump-disk"
    fsType: "ext4"

It must be noted here that a persistent disk by the name of data-dump-disk (the pdName used above) must already exist in the same zone as the cluster.


The storage class defines the way in which the PV should be handled, along with the provisioner for the service –

kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: slow
  namespace: web
provisioner: kubernetes.io/gce-pd
parameters:
  type: pd-standard
  zone: us-central1-a

After having the StorageClass and PersistentVolume, we can create a claim for the volume by using appropriate configurations –

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: dump
  namespace: web
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 100Gi
  storageClassName: slow

After this, we can mount this claim on our Deployment –

  ...
  volumeMounts:
    - name: dump
      mountPath: /loklak_server/data
volumes:
  - name: dump
    persistentVolumeClaim:
      claimName: dump

Verifying persistence of Dumps

To verify this, we can redeploy the cluster using the same persistent disk and check if the earlier dumps are still present there –

$ http http://link.to.deployment/dump/
HTTP/1.1 200 OK
Cache-Control: public, max-age=60
Content-Type: text/html;charset=utf-8
...


<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">

<h1>Index of /dump</h1>
<pre>      Name 
[gz ] <a href="messages_20170802_71562040.txt.gz">messages_20170802_71562040.txt.gz</a>   Thu Aug 03 00:07:21 GMT 2017   132M
[gz ] <a href="messages_20170803_69925009.txt.gz">messages_20170803_69925009.txt.gz</a>   Mon Aug 07 15:40:04 GMT 2017   532M
[gz ] <a href="messages_20170807_36357603.txt.gz">messages_20170807_36357603.txt.gz</a>   Wed Aug 09 10:26:24 GMT 2017   377M
[txt] <a href="messages_20170809_27974404.txt">messages_20170809_27974404.txt</a>      Thu Aug 10 08:51:49 GMT 2017  1564M
<hr></pre>
...

Conclusion

In this blog post, I discussed the process of deployment of loklak with persistent dumps on Kubernetes. This deployment is intended to work as root.loklak.org in the near future. The changes were proposed in loklak/loklak_server#1377 by @singhpratyush (me).

Using Mosquitto as a Message Broker for MQTT in loklak Server

In loklak server, messages are collected from various sources and indexed using Elasticsearch. To know when a message of interest arrives, users can poll the search endpoint. But this method would require a lot of HTTP requests, most of them being redundant. Also, if a user would like to collect messages for a particular topic, they would need to make a lot of requests over a period of time to get enough data.

For GSoC 2017, my proposal was to introduce stream API in the loklak server so that we could save ourselves from making too many requests and also add many use cases.

Mosquitto is an Eclipse project which acts as a message broker for the popular MQTT protocol. MQTT, based on the pub-sub model, is a lightweight and IoT-friendly protocol. In this blog post, I will discuss the basic setup of Mosquitto in the loklak server.

Installation and Dependency for Mosquitto

The installation process of Mosquitto is very simple. For Ubuntu, it is available from the pre-installed PPAs –

sudo apt-get install mosquitto

Once the message broker is up and running, we can use the clients to connect to it and publish/subscribe to channels. To add an MQTT client as a project dependency, we can introduce the following line in the Gradle dependencies file –

compile group: 'net.sf.xenqtt', name: 'xenqtt', version: '0.9.5'

After this, we can use the client libraries in the server code base.

The MQTTPublisher Class

The MQTTPublisher class in loklak would provide an interface to perform basic operations in MQTT. The implementation uses AsyncClientListener to connect to Mosquitto broker –

AsyncClientListener listener = new AsyncClientListener() {
    // Override methods according to needs
};

The publish method for the class can be used by other components of the project to publish messages on the desired channel –

public void publish(String channel, String message) {
    this.mqttClient.publish(new PublishMessage(channel, QoS.AT_LEAST_ONCE, message));
}

We also have methods which allow publishing of multiple messages to multiple channels in order to increase the functionality of the class.

Starting Publisher with Server

The flags which enable the streaming service in loklak are located in conf/config.properties. These configurations are read while initializing the Data Access Object, and an MQTTPublisher is created if needed –

String mqttAddress = getConfig("stream.mqtt.address", "tcp://127.0.0.1:1883");
streamEnabled = getConfig("stream.enabled", false);
if (streamEnabled) {
    mqttPublisher = new MQTTPublisher(mqttAddress);
}

The mqttPublisher can now be used by other components of loklak to publish messages to the channel they want.

Adding Mosquitto to Kubernetes

Since loklak also has a nice Kubernetes setup, it was very simple to introduce a new deployment for Mosquitto to it.

Changes in Dockerfile

The Dockerfile for master deployment has to be modified to discover Mosquitto broker in the Kubernetes cluster. For this purpose, corresponding flags in config.properties have to be changed to ensure that things work fine –

sed -i.bak 's/^\(stream.enabled\).*/\1=true/' conf/config.properties && \
sed -i.bak 's/^\(stream.mqtt.address\).*/\1=mosquitto.mqtt:1883/' conf/config.properties && \

The Mosquitto broker would be available at mosquitto.mqtt:1883 because of the service that is created for it (explained in a later section).
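
For readers less familiar with sed, the effect of the two substitutions can be sketched in Python. This is only an illustration: set_property is a hypothetical helper, and the sample content assumes the default values mentioned earlier in this post. Unlike the sed pattern, the key is escaped here so that the dots match literally.

```python
import re


def set_property(config_text, key, value):
    """Replace every line starting with `key` by `key=value`."""
    pattern = r'^(' + re.escape(key) + r').*$'
    # MULTILINE makes ^ and $ match at each line, like sed working line by line
    return re.sub(pattern, lambda match: match.group(1) + '=' + value,
                  config_text, flags=re.MULTILINE)


# Assumed sample content, based on the defaults mentioned in this post
sample = 'stream.enabled=false\nstream.mqtt.address=tcp://127.0.0.1:1883\n'
sample = set_property(sample, 'stream.enabled', 'true')
sample = set_property(sample, 'stream.mqtt.address', 'mosquitto.mqtt:1883')
print(sample)
```

As with the sed commands, the whole line is rewritten, so whatever value was configured before is discarded.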

Mosquitto Deployment

The Docker image used in Kubernetes deployment of Mosquitto is taken from toke/docker-kubernetes. Two ports are exposed for the cluster but no volumes are needed –

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: mosquitto
  namespace: mqtt
spec:
  ...
  template:
    ...
    spec:
      containers:
      - name: mosquitto
        image: toke/mosquitto
        ports:
        - containerPort: 9001
        - containerPort: 8883

Exposing Mosquitto to the Cluster

Now that we have the deployment running, we need to expose the required ports to the cluster so that other components may use it. The port 9001 appears as port 80 for the service and 1883 is also exposed –

apiVersion: v1
kind: Service
metadata:
  name: mosquitto
  namespace: mqtt
  ...
spec:
  ...
  ports:
  - name: mosquitto
    port: 1883
  - name: mosquitto-web
    port: 80
    targetPort: 9001

After creating the service using this configuration, we will be able to connect our clients to Mosquitto at address mosquitto.mqtt:1883.

Conclusion

In this blog post, I discussed the process of adding Mosquitto to the loklak server project. This is the first step towards introducing the stream API for messages collected in loklak.

These changes were introduced in pull requests loklak/loklak_server#1393 and loklak/loklak_server#1398 by @singhpratyush (me).

Checking Whether Migrations Are Up To Date With The Sqlalchemy Models In The Open Event Server

In the Open Event Server, when a pull request changes a sqlalchemy model, the corresponding migrations are sometimes missing from the PR.

The first approach to check whether the migrations were up to date in the database was with the following health check function:

from subprocess import check_output
def health_check_migrations():
   """
   Checks whether database is up to date with migrations, assumes there is a single migration head
   :return:
   """
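   head = check_output(["python", "manage.py", "db", "heads"]).split(" ")[0]
   # Revision to which the database was actually upgraded
   version_num = (db.session.execute('SELECT version_num from alembic_version').fetchone())['version_num']
   if head == version_num: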
       return True, 'database up to date with migrations'
   return False, 'database out of date with migrations'

 

In the above function, we get the head according to the migration files as follows:

head = check_output(["python", "manage.py", "db", "heads"]).split(" ")[0]


The table alembic_version contains the latest alembic revision to which the database was actually upgraded. We can get this revision from the following line:

version_num = (db.session.execute('SELECT version_num from alembic_version').fetchone())['version_num']

 

Then we compare the two heads and return a proper tuple based on the result. While this method was pretty fast, it had a drawback. If the user forgets to generate the migration files for the changes done in the sqlalchemy model, this approach will fail to raise a failure status in the health check.

To overcome this drawback, all the sqlalchemy models were fetched automatically and simple sqlalchemy select queries were made to check whether the migrations were up to date.

Remember that a raw SQL query will not serve our purpose in this case, as you’d have to specify the columns explicitly in the query. A sqlalchemy query, on the other hand, generates the SQL based on the fields defined in the db model, so if the migrations that incorporate a model change are missing, a proper error will be raised.
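
The idea can be illustrated without sqlalchemy, using only the sqlite3 module from the standard library. This is a simplified sketch and not the server's code: MODEL_COLUMNS and model_in_sync are made-up names that mimic how an ORM builds its SELECT from the fields declared on the model.

```python
import sqlite3

# Hypothetical "model": the columns the application code expects to exist
MODEL_COLUMNS = {'events': ['id', 'name', 'discount_code_id']}


def model_in_sync(connection, table):
    """Select every column the model declares; a missing column raises an error."""
    query = 'SELECT {} FROM {} LIMIT 1'.format(
        ', '.join(MODEL_COLUMNS[table]), table)
    try:
        connection.execute(query).fetchone()
        return True
    except sqlite3.OperationalError:
        return False


conn = sqlite3.connect(':memory:')
# The database was created before discount_code_id was added to the model
conn.execute('CREATE TABLE events (id INTEGER PRIMARY KEY, name TEXT)')
print(model_in_sync(conn, 'events'))  # False: the column is missing in the database

# Applying the missing "migration" brings the database back in line
conn.execute('ALTER TABLE events ADD COLUMN discount_code_id INTEGER')
print(model_in_sync(conn, 'events'))  # True
```

Because the query is generated from the model's column list, it fails as soon as the model and the database disagree, even on an empty table.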

We can accomplish this with the following function:

def health_check_migrations():
   """
   Checks whether database is up to date with migrations by performing a select query on each model
   :return:
   """
   # Get all the models in the db, all models should have an explicit __tablename__
   classes, models, table_names = [], [], []
   # noinspection PyProtectedMember
   for class_ in db.Model._decl_class_registry.values():
       try:
           table_names.append(class_.__tablename__)
           classes.append(class_)
       except:
           pass
   for table in db.metadata.tables.items():
       if table[0] in table_names:
           models.append(classes[table_names.index(table[0])])

   for model in models:
       try:
           db.session.query(model).first()
       except:
           return False, '{} model out of date with migrations'.format(model)
   return True, 'database up to date with migrations'

 

In the above code, we automatically get all the models and tables present in the database. Then for each model we try a simple SELECT query which returns the first row found. If there is any error in doing so, False, '{} model out of date with migrations'.format(model) is returned, so as to ensure a failure status in health checks.

Implementing Health Check Endpoint in Open Event Server

A health check endpoint was required in the Open Event Server, to be used by Kubernetes to know when the web instance is ready to receive requests.

Following are the checks that were our primary focus for health checks:

  • Connection to the database.
  • Ensure sql-alchemy models are in line with the migrations.
  • Connection to celery workers.
  • Connection to redis instance.

Runscope/healthcheck seemed like the way to go for the same. Healthcheck wraps a Flask app object and adds a way to write simple health-check functions that can be used to monitor your application. It’s useful for asserting that your dependencies are up and running and your application can respond to HTTP requests. The Healthcheck functions are exposed via a user defined flask route so you can use an external monitoring application (monit, nagios, Runscope, etc.) to check the status and uptime of your application.

The health check endpoint was implemented at /health-check as follows:

from healthcheck import HealthCheck
health = HealthCheck(current_app, "/health-check")

 

Following is the function for checking the connection to the database:

def health_check_db():
   """
   Check health status of db
   :return:
   """
   try:
       db.session.execute('SELECT 1')
       return True, 'database ok'
   except:
       sentry.captureException()
       return False, 'Error connecting to database'

 

Check functions take no arguments and should return a tuple of (bool, str). The boolean is whether or not the check passed. The message is any string or output that should be rendered for this check. Useful for error messages/debugging.

The above function executes a query on the database to check whether it is connected properly. If the query runs successfully, it returns the tuple True, 'database ok'. If there is an error connecting to the database, an exception will be thrown; sentry.captureException() makes sure that the sentry instance receives a proper exception event with all the information about the exception. The tuple returned in this case is False, 'Error connecting to database'.
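
To make the (bool, str) contract concrete, here is a toy runner that calls such check functions and collects their results. It is not how the healthcheck library is implemented; run_checks and the two sample checks are hypothetical, and the result keys only loosely mirror the responses shown in this post.

```python
import json
import time


def run_checks(checks):
    """Call each check and collect its (passed, output) tuple."""
    results = []
    for check in checks:
        try:
            passed, output = check()
        except Exception as error:
            # A check that raises is treated as a failed check
            passed, output = False, str(error)
        results.append({
            'checker': check.__name__,
            'passed': passed,
            'output': output,
            'timestamp': time.time(),
        })
    status = 'success' if all(r['passed'] for r in results) else 'failure'
    return {'status': status, 'results': results}


def sample_ok_check():
    return True, 'database ok'


def sample_failing_check():
    return False, 'Error connecting to database'


print(json.dumps(run_checks([sample_ok_check, sample_failing_check]), indent=3))
```

A single failing check is enough to mark the overall status as failed, which is what lets a monitoring system or Kubernetes treat the instance as not ready.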

Finally, to add this check to the endpoint:

health.add_check(health_check_db)

Following is the response for a successful health check:

{
   "status": "success",
   "timestamp": 1500915121.52474,
   "hostname": "shubham",
   "results": [
       {
           "output": "database ok",
           "checker": "health_check_db",
           "expires": 1500915148.524729,
           "passed": true,
           "timestamp": 1500915121.524729
       }
   ]
}

If the database is not connected the following error will be shown:

{
   "output": "Error connecting to database",
   "checker": "health_check_db",
   "expires": 1500965798.307425,
   "passed": false,
   "timestamp": 1500965789.307425
}

Deploying loklak Server on Kubernetes with External Elasticsearch

Kubernetes is an open-source system for automating deployment, scaling, and management of containerized applications.

kubernetes.io

Kubernetes is an awesome cloud platform, which ensures that cloud applications run reliably. It offers automated tests, flawless updates, smart rollouts and rollbacks, simple scaling and a lot more.

So as a part of GSoC, I worked on taking the loklak server to Kubernetes on Google Cloud Platform. In this blog post, I will be discussing the approach followed to deploy the development branch of loklak on Kubernetes.

New Docker Image

Since Kubernetes deployments work on Docker images, we needed one for the loklak project. The existing image was not suitable for Kubernetes as it declared volumes and exposed ports. So I wrote a new Docker image which could be used in Kubernetes.

The image simply clones the loklak server, builds the project and starts the server as its CMD –

FROM alpine:latest

ENV LANG=en_US.UTF-8
ENV JAVA_TOOL_OPTIONS=-Dfile.encoding=UTF8

WORKDIR /loklak_server

RUN apk update && apk add openjdk8 git bash && \
    git clone https://github.com/loklak/loklak_server.git /loklak_server && \
    git checkout development && \
    ./gradlew build -x test -x checkstyleTest -x checkstyleMain -x jacocoTestReport && \
    # Some Configurations and Cleanups

CMD ["bin/start.sh", "-Idn"]

This image does not declare any volumes or exposed ports, so we are free to configure them in the Kubernetes configuration files (discussed in a later section).

Building and Pushing Docker Image using Travis

To automatically build and push the image on a commit to the development or master branch, a Travis build is used. In the after_success section, a call to push the Docker image is made.

Travis environment variables hold the username and password for Docker Hub and are used for logging in –

docker login -u $DOCKER_USERNAME -p $DOCKER_PASSWORD

We needed checks there to ensure that we are on the right branch for the push and we are not handling a pull request –

# Build and push Kubernetes Docker image
KUBERNETES_BRANCH=loklak/loklak_server:latest-kubernetes-$TRAVIS_BRANCH
KUBERNETES_COMMIT=loklak/loklak_server:kubernetes-$TRAVIS_COMMIT
  
if [ "$TRAVIS_BRANCH" == "development" ]; then
    docker build -t loklak_server_kubernetes kubernetes/images/development
    docker tag loklak_server_kubernetes $KUBERNETES_BRANCH
    docker push $KUBERNETES_BRANCH
    docker tag $KUBERNETES_BRANCH $KUBERNETES_COMMIT
    docker push $KUBERNETES_COMMIT
elif [ "$TRAVIS_BRANCH" == "master" ]; then
    # Build and push master
else
    echo "Skipping Kubernetes image push for branch $TRAVIS_BRANCH"
fi
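
The tagging and branch checks above can be restated as a short Python sketch. The function names are hypothetical, and the pull request guard is an assumption about how such a check could look, since it is not part of the script shown. It relies on the TRAVIS_PULL_REQUEST variable, which Travis sets to "false" for builds that are not pull requests.

```python
def kubernetes_image_tags(branch, commit, repo="loklak/loklak_server"):
    """Return the branch-level and commit-level image tags used by the script."""
    return (
        f"{repo}:latest-kubernetes-{branch}",
        f"{repo}:kubernetes-{commit}",
    )


def should_push(env, allowed_branches=("development", "master")):
    """Push only for regular builds (not pull requests) on an allowed branch."""
    # Travis sets TRAVIS_PULL_REQUEST to "false" for non-PR builds.
    is_pull_request = env.get("TRAVIS_PULL_REQUEST", "false") != "false"
    return not is_pull_request and env.get("TRAVIS_BRANCH") in allowed_branches
```

In a real build, env would be os.environ. Passing a plain dictionary keeps the logic easy to try out locally.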

Kubernetes Configurations for loklak

A Kubernetes cluster can be configured entirely through configurations written in YAML format. The deployment of loklak uses the previously built image. Initially, the image tagged as latest-kubernetes-development is used –

apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: server
  namespace: web
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: server
    spec:
      containers:
      - name: server
        image: loklak/loklak_server:latest-kubernetes-development
        ...

Readiness and Liveness Probes

Probes act as the top level tester for the health of a deployment in Kubernetes. The probes are performed periodically to ensure that things are working fine and appropriate steps are taken if they fail.

When the image is updated, the older pod still runs and serves the requests. It is replaced by the new one only when the probes are successful; otherwise, the older pod keeps serving.

In loklak, the /api/status.json endpoint gives information about the status of the deployment and hence is a good target for probes –

livenessProbe:
  httpGet:
    path: /api/status.json
    port: 80
  initialDelaySeconds: 30
  timeoutSeconds: 3
readinessProbe:
  httpGet:
    path: /api/status.json
    port: 80
  initialDelaySeconds: 30
  timeoutSeconds: 3

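These probes are performed periodically. A probe fails on a non-success HTTP status code or when the response takes more than 3 seconds. The container is restarted if the liveness probe fails, while a failing readiness probe keeps traffic away from the pod until it passes again.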
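
What a single HTTP probe attempt amounts to can be sketched in Python. This is only an illustration of the pass/fail rule and not how the kubelet is implemented; the function names are hypothetical. Kubernetes counts any status code from 200 up to (but not including) 400 as a success.

```python
import urllib.error
import urllib.request


def is_probe_success(status_code):
    """Kubernetes treats HTTP status codes from 200 to 399 as a passing probe."""
    return 200 <= status_code < 400


def http_probe(url, timeout_seconds=3):
    """Perform one probe attempt; timeouts and connection errors count as failures."""
    try:
        with urllib.request.urlopen(url, timeout=timeout_seconds) as response:
            return is_probe_success(response.status)
    except urllib.error.HTTPError as error:
        # urllib raises for 4xx/5xx responses; classify by the status code.
        return is_probe_success(error.code)
    except OSError:
        # Covers URLError, refused connections and socket timeouts.
        return False
```

Pointing http_probe at the /api/status.json endpoint of a running loklak instance gives a quick local approximation of what the probes above observe.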

Ports and Volumes

In the configurations, port 80 is exposed as this is where Jetty serves inside loklak –

ports:
- containerPort: 80
  protocol: TCP

Note that this is the same port that we used for running the probes. Since the development branch deployment holds no dumps, we didn’t need to specify any explicit volumes for persistence.

Load Balancer Service

While creating the configurations, a new public IP is assigned to the deployment using Google Cloud Platform’s load balancer. It starts listening on port 80 –

ports:
- port: 80
  protocol: TCP

Since this service creates a new public IP, it is recommended not to replace or recreate this service, as this would result in the creation of a new public IP. Other components can be updated individually.

Kubernetes Configurations for Elasticsearch

To maintain a persistent index, this deployment requires an external Elasticsearch cluster. loklak is able to connect to an external Elasticsearch cluster by changing a few configurations.

Docker Image and Environment Variables

The image used for Elasticsearch is taken from pires/docker-elasticsearch-kubernetes. It allows easy configuration of properties through environment variables in the configurations. The image supports many configurable variables, but we needed just a few of them to do our task –

image: quay.io/pires/docker-elasticsearch-kubernetes:2.0.0
env:
- name: KUBERNETES_CA_CERTIFICATE_FILE
  value: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
- name: NAMESPACE
  valueFrom:
    fieldRef:
      fieldPath: metadata.namespace
- name: "CLUSTER_NAME"
  value: "loklakcluster"
- name: "DISCOVERY_SERVICE"
  value: "elasticsearch"
- name: NODE_MASTER
  value: "true"
- name: NODE_DATA
  value: "true"
- name: HTTP_ENABLE
  value: "true"

Persistent Index using Persistent Cloud Disk

To make the index last even after the deployment is stopped, we needed a stable place where we could store all that data. Here, Google Compute Engine’s standard persistent disk was used. The disk can be created using the GCP web portal or the gcloud CLI.

Before attaching the disk, we need to declare a volume where we could mount it –

volumeMounts:
- mountPath: /data
  name: storage

Now that we have a volume, we can simply mount the persistent disk on it –

volumes:
- name: storage
  gcePersistentDisk:
    pdName: data-index-disk
    fsType: ext4

Now, whenever we deploy these configurations, we can reuse the previous index.

Exposing Elasticsearch to the Cluster

The HTTP and transport clients are enabled on ports 9200 and 9300 respectively. They can be exposed to the rest of the cluster using the following service –

apiVersion: v1
kind: Service
...
spec:
  ...
  ports:
  - name: http
    port: 9200
    protocol: TCP
  - name: transport
    port: 9300
    protocol: TCP

Once deployed, other deployments can access the cluster API from ports 9200 and 9300.

Connecting loklak to Elasticsearch

To connect loklak to an external Elasticsearch cluster, the TransportClient Java API is used. In order to enable these settings, we simply need to make some changes in the configurations.

Since we enable the service named “elasticsearch” in namespace “elasticsearch”, we can access the cluster at address elasticsearch.elasticsearch:9200 (web) and elasticsearch.elasticsearch:9300 (transport).

To confine these changes only to the Kubernetes deployment, we can use the sed command while building the image (in the Dockerfile) –

sed -i.bak 's/^\(elasticsearch_transport.enabled\).*/\1=true/' conf/config.properties && \
sed -i.bak 's/^\(elasticsearch_transport.addresses\).*/\1=elasticsearch.elasticsearch:9300/' conf/config.properties && \
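
The effect of the two substitutions can be tried out with a small Python sketch that mirrors them on a string. The set_property helper is hypothetical, and the original values in the sample text are placeholders rather than the actual defaults of conf/config.properties.

```python
import re


def set_property(text, key, value):
    """Replace the whole line that starts with `key` by `key=value`."""
    # re.escape makes the dot in the key literal; the sed pattern leaves it unescaped.
    pattern = re.compile(r"^(" + re.escape(key) + r").*$", flags=re.MULTILINE)
    return pattern.sub(lambda match: f"{match.group(1)}={value}", text)


# Placeholder content; the real file has many more properties.
sample = (
    "elasticsearch_transport.enabled=false\n"
    "elasticsearch_transport.addresses=127.0.0.1:9300\n"
    "other.setting=1\n"
)
updated = set_property(sample, "elasticsearch_transport.enabled", "true")
updated = set_property(updated, "elasticsearch_transport.addresses",
                       "elasticsearch.elasticsearch:9300")
print(updated)
```

Lines that do not start with the given key, such as other.setting here, are left untouched, which is also how the sed expressions behave.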

Now when we create the deployments in the Kubernetes cluster, loklak automatically connects to the external Elasticsearch index and creates indices if needed.

Verifying persistence of the Elasticsearch Index

In order to see that the data persists, we can completely delete the deployment or even the cluster if we want. Later, when we recreate the deployment, we can see all the messages already present in the index.

I  [2017-07-29 09:42:51,804][INFO ][node                     ] [Hellion] initializing ...
I  [2017-07-29 09:42:52,024][INFO ][plugins                  ] [Hellion] loaded [cloud-kubernetes], sites []
I  [2017-07-29 09:42:52,055][INFO ][env                      ] [Hellion] using [1] data paths, mounts [[/data (/dev/sdb)]], net usable_space [84.9gb], net total_space [97.9gb], spins? [possibly], types [ext4]
I  [2017-07-29 09:42:53,543][INFO ][node                     ] [Hellion] initialized
I  [2017-07-29 09:42:53,543][INFO ][node                     ] [Hellion] starting ...
I  [2017-07-29 09:42:53,620][INFO ][transport                ] [Hellion] publish_address {10.8.1.13:9300}, bound_addresses {10.8.1.13:9300}
I  [2017-07-29 09:42:53,633][INFO ][discovery                ] [Hellion] loklakcluster/cJtXERHETKutq7nujluJvA
I  [2017-07-29 09:42:57,866][INFO ][cluster.service          ] [Hellion] new_master {Hellion}{cJtXERHETKutq7nujluJvA}{10.8.1.13}{10.8.1.13:9300}{master=true}, reason: zen-disco-join(elected_as_master, [0] joins received)
I  [2017-07-29 09:42:57,955][INFO ][http                     ] [Hellion] publish_address {10.8.1.13:9200}, bound_addresses {10.8.1.13:9200}
I  [2017-07-29 09:42:57,955][INFO ][node                     ] [Hellion] started
I  [2017-07-29 09:42:58,082][INFO ][gateway                  ] [Hellion] recovered [8] indices into cluster_state

In the last line from the logs, we can see that indices already present on the disk were recovered. Now if we head to the public IP assigned to the cluster, we can see that the message count is restored.

Conclusion

In this blog post, I discussed how we used Kubernetes to shift loklak to Google Cloud Platform. The deployment is active and can be accessed from the link provided under the wiki section of the loklak/loklak_server repo.

I introduced these changes in pull request loklak/loklak_server#1349 with the help of @niranjan94, @uday96 and @chiragw15.

Caching Elasticsearch Aggregation Results in loklak Server

To provide aggregated data for various classifiers, loklak uses Elasticsearch aggregations. Aggregated data says a lot more than a few individual instances can. But performing aggregations on each request can be very resource intensive, so we needed to come up with a way to reduce this load.

In this post, I will be discussing how I came up with a caching model for the aggregated data from the Elasticsearch index.

Fields to Consider while Caching

At the classifier endpoint, aggregations can be requested based on the following fields –

  • Classifier Name
  • Classifier Classes
  • Countries
  • Start Date
  • End Date

But to cache results, we can ignore the cases where only a few classes or countries are required and store aggregations for all of them instead. So the fields that identify a cache entry are –

  • Classifier Name
  • Start Date
  • End Date

Type of Cache

The data structure used for caching is Java’s HashMap. It maps a string key to a wrapper object, both of which are described in the following sections.

Key

The key is built using the fields mentioned previously –

private static String getKey(String index, String classifier, String sinceDate, String untilDate) {
    return index + "::::"
        + classifier + "::::"
        + (sinceDate == null ? "" : sinceDate) + "::::"
        + (untilDate == null ? "" : untilDate);
}

In this way, we can serve a request for every class there is without running the expensive aggregation job every time. The key for such requests is the same because country and class are not part of it.

Value

The object used as the value in the HashMap is a wrapper containing the following –

  1. json – It is a JSONObject containing the actual data.
  2. expiry – It is the time, in milliseconds since the epoch, at which the object expires.

class JSONObjectWrapper {
    private JSONObject json; 
    private long expiry;
    ... 
}

Timeout

The timeout associated with a cache entry is defined in the configuration file of the project as “classifierservlet.cache.timeout”. It defaults to 5 minutes (300000 ms) and is used to set the expiry of a cached JSONObject –

class JSONObjectWrapper {
    ...
    private static long timeout = DAO.getConfig("classifierservlet.cache.timeout", 300000);

    JSONObjectWrapper(JSONObject json) {
        this.json = json;
        this.expiry = System.currentTimeMillis() + timeout;
    }
    ...
}
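
The expiry logic can be tried out in isolation with a small sketch, written here in Python for brevity. The CachedResult class and its is_expired method are hypothetical counterparts of JSONObjectWrapper; the body of isExpired is not shown in this post, so the comparison below is an assumption.

```python
import time


class CachedResult:
    """Python counterpart of JSONObjectWrapper: a payload plus its expiry time."""

    def __init__(self, data, timeout_ms=300_000, now_ms=None):
        created = int(time.time() * 1000) if now_ms is None else now_ms
        self.data = data
        self.expiry = created + timeout_ms

    def is_expired(self, now_ms=None):
        # Assumed rule: the entry is stale once the current time passes its expiry.
        current = int(time.time() * 1000) if now_ms is None else now_ms
        return current > self.expiry
```

The optional now_ms argument exists only so that the behaviour can be checked without waiting for five minutes.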

 

Cache Hit

To search the cache, the key described above is composed from the parameters requested by the user. Checking for a cache hit can be done in the following manner –

String key = getKey(index, classifier, sinceDate, untilDate);
if (cacheMap.containsKey(key)) {
    JSONObjectWrapper jw = cacheMap.get(key);
    if (!jw.isExpired()) {
        // Do something with jw
    }
}
// Calculate the aggregations
...

But since jw here would contain all the data, we would need to filter out the classes and countries which are not needed.

Filtering results

For filtering out the parts which do not contain the information requested by the user, we can perform a simple pass and exclude the results that are not needed.

Since the number of fields to filter out, i.e. classes and countries, would not be that high, this process is not resource intensive. At the same time, it saves us from running heavy aggregation tasks on Elasticsearch for every user request.

Since the data about classes is nested inside the respective country field, we need to perform two levels of filtering –

JSONObject retJson = new JSONObject(true);
for (String key : json.keySet()) {
    // Filter the classes only for the countries that were requested
    if ("GLOBAL".equals(key) || countries.contains(key)) {
        JSONArray value = filterInnerClasses(json.getJSONArray(key), classes);
        retJson.put(key, value);
    }
}
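
The same two-level filter can be sketched in Python with plain dictionaries. The layout of the records inside each country (a list of dictionaries with a "class" field) is an assumption made for this illustration, as is the body of filter_inner_classes, since filterInnerClasses is not shown in this post.

```python
def filter_inner_classes(records, classes):
    """Keep only the records whose class was requested (record layout is assumed)."""
    return [record for record in records if record["class"] in classes]


def filter_cached(cached, countries, classes):
    """Two-level filter: first by country key, then by class inside each country."""
    result = {}
    for country, records in cached.items():
        if country == "GLOBAL" or country in countries:
            result[country] = filter_inner_classes(records, classes)
    return result


# Made-up sample data, only to show the shape of the input and output.
cached = {
    "GLOBAL": [{"class": "joy", "count": 5}, {"class": "anger", "count": 2}],
    "IN": [{"class": "joy", "count": 3}],
    "US": [{"class": "anger", "count": 1}],
}
print(filter_cached(cached, countries={"IN"}, classes={"joy"}))
```

The GLOBAL entry is always kept, matching the condition in the Java snippet, while countries that were not requested are dropped before their classes are even looked at.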

Cache Miss

In the case of a cache miss, the helper functions from ElasticsearchClient.java are called to get results. These results are then converted from a HashMap to a JSONObject and stored in the cache for future use.

JSONObject freshCache = getFromElasticsearch(index, classifier, sinceDate, untilDate);
cacheMap.put(key, new JSONObjectWrapper(freshCache));

The getFromElasticsearch method finds all the possible classes and makes a request to the appropriate method in ElasticsearchClient, getting data for all classes and all countries.
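
Putting the key, the expiry and the hit and miss paths together, the overall flow can be sketched as follows. This is a simplified Python illustration and not the code from the pull request. The compute argument is a hypothetical stand-in for getFromElasticsearch, and the index and classifier names in the usage note are made up.

```python
import time

cache = {}


def get_key(index, classifier, since_date, until_date):
    """Same layout as getKey: the fields joined by '::::', None becoming ''."""
    return "::::".join([index, classifier, since_date or "", until_date or ""])


def get_aggregations(index, classifier, since_date, until_date, compute,
                     timeout_ms=300_000, now_ms=None):
    """Return cached aggregations, recomputing on a miss or after expiry."""
    now = int(time.time() * 1000) if now_ms is None else now_ms
    key = get_key(index, classifier, since_date, until_date)
    entry = cache.get(key)
    if entry is not None and now <= entry["expiry"]:
        return entry["data"]  # cache hit
    # Cache miss: run the expensive aggregation and remember the result.
    data = compute(index, classifier, since_date, until_date)
    cache[key] = {"data": data, "expiry": now + timeout_ms}
    return data
```

Two calls such as get_aggregations("messages", "emotion", None, None, compute) made within the timeout trigger compute only once, regardless of which countries or classes the callers filter for afterwards.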

Conclusion

In this blog post, I discussed the need for caching of aggregations and the way it is achieved in the loklak server. This feature was introduced in pull request loklak/loklak_server#1333 by @singhpratyush (me).
