Using Mosquitto as a Message Broker for MQTT in loklak Server

In loklak server, messages are collected from various sources and indexed using Elasticsearch. To know when a message of interest arrives, users can poll the search endpoint. But this method requires a lot of HTTP requests, most of them redundant. Also, a user who wants to collect messages for a particular topic needs to make many requests over a period of time to get enough data. For GSoC 2017, my proposal was to introduce a stream API in the loklak server so that we could avoid making too many requests and also enable many new use cases.

Mosquitto is an Eclipse project which acts as a message broker for the popular MQTT protocol. MQTT, based on the pub-sub model, is a lightweight and IoT-friendly protocol. In this blog post, I will discuss the basic setup of Mosquitto in the loklak server.

Installation and Dependency for Mosquitto

The installation process of Mosquitto is very simple. For Ubuntu, it is available from the preconfigured package repositories:

    sudo apt-get install mosquitto

Once the message broker is up and running, we can use clients to connect to it and publish/subscribe to channels. To add the MQTT client as a project dependency, we can add the following line to the Gradle dependencies file:

    compile group: 'net.sf.xenqtt', name: 'xenqtt', version: '0.9.5'

After this, we can use the client library in the server code base.

The MQTTPublisher Class

The MQTTPublisher class in loklak provides an interface to perform basic operations in MQTT.
The implementation uses AsyncClientListener to connect to the Mosquitto broker:

    AsyncClientListener listener = new AsyncClientListener() {
        // Override methods according to needs
    };

The publish method of the class can be used by other components of the project to publish messages on the desired channel:

    public void publish(String channel, String message) {
        this.mqttClient.publish(new PublishMessage(channel, QoS.AT_LEAST_ONCE, message));
    }

We also have methods which allow publishing multiple messages to multiple channels, which makes the class more versatile.

Starting Publisher with Server

The flags which enable the streaming service in loklak are located in conf/config.properties. These settings are read while initializing the Data Access Object, and an MQTTPublisher is created if needed:

    String mqttAddress = getConfig("stream.mqtt.address", "tcp://127.0.0.1:1883");
    streamEnabled = getConfig("stream.enabled", false);
    if (streamEnabled) {
        mqttPublisher = new MQTTPublisher(mqttAddress);
    }

The mqttPublisher can now be used by other components of loklak to publish messages to the channel they want.

Adding Mosquitto to Kubernetes

Since loklak also has a nice Kubernetes setup, it was very simple to introduce a new deployment for Mosquitto to it.

Changes in Dockerfile

The Dockerfile for the master deployment has to be modified so that the server can discover the Mosquitto broker in the Kubernetes cluster. For this purpose, the corresponding flags in config.properties have to be changed:

    sed -i.bak 's/^\(stream.enabled\).*/\1=true/' conf/config.properties && \
    sed -i.bak 's/^\(stream.mqtt.address\).*/\1=mosquitto.mqtt:1883/' conf/config.properties && \

The Mosquitto broker would be available at mosquitto.mqtt:1883 because of the service that is created…
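
To make the effect of the two sed substitutions above easier to see, here is a minimal sketch that applies the same regular expressions to a sample string. It is written in JavaScript (Node.js) only so that the result can be checked without touching a real file. The starting contents of the config and the helper name setProperty are assumptions made for illustration, not taken from loklak.

```javascript
// Hypothetical starting contents of conf/config.properties; the real
// file may hold different values.
const before = [
  'stream.enabled=false',
  'stream.mqtt.address=tcp://127.0.0.1:1883',
].join('\n');

// Mirrors sed 's/^\(key\).*/\1=value/': on every line that starts with
// the key, keep the captured key and replace the rest of the line.
// As in the sed expression, the dots in the key are left unescaped.
function setProperty(text, keyPattern, value) {
  const re = new RegExp(`^(${keyPattern}).*`, 'gm');
  return text.replace(re, `$1=${value}`);
}

let after = setProperty(before, 'stream.enabled', 'true');
after = setProperty(after, 'stream.mqtt.address', 'mosquitto.mqtt:1883');

console.log(after);
```

Under these assumptions, both keys keep their names and only the values change, which is why the server picks up the new broker address without any code change.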


Sending a streaming zip file in node.js

The webapp generator that we set up is now up and running on a Heroku instance and generates zips of the generated websites. One of the requirements was that when the user presses the "Generate" button on the front end, a request is sent to the backend Node app to start creating the HTML pages and pack them into a zip file. The zip file is then made available for download.

I intended for the zip file to become available for download immediately, with the server continuing to pack data into the zip as the user downloads it, a process known as streaming zip serving. (You might have noticed that Google Drive does this too: you do not know the final zip size before the download has completed.) If the server takes time to create the contents of the zip, this method helps, as it allows the user to start downloading the file before all the contents are finalised.

To achieve this goal, the ideal Node.js module to use is archiver. If you look at the app.js code, you'll find that on the /generate POST endpoint we pass req and res to the pipeZipToRes() function, which is defined in generator.js:

    app.post('/generate', uploadedFiles, function(req, res, next) {
      console.log(req.files);
      console.log(req.body);
      res.setHeader('Content-Type', 'application/zip');
      generator.pipeZipToRes(req, res);
    });

And in the generator, you can find this block of code that pipes the streaming archive to the response:

    const zipfile = archiver('zip');
    zipfile.on('error', (err) => {
      throw err;
    });
    zipfile.pipe(res);
    zipfile.directory(distHelper.distPath, '/').finalize();

We zip the directory defined in distPath, and the archiver API has a fluent call to .finalize() that finishes the download for the user when the zip is completed.
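
The same "pipe while still producing" idea can be tried out with nothing but Node's standard library. The sketch below is not the generator's code: Node has no built-in zip writer, so it swaps in gzip (zlib.createGzip) for archiver, and the names generatePages and pipeGzipTo are hypothetical. It only illustrates that compressed chunks are forwarded to the destination while later content is still being generated.

```javascript
const zlib = require('node:zlib');
const { Readable } = require('node:stream');
const { pipeline } = require('node:stream/promises');

// Hypothetical stand-in for the site generator: yields pages one at a
// time, with a small delay to mimic slow generation.
async function* generatePages(count) {
  for (let i = 1; i <= count; i++) {
    await new Promise((resolve) => setTimeout(resolve, 5));
    yield `<html><body>Page ${i}</body></html>\n`;
  }
}

// Compress pages as they are produced and forward the compressed data
// to `dest` (any Writable, for example an HTTP response). The returned
// promise resolves once `dest` has finished and rejects if any stage of
// the pipeline fails.
function pipeGzipTo(dest, pageCount) {
  return pipeline(
    Readable.from(generatePages(pageCount)),
    zlib.createGzip(),
    dest
  );
}
```

In a request handler, one would set a header such as res.setHeader('Content-Type', 'application/gzip') and then call pipeGzipTo(res, n). Because no Content-Length is set up front, the client can start receiving data before the total size is known, which is the same property the archiver-based code relies on. Using pipeline also means an error in any stage rejects the promise instead of being thrown from an event handler.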
