Using Mosquitto as a Message Broker for MQTT in loklak Server

In loklak server, messages are collected from various sources and indexed using Elasticsearch. To know when a message of interest arrives, users can poll the search endpoint. But this method would require a lot of HTTP requests, most of them being redundant. Also, if a user would like to collect messages for a particular topic, he would need to make a lot of requests over a period of time to get enough data. For GSoC 2017, my proposal was to introduce stream API in the loklak server so that we could save ourselves from making too many requests and also add many use cases. Mosquitto is Eclipse’s project which acts as a message broker for the popular MQTT protocol. MQTT, based on the pub-sub model, is a lightweight and IOT friendly protocol. In this blog post, I will discuss the basic setup of Mosquitto in the loklak server. Installation and Dependency for Mosquitto The installation process of Mosquitto is very simple. For Ubuntu, it is available from the pre installed PPAs - sudo apt-get install mosquitto Once the message broker is up and running, we can use the clients to connect to it and publish/subscribe to channels. To add MQTT client as a project dependency, we can introduce following line in Gradle dependencies file - compile group: 'net.sf.xenqtt', name: 'xenqtt', version: '0.9.5' [SOURCE] After this, we can use the client libraries in the server code base. The MQTTPublisher Class The MQTTPublisher class in loklak would provide an interface to perform basic operations in MQTT. The implementation uses AsyncClientListener to connect to Mosquitto broker - AsyncClientListener listener = new AsyncClientListener() { // Override methods according to needs }; [SOURCE] The publish method for the class can be used by other components of the project to publish messages on the desired channel - public void publish(String channel, String message) { this.mqttClient.publish(new PublishMessage(channel, QoS.AT_LEAST_ONCE, message)); } [SOURCE] We also have methods which allow publishing of multiple messages to multiple channels in order to increase the functionality of the class. Starting Publisher with Server The flags which signal using of streaming service in loklak are located in conf/config.properties. These configurations are referred while initializing the Data Access Object and an MQTTPublisher is created if needed - String mqttAddress = getConfig("stream.mqtt.address", "tcp://127.0.0.1:1883"); streamEnabled = getConfig("stream.enabled", false); if (streamEnabled) { mqttPublisher = new MQTTPublisher(mqttAddress); } [SOURCE] The mqttPublisher can now be used by other components of loklak to publish messages to the channel they want. Adding Mosquitto to Kubernetes Since loklak has also a nice Kubernetes setup, it was very simple to introduce a new deployment for Mosquitto to it. Changes in Dockerfile The Dockerfile for master deployment has to be modified to discover Mosquitto broker in the Kubernetes cluster. For this purpose, corresponding flags in config.properties have to be changed to ensure that things work fine - sed -i.bak 's/^\(stream.enabled\).*/\1=true/' conf/config.properties && \ sed -i.bak 's/^\(stream.mqtt.address\).*/\1=mosquitto.mqtt:1883/' conf/config.properties && \ [SOURCE] The Mosquitto broker would be available at mosquitto.mqtt:1883 because of the service that is created…

Continue ReadingUsing Mosquitto as a Message Broker for MQTT in loklak Server