Introducing Stream Servlet in loklak Server
A major part of my GSoC proposal was adding a stream API to loklak server. In a previous blog post, I discussed the addition of Mosquitto as a message broker for MQTT streaming. After testing this service for a few days and making some minor improvements, I was in a position to expose the stream to outside users through a simple API. In this blog post, I will discuss the addition of the /api/stream.json endpoint to loklak server.

HTTP Server-Sent Events

    "Server-sent events (SSE) is a technology where a browser receives automatic updates from a server via HTTP connection. The Server-Sent Events EventSource API is standardized as part of HTML5 by the W3C."
    - Wikipedia

At the time of writing, this API is supported by all major browsers except Microsoft Edge. For loklak, the plan was to use this event system to send messages to connected users as they arrive. Apart from browsers, the EventSource API can also be used with many other technologies.

Jetty Eventsource Plugin

For Java, we can use Jetty's EventSource plugin to send events to clients. It is similar to other Jetty servlets when it comes to processing arguments, handling requests, and so on, but it also provides a simple interface for sending events to connected users as they occur.

Adding Dependency

To use this plugin, we can add the following line to the Gradle dependencies:

    compile group: 'org.eclipse.jetty', name: 'jetty-eventsource-servlet', version: '1.0.0'

The Event Source

An EventSource is the object that EventSourceServlet requires in order to send events. All the logic for emitting events needs to be defined in the related class.
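To make "emitting events" a little more concrete, here is a minimal sketch of the text/event-stream framing that a server-sent event ends up as on the wire: an optional "event:" field, one "data:" field per line of payload, and a blank line to end the event. It uses only the JDK. The class SseFrame and its format method are hypothetical names made up for this illustration; they are not part of Jetty's plugin or of loklak, which handle this framing internally.

```java
/** Hypothetical helper for illustration only; not part of Jetty or loklak. */
final class SseFrame {

    private SseFrame() {}

    /**
     * Builds one text/event-stream frame.
     * Every line of the payload gets its own "data:" field,
     * and an empty line marks the end of the event.
     */
    static String format(String eventName, String payload) {
        StringBuilder frame = new StringBuilder();
        // The event name is optional; clients treat unnamed events as "message".
        if (eventName != null && !eventName.isEmpty()) {
            frame.append("event: ").append(eventName).append('\n');
        }
        // Limit -1 keeps trailing empty lines of the payload.
        for (String line : payload.split("\r\n|\r|\n", -1)) {
            frame.append("data: ").append(line).append('\n');
        }
        frame.append('\n');
        return frame.toString();
    }

    public static void main(String[] args) {
        // Assumed example payload; real messages would come from the broker.
        System.out.print(format("message", "{\"text\": \"hello\"}"));
    }
}
```

A browser's EventSource object parses frames like these and hands the joined data lines to its event handlers, which is why the server side only needs to push text as messages arrive.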
To link a servlet with an EventSource, we need to override the newEventSource method:

    public class StreamServlet extends EventSourceServlet {
        @Override
        protected EventSource newEventSource(HttpServletRequest request) {
            String channel = request.getParameter("channel");
            // Reject requests that do not name a channel
            if (channel == null || channel.isEmpty()) {
                return null;
            }
            return new MqttEventSource(channel);
        }
    }

If no channel is provided, the EventSource object will be null and the request will be rejected. Here, MqttEventSource is used to handle the stream of Tweets as they arrive from the Mosquitto message broker.

Cross Site Requests

Since requests to this endpoint can't be of JSONP type, it is necessary to allow cross-site requests on it. This can be done by overriding the doGet method of the servlet:

    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        // Allow any origin to read the event stream
        response.setHeader("Access-Control-Allow-Origin", "*");
        super.doGet(request, response);
    }

Adding MQTT Subscriber

When a request for events arrives, the constructor of MqttEventSource is called. At this stage, we need to connect to the Mosquitto stream for the requested channel. To achieve this, we can register the class itself as the MqttCallback after configuring the client appropriately:

    public class MqttEventSource implements MqttCallback {
        ...
        MqttEventSource(String channel) {
            this.channel = channel;
        }
        ...
        this.mqttClient = new MqttClient(address, "loklak_server_subscriber");
        this.mqttClient.connect();
        this.mqttClient.setCallback(this);
        this.mqttClient.subscribe(this.channel);
        ...
    }

By setting the callback to this, we can override the messageArrived method to handle the arrival of a new message on the channel. Just to…
