Using Mosquitto as a Message Broker for MQTT in loklak Server

In loklak server, messages are collected from various sources and indexed using Elasticsearch. To know when a message of interest arrives, users can poll the search endpoint. But this method would require a lot of HTTP requests, most of them being redundant. Also, if a user would like to collect messages for a particular topic, he would need to make a lot of requests over a period of time to get enough data.

For GSoC 2017, my proposal was to introduce stream API in the loklak server so that we could save ourselves from making too many requests and also add many use cases.

Mosquitto is Eclipse’s project which acts as a message broker for the popular MQTT protocol. MQTT, based on the pub-sub model, is a lightweight and IOT friendly protocol. In this blog post, I will discuss the basic setup of Mosquitto in the loklak server.

Installation and Dependency for Mosquitto

The installation process of Mosquitto is very simple. For Ubuntu, it is available from the pre installed PPAs –

sudo apt-get install mosquitto

Once the message broker is up and running, we can use the clients to connect to it and publish/subscribe to channels. To add MQTT client as a project dependency, we can introduce following line in Gradle dependencies file –

compile group: 'net.sf.xenqtt', name: 'xenqtt', version: '0.9.5'


After this, we can use the client libraries in the server code base.

The MQTTPublisher Class

The MQTTPublisher class in loklak would provide an interface to perform basic operations in MQTT. The implementation uses AsyncClientListener to connect to Mosquitto broker –

AsyncClientListener listener = new AsyncClientListener() {
    // Override methods according to needs


The publish method for the class can be used by other components of the project to publish messages on the desired channel –

public void publish(String channel, String message) {
    this.mqttClient.publish(new PublishMessage(channel, QoS.AT_LEAST_ONCE, message));


We also have methods which allow publishing of multiple messages to multiple channels in order to increase the functionality of the class.

Starting Publisher with Server

The flags which signal using of streaming service in loklak are located in conf/ These configurations are referred while initializing the Data Access Object and an MQTTPublisher is created if needed –

String mqttAddress = getConfig("stream.mqtt.address", "tcp://");
streamEnabled = getConfig("stream.enabled", false);
if (streamEnabled) {
    mqttPublisher = new MQTTPublisher(mqttAddress);


The mqttPublisher can now be used by other components of loklak to publish messages to the channel they want.

Adding Mosquitto to Kubernetes

Since loklak has also a nice Kubernetes setup, it was very simple to introduce a new deployment for Mosquitto to it.

Changes in Dockerfile

The Dockerfile for master deployment has to be modified to discover Mosquitto broker in the Kubernetes cluster. For this purpose, corresponding flags in have to be changed to ensure that things work fine –

sed -i.bak 's/^\(stream.enabled\).*/\1=true/' conf/ && \
sed -i.bak 's/^\(stream.mqtt.address\).*/\1=mosquitto.mqtt:1883/' conf/ && \


The Mosquitto broker would be available at mosquitto.mqtt:1883 because of the service that is created for it (explained in later section).

Mosquitto Deployment

The Docker image used in Kubernetes deployment of Mosquitto is taken from toke/docker-kubernetes. Two ports are exposed for the cluster but no volumes are needed –

apiVersion: extensions/v1beta1
kind: Deployment
  name: mosquitto
  namespace: mqtt
      - name: mosquitto
        image: toke/mosquitto
        - containerPort: 9001
        - containerPort: 8883


Exposing Mosquitto to the Cluster

Now that we have the deployment running, we need to expose the required ports to the cluster so that other components may use it. The port 9001 appears as port 80 for the service and 1883 is also exposed –

apiVersion: v1
kind: Service
  name: mosquitto
  namespace: mqtt
  - name: mosquitto
    port: 1883
  - name: mosquitto-web
    port: 80
    targetPort: 9001


After creating the service using this configuration, we will be able to connect our clients to Mosquitto at address mosquitto.mqtt:1883.


In this blog post, I discussed the process of adding Mosquitto to the loklak server project. This is the first step towards introducing the stream API for messages collected in loklak.

These changes were introduced in pull requests loklak/loklak_server#1393 and loklak/loklak_server#1398 by @singhpratyush (me).


Sending a streaming zip file in node.js

The webapp generator that we set up is now up and running on a heroku instance and is working to generate zips of generated websites.

One of the requirements was that when the front-end presses, the “Generate” button, a request is created to the backend node app to start creating the html pages, and pack them into a zip file. Then the said zip file will be made available to download.

I intended for the zip file to become available to download immediately and the server to continuously pack data into the zip, as the user downloads it – a process known as streaming zip serving. (If you might have noticed, Google Drive uses this too. You do not know the final zip size before the download has completed.)

If the server takes time in creating the contents of the zip, this method can help, as it will allow the user to start downloading the file, before all the contents are finalised.


To achieve the mentioned goal, the idea Node.js module to use archiver.

If you look at the app.js code you’ll find on the /generate POST endpoint, we take the ‘req’ params and pass to a pipeToRes() function, which is defined in generator.js'/generate', uploadedFiles, function(req, res, next) {
res.setHeader('Content-Type', 'application/zip');
generator.pipeZipToRes(req, res);

And in the generator, you can find this block of code that pipes the streaming archive to the response.


      const zipfile = archiver('zip');

      zipfile.on('error', (err) => {
        throw err;

      zipfile.pipe(res);, '/').finalize();

We zip the directory defined in distPath and the archiver API has a fluent call to .finalize() that finished the download for the user, when the zip is completed.