Introducing Stream Servlet in loklak Server

A major part of my GSoC proposal was adding a stream API to loklak server. In a previous blog post, I discussed the addition of Mosquitto as a message broker for MQTT streaming. After testing this service for a few days and making some minor improvements, I was in a position to expose the stream to outside users through a simple API. In this blog post, I will discuss the addition of the /api/stream.json endpoint to loklak server.

HTTP Server-Sent Events

"Server-sent events (SSE) is a technology where a browser receives automatic updates from a server via HTTP connection. The Server-Sent Events EventSource API is standardized as part of HTML5 by the W3C." - Wikipedia

At the time of writing, this API is supported by all major browsers except Microsoft Edge. For loklak, the plan was to use this event system to send messages to connected users as they arrive. Beyond browsers, the EventSource API can also be used from many other technologies.

Jetty Eventsource Plugin

For Java, we can use Jetty’s EventSource plugin to send events to clients. It is similar to other Jetty servlets when it comes to processing arguments, handling requests, etc., but it provides a simple interface for sending events to connected users as they occur.

Adding Dependency

To use this plugin, we can add the following line to the Gradle dependencies -

    compile group: 'org.eclipse.jetty', name: 'jetty-eventsource-servlet', version: '1.0.0'

The Event Source

An EventSource is the object that EventSourceServlet requires in order to send events. All the logic for emitting events needs to be defined in the class that implements it.
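
Whatever the event source emits ends up on the wire as a text/event-stream frame. As a rough illustration of that format only, here is a minimal sketch in plain Java with no Jetty dependency. SseFrame and its format method are hypothetical names chosen for this example; they are not part of Jetty or loklak, and the plugin does this framing for us.

```java
// Sketch only: frames one Server-Sent Event as text/event-stream.
// SseFrame is a hypothetical helper, not part of Jetty or loklak.
final class SseFrame {

    private SseFrame() {}

    /** Builds an optional "event:" line, one "data:" line per payload line, then a blank line. */
    static String format(String eventName, String data) {
        StringBuilder frame = new StringBuilder();
        if (eventName != null && !eventName.isEmpty()) {
            frame.append("event: ").append(eventName).append('\n');
        }
        // A payload containing line breaks has to be split across several data: lines.
        for (String line : data.split("\r\n|\r|\n", -1)) {
            frame.append("data: ").append(line).append('\n');
        }
        // The empty line tells the client that the event is complete.
        frame.append('\n');
        return frame.toString();
    }

    public static void main(String[] args) {
        System.out.print(format("message", "{\"text\": \"hello\"}"));
    }
}
```

The blank line at the end of each frame is what lets a client tell one event from the next, which is why a multi-line payload cannot simply be written as is.
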
To link a servlet with an EventSource, we need to override the newEventSource method -

    public class StreamServlet extends EventSourceServlet {
        @Override
        protected EventSource newEventSource(HttpServletRequest request) {
            String channel = request.getParameter("channel");
            // Reject requests that do not name a channel.
            if (channel == null || channel.isEmpty()) {
                return null;
            }
            return new MqttEventSource(channel);
        }
    }

If no channel is provided, newEventSource returns null and the request is rejected. Here, MqttEventSource is used to handle the stream of Tweets as they arrive from the Mosquitto message broker.

Cross Site Requests

Since requests to this endpoint can’t be of JSONP type, it is necessary to allow cross-site requests on this endpoint. This can be done by overriding the doGet method of the servlet -

    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        // Allow pages served from any origin to read the stream.
        response.setHeader("Access-Control-Allow-Origin", "*");
        super.doGet(request, response);
    }

Adding MQTT Subscriber

When a request for events arrives, the constructor of MqttEventSource is called. At this stage, we need to connect to the Mosquitto stream for the channel. To achieve this, we can make the class an MqttCallback and configure the client accordingly -

    public class MqttEventSource implements MqttCallback {
        ...
        MqttEventSource(String channel) {
            this.channel = channel;
        }
        ...
        this.mqttClient = new MqttClient(address, "loklak_server_subscriber");
        this.mqttClient.connect();
        this.mqttClient.setCallback(this);
        this.mqttClient.subscribe(this.channel);
        ...
    }

By setting the callback to this, we can override the messageArrived method to handle the arrival of a new message on the channel. Just to…
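
On the consuming side, a stream like this is just lines of text. The following is a minimal sketch, assuming the endpoint emits standard data: lines, of how a Java client might collect the payloads. SseReader is a hypothetical name for this example, and it deliberately ignores the event:, id: and retry: fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: collects the data payloads from the lines of a text/event-stream body.
// SseReader is a hypothetical name; event:, id: and retry: fields are ignored.
final class SseReader {

    private SseReader() {}

    static List<String> payloads(Iterable<String> lines) {
        List<String> events = new ArrayList<>();
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        for (String line : lines) {
            if (line.isEmpty()) {
                // A blank line ends the current event.
                if (hasData) {
                    events.add(data.toString());
                }
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith("data:")) {
                String value = line.substring(5);
                // The format allows one optional space after the colon.
                if (value.startsWith(" ")) {
                    value = value.substring(1);
                }
                // Several data: lines in one event are joined with newlines.
                if (hasData) {
                    data.append('\n');
                }
                data.append(value);
                hasData = true;
            }
            // Comment lines (starting with ':') and other fields fall through untouched.
        }
        // An event that was never closed by a blank line is dropped.
        return events;
    }

    public static void main(String[] args) {
        List<String> body = Arrays.asList("data: first", "", ": keep-alive", "data: second", "");
        System.out.println(payloads(body));
    }
}
```

In practice the lines would come from an HTTP response for /api/stream.json?channel=... requested with an Accept: text/event-stream header, for example through java.net.http.HttpClient with HttpResponse.BodyHandlers.ofLines(). The host and channel name depend on the deployment and are left out here.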


Using Mosquitto as a Message Broker for MQTT in loklak Server

In loklak server, messages are collected from various sources and indexed using Elasticsearch. To know when a message of interest arrives, users can poll the search endpoint. But this method requires a lot of HTTP requests, most of them redundant. Also, a user who wants to collect messages on a particular topic would need to make many requests over a period of time to get enough data. For GSoC 2017, my proposal was to introduce a stream API in loklak server so that we could avoid making too many requests and also enable many new use cases.

Mosquitto is an Eclipse project that acts as a message broker for the popular MQTT protocol. MQTT, based on the pub-sub model, is a lightweight and IoT-friendly protocol. In this blog post, I will discuss the basic setup of Mosquitto in loklak server.

Installation and Dependency for Mosquitto

Installing Mosquitto is very simple. On Ubuntu, it is available from the default package repositories -

    sudo apt-get install mosquitto

Once the message broker is up and running, we can use clients to connect to it and publish/subscribe to channels. To add an MQTT client as a project dependency, we can add the following line to the Gradle dependencies file -

    compile group: 'net.sf.xenqtt', name: 'xenqtt', version: '0.9.5'

After this, we can use the client library in the server code base.

The MQTTPublisher Class

The MQTTPublisher class in loklak provides an interface for performing basic MQTT operations.
The implementation uses AsyncClientListener to connect to the Mosquitto broker -

    AsyncClientListener listener = new AsyncClientListener() {
        // Override methods according to needs
    };

The publish method of the class can be used by other components of the project to publish messages on the desired channel -

    public void publish(String channel, String message) {
        this.mqttClient.publish(new PublishMessage(channel, QoS.AT_LEAST_ONCE, message));
    }

We also have methods for publishing multiple messages to multiple channels, which makes the class more broadly useful.

Starting Publisher with Server

The flags that enable the streaming service in loklak are located in conf/config.properties. These settings are read while initializing the Data Access Object, and an MQTTPublisher is created if needed -

    String mqttAddress = getConfig("stream.mqtt.address", "tcp://127.0.0.1:1883");
    streamEnabled = getConfig("stream.enabled", false);
    if (streamEnabled) {
        mqttPublisher = new MQTTPublisher(mqttAddress);
    }

The mqttPublisher can now be used by other components of loklak to publish messages to the channel they want.

Adding Mosquitto to Kubernetes

Since loklak also has a nice Kubernetes setup, it was very simple to add a new Mosquitto deployment to it.

Changes in Dockerfile

The Dockerfile for the master deployment has to be modified so that the server can discover the Mosquitto broker in the Kubernetes cluster. For this purpose, the corresponding flags in config.properties have to be changed -

    sed -i.bak 's/^\(stream.enabled\).*/\1=true/' conf/config.properties && \
    sed -i.bak 's/^\(stream.mqtt.address\).*/\1=mosquitto.mqtt:1883/' conf/config.properties && \

The Mosquitto broker would be available at mosquitto.mqtt:1883 because of the service that is created…
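
The two keys rewritten by the sed commands are the same ones read at startup. To make that connection concrete, here is a minimal sketch using java.util.Properties. StreamConfig is a hypothetical stand-in written for this example; loklak's own getConfig helper is not reproduced here and may behave differently.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Sketch only: reads the two streaming flags with java.util.Properties.
// StreamConfig is a hypothetical stand-in for loklak's own getConfig helper.
final class StreamConfig {
    final boolean enabled;
    final String mqttAddress;

    StreamConfig(Properties props) {
        // Defaults mirror the ones passed to getConfig in the post.
        this.enabled = Boolean.parseBoolean(props.getProperty("stream.enabled", "false"));
        this.mqttAddress = props.getProperty("stream.mqtt.address", "tcp://127.0.0.1:1883");
    }

    /** Parses the text of a .properties file, such as conf/config.properties. */
    static StreamConfig parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new StreamConfig(props);
    }

    public static void main(String[] args) {
        // The values below are the ones the sed commands write into the file.
        StreamConfig config = parse("stream.enabled=true\nstream.mqtt.address=mosquitto.mqtt:1883\n");
        System.out.println(config.enabled + " " + config.mqttAddress);
    }
}
```

With an empty file the sketch falls back to streaming disabled and the local broker address, so a deployment only needs to touch these keys when it actually wants the stream.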
