Adding Push endpoint to send data from Loklak Search to Loklak Server

To provide an enriched and sufficient amount of data, Loklak Server should have multiple sources of data. The api/push.json endpoint of Loklak Server is used by Loklak Search to post the search result object to the server. This will increase the amount and quality of data on the server once the Twitter API is supported by Loklak Search (work is in progress to add support for the Twitter API in Loklak).

Creating Push Service

The idea is to create a separate service for making a POST request to the server. The first step is to create a new ‘PushService’ under ‘services/’ using:

ng g service services/push

Creating model for Push Api Response

Before writing the code for the push service, create a new model for the type of response data obtained from the POST request to ‘api/push.json’. For this, create a new file push.ts under ‘models/’ with the code given below, and export the PushApiResponse interface from the index file.

export interface PushApiResponse {
  status: string;
  records: number;
  mps: number;
  message: string;
}

Writing Post request in Push Service

The next step is to create a POST request to api/push.json using the HttpClient module. Import the necessary dependencies, inject HttpClient in the constructor, and write a postData() method which takes the data to be sent, makes a POST request and returns an Observable of the PushApiResponse created above.
import { Injectable } from '@angular/core';
import { HttpClient, HttpHeaders, HttpParams } from '@angular/common/http';
import { Observable } from 'rxjs';
import { ApiResponse, PushApiResponse } from '../models';

@Injectable({
  providedIn: 'root'
})
export class PushService {

  constructor(
    private http: HttpClient
  ) { }

  public postData(data: ApiResponse): Observable<PushApiResponse> {
    const httpUrl = 'https://api.loklak.org/api/push.json';
    const headers = new HttpHeaders({
      'Content-Type': 'application/x-www-form-urlencoded',
      'Accept': 'application/json',
      'cache-control': 'no-cache'
    });
    const {search_metadata, statuses} = data;

    // Converting the object to JSON string.
    const dataToSend = JSON.stringify({
      search_metadata: search_metadata,
      statuses});

    // Setting the data to send in
    // HttpParams() with key as 'data'
    const body = new HttpParams()
      .set('data', dataToSend);

    // Making a Post request to api/push.json
    // endpoint. Response Object is converted
    // to PushApiResponse type.
    return this.http.post<PushApiResponse>(
      httpUrl, body, {headers: headers });
  }
}

Note: The data (dataToSend) sent to the backend should be in exactly the same format as obtained from the server.

Pushing data into service dynamically

Now the main part is to provide the data to be sent to the service. To make it dynamic, import the Push Service in the ‘api-search.effects.ts’ file under effects and inject it in the constructor.

import { PushService } from '../services';

constructor(
  …
  private pushService: PushService
) { }

Now, call the pushService object inside the ‘relocateAfterSearchSuccess$’ effect method and pass the search response data (the payload of the search success action) to the Push Service’s postData() method. Note that the Observable returned by HttpClient is cold: the POST request is only sent once something subscribes to it.
@Effect()
relocateAfterSearchSuccess$: Observable<Action>
  = this.actions$
    .pipe(
      ofType(
        apiAction.ActionTypes
          .SEARCH_COMPLETE_SUCCESS,
        apiAction.ActionTypes
          .SEARCH_COMPLETE_FAIL
      ),
      withLatestFrom(this.store$),
      map(([action, state]) => {
        this.pushService
          .postData(action['payload']);
        …
      );

Testing Successful Push to Backend

To test the success of the POST request, subscribe to the response data and print the response data on…
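The body that postData() sends is a form-encoded body whose single field, data, carries the search result serialized as JSON. As a language-neutral illustration, here is a minimal Java sketch that builds such a body with only the JDK. The class and method names are hypothetical, and the exact escaping may differ from what Angular's HttpParams encoder produces, although the field structure is the same.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: builds a form-encoded body with one field named "data",
// the same structure as new HttpParams().set('data', dataToSend) in the service.
public class PushBody {

    // Percent-encodes the JSON string and uses it as the value of "data".
    static String build(String json) {
        return "data=" + URLEncoder.encode(json, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String json = "{\"search_metadata\":{},\"statuses\":[]}";
        System.out.println(build(json));
    }
}
```

Decoding the value after `data=` gives back the original JSON string, which is why the server receives the data in the same format it produced.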


Indexing for multiscrapers in Loklak Server

I recently added a multiscraper system which can scrape data from web scrapers like YoutubeScraper, QuoraScraper, GithubScraper, etc. As scraping is a costly task, it is important to improve its efficiency. One approach is to index the scraped data so that it can be served from the cache. TwitterScraper uses multiple sources to optimize efficiency. This system uses the Post message holder object to store data and PostTimeline (a specialized iterator) to iterate over the data objects. Because these data structures differ from those of TwitterScraper, a different approach is needed to index the data to ElasticSearch (currently in review). These are the changes I made while implementing ‘indexing of data’ in the project.

1) Writing of data is invoked only through the PostTimeline iterator

In TwitterScraper, the data is written in the message holder TwitterTweet, so all the tweets are written to the index as they are created. Here, writing of the posts is initiated once the data has been scraped. Scraping of data is considered a heavy process, and this approach keeps resource usage lower under average traffic on the server.

protected Post putData(Post typeArray, String key, Timeline2 postList) {
    if(!"cache".equals(this.source)) {
        postList.writeToIndex();
    }
    return this.putData(typeArray, key, postList.toArray());
}

2) One object for holding a message

During the implementation, I kept the same message holder Post and post iterator PostTimeline from scraping to indexing of data. This helps keep the structure uniform. The earlier approach involved different types of message wrappers along the way; this approach cuts out the extra loops and conversions between data structures.

3) Index a list, not a message

In TwitterScraper, individual messages are enqueued in the bulk to be indexed. In this approach, I have enqueued complete lists instead. This delays the indexing until the scraper is done processing the HTML.
Creating the queue of postlists:

// Add post-lists to queue to be indexed
queueClients.incrementAndGet();
try {
    postQueue.put(postList);
} catch (InterruptedException e) {
    DAO.severe(e);
}
queueClients.decrementAndGet();

Indexing of the posts in postlists:

// Start indexing of data in post-lists
for (Timeline2 postList: postBulk) {
    if (postList.size() < 1) continue;
    if(postList.dump) {
        // Dumping of data in a file
        writeMessageBulkDump(postList);
    }
    // Indexing of data to ElasticSearch
    writeMessageBulkNoDump(postList);
}

4) Categorizing the input parameters

While searching the index, I have divided the query parameters from the scraper into 3 categories. The input parameters are added to those categories (implemented using a map data structure), and the data fetched is filtered according to them. These categories are:

// Declaring the QueryBuilder
BoolQueryBuilder query = new BoolQueryBuilder();

a) Get the parameter: Get the results for the input fields in map getMap.

// Result must have these fields. Acts as AND operator
if(getMap != null) {
    for(Map.Entry<String, String> field : getMap.entrySet()) {
        query.must(QueryBuilders.termQuery(field.getKey(), field.getValue()));
    }
}

b) Don't get the parameter: Don't get the results for the input fields in map notGetMap.

// Result must not have these fields.
if(notGetMap != null) {
    for(Map.Entry<String, String> field : notGetMap.entrySet()) {
        query.mustNot(QueryBuilders.termQuery(field.getKey(), field.getValue()));
    }
}

c) Get if possible: Get the results with the…
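The "index a list, not a message" idea from point 3 can be sketched in plain Java. This is a simplified model under stated assumptions: posts are plain strings instead of Post objects, a List stands in for Timeline2, and adding to an in-memory list stands in for writeMessageBulkNoDump(). Only the queue-then-drain pattern mirrors the code above; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Simplified sketch: a scraper enqueues one whole list of posts per scraped page,
// and an indexer drains several lists at a time and writes each non-empty list
// as one bulk operation.
public class ListIndexQueue {

    private final BlockingQueue<List<String>> postQueue = new ArrayBlockingQueue<>(100);

    // Producer side: called once per scraped page with the complete list.
    void enqueue(List<String> postList) {
        try {
            postQueue.put(postList);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
    }

    // Consumer side: drains up to maxLists lists and "indexes" them.
    // Returns the number of bulk writes (one per non-empty list).
    int indexPending(int maxLists, List<List<String>> index) {
        List<List<String>> postBulk = new ArrayList<>();
        postQueue.drainTo(postBulk, maxLists);
        int bulkWrites = 0;
        for (List<String> postList : postBulk) {
            if (postList.isEmpty()) continue; // like "if (postList.size() < 1) continue;"
            index.add(postList);              // stand-in for writeMessageBulkNoDump(postList)
            bulkWrites++;
        }
        return bulkWrites;
    }

    public static void main(String[] args) {
        ListIndexQueue q = new ListIndexQueue();
        q.enqueue(List.of("video-1", "video-2"));
        q.enqueue(List.of());
        q.enqueue(List.of("answer-1"));
        List<List<String>> index = new ArrayList<>();
        System.out.println(q.indexPending(10, index) + " bulk writes: " + index);
    }
}
```

Running main prints `2 bulk writes: [[video-1, video-2], [answer-1]]`: three lists were queued, and the empty one was skipped.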


Setting Loklak Server with SSL

Loklak Server is based on an embedded Jetty server, which can work either with or without SSL encryption. Recently, there was a need to set up Loklak Server with SSL. That need was met by CloudFlare, but there are 2 alternative ways to set up Loklak Server with SSL:

1) Default Jetty Implementation

There is a pre-existing implementation in the Jetty libraries. The http mode can be set in the configuration file. There are 4 modes in which Loklak Server can work: http mode, https mode, only-https mode and redirect-to-https mode. Loklak Server listens on port 9000 in http mode and on port 9443 in https mode. An SSL certificate is also needed, which has to be added in the configuration file.

2) Getting an SSL certificate with Kube-Lego on a Kubernetes deployment

I learned about Kube-Lego from @niranjan94; it is used in Open-Event-Orga-Server. The approach is to use:

a) Nginx as ingress controller

For setting up the Nginx ingress controller, a yml file is needed which downloads and configures the server. The configurations for data requests and responses are:

proxy-connect-timeout: "15"
proxy-read-timeout: "600"
proxy-send-timeout: "600"
hsts-include-subdomains: "false"
body-size: "64m"
server-name-hash-bucket-size: "256"
server-tokens: "false"

Nginx is configured to work on both the http and https ports in service.yml:

ports:
- port: 80
  name: http
- port: 443
  name: https

b) Kube-Lego for fetching SSL certificates from Let's Encrypt

Kube-Lego was set up with default values in yml. It uses the host name, email address and secret name of the deployment to validate the URL and fetch the SSL certificate from Let's Encrypt.

c) Setup configurations related to TLS and no-TLS connections

These configuration files specify the paths and service ports through which the Nginx server forwards requests to the backend Loklak Server. For both no-TLS and TLS requests, the requests are forwarded directly to port 80 of the Loklak Server container.
rules:
- host: staging.loklak.org
  http:
    paths:
    - path: /
      backend:
        serviceName: server
        servicePort: 80

For TLS requests, the secret name is also specified. Kube-Lego fetches the host name and secret name from here for the certificate:

tls:
- hosts:
  - staging.loklak.org
  secretName: loklak-api-tls

d) Loklak Server, ElasticSearch and Mosquitto at the backend

These containers work at the backend. ElasticSearch and Mosquitto are accessible only to Loklak Server, and Loklak Server can be accessed through the Nginx server. Loklak Server is configured to work in http mode and is exposed on port 80.

ports:
- port: 80
  protocol: TCP
  targetPort: 80

To deploy Loklak Server, all these components are deployed in separate pods, and they interact through service ports. To deploy, we use a deploy script:

# For elasticsearch, accessible only to api-server
kubectl create -R -f ${path-to-config-file}/elasticsearch/
# For mqtt, accessible only to api-server
kubectl create -R -f ${path-to-config-file}/mosquitto/
# Start KubeLego deployment for TLS certificates
kubectl create -R -f ${path-to-config-file}/lego/
kubectl create -R -f ${path-to-config-file}/nginx/
# Create web namespace, this acts as bridge to Loklak Server
kubectl create -R -f ${path-to-config-file}/web/
# Create API server deployment and expose the services
kubectl create -R -f ${path-to-config-file}/api-server/
# Get the…
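The ingress rules and the tls section above can be read as a small lookup table. The following Java sketch is only a simplified model of that reading (match the host and a path prefix, then forward to serviceName:servicePort); it is not how the Nginx ingress controller is implemented, and the class and method names are hypothetical. The host, service, port and secret values are the ones from the configuration above.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Simplified model (not Kubernetes code) of the ingress configuration above.
public class IngressModel {

    record Rule(String host, String pathPrefix, String serviceName, int servicePort) {}

    // From the "rules" section.
    static final List<Rule> RULES = List.of(
            new Rule("staging.loklak.org", "/", "server", 80));

    // From the "tls" section: host -> secret that holds its certificate.
    static final Map<String, String> TLS_SECRETS = Map.of(
            "staging.loklak.org", "loklak-api-tls");

    // Returns "service:port" for the first rule matching host and path prefix.
    static Optional<String> route(String host, String path) {
        for (Rule r : RULES) {
            if (r.host().equals(host) && path.startsWith(r.pathPrefix())) {
                return Optional.of(r.serviceName() + ":" + r.servicePort());
            }
        }
        return Optional.empty();
    }

    // Secret in which Kube-Lego stores the Let's Encrypt certificate for the host.
    static Optional<String> tlsSecret(String host) {
        return Optional.ofNullable(TLS_SECRETS.get(host));
    }

    public static void main(String[] args) {
        System.out.println(route("staging.loklak.org", "/api/search.json"));
        System.out.println(tlsSecret("staging.loklak.org"));
        System.out.println(route("example.org", "/"));
    }
}
```

Both plain and TLS requests for staging.loklak.org end up at server:80; the tls entry only decides which certificate Nginx presents.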


Introducing Stream Servlet in loklak Server

A major part of my GSoC proposal was adding a stream API to loklak server. In a previous blog post, I discussed the addition of Mosquitto as a message broker for MQTT streaming. After testing this service for a few days and making some minor improvements, I was in a position to expose the stream to outside users using a simple API. In this blog post, I will be discussing the addition of the /api/stream.json endpoint to loklak server.

HTTP Server-Sent Events

Server-sent events (SSE) is a technology where a browser receives automatic updates from a server via HTTP connection. The Server-Sent Events EventSource API is standardized as part of HTML5 by the W3C. - Wikipedia

At the time of writing, this API was supported by all major browsers except Microsoft Edge. For loklak, the plan was to use this event system to send messages to connected users as they arrive. Apart from browsers, the EventSource API can also be used with many other technologies.

Jetty Eventsource Plugin

For Java, we can use Jetty’s EventSource plugin to send events to clients. It is similar to other Jetty servlets when it comes to processing arguments, handling requests, etc., but it provides a simple interface to send events to connected users as they occur.

Adding Dependency

To use this plugin, we can add the following line to the Gradle dependencies -

compile group: 'org.eclipse.jetty', name: 'jetty-eventsource-servlet', version: '1.0.0'

[SOURCE]

The Event Source

An EventSource is the object which EventSourceServlet requires to send events. All the logic for emitting events needs to be defined in the related class.
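For reference, this is roughly what an event looks like on the wire in the text/event-stream format used by server-sent events: each event is a group of `field: value` lines ended by a blank line, and a multi-line payload is sent as several `data:` lines. The Java helper below is a hypothetical illustration of that format only; in the servlet, Jetty's EventSource emitter takes care of writing events.

```java
// Hypothetical helper illustrating the text/event-stream format that an
// EventSource client parses.
public class SseFormat {

    // Formats one event: an optional "event:" line, one "data:" line per line of
    // the payload, and a blank line telling the client the event is complete.
    static String format(String eventName, String data) {
        StringBuilder sb = new StringBuilder();
        if (eventName != null) {
            sb.append("event: ").append(eventName).append('\n');
        }
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        sb.append('\n');
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(format(null, "{\"text\":\"hello\"}"));
        System.out.print(format("tweet", "line one\nline two"));
    }
}
```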
To link a servlet with an EventSource, we need to override the newEventSource method -

public class StreamServlet extends EventSourceServlet {

    @Override
    protected EventSource newEventSource(HttpServletRequest request) {
        String channel = request.getParameter("channel");
        if (channel == null) {
            return null;
        }
        if (channel.isEmpty()) {
            return null;
        }
        return new MqttEventSource(channel);
    }
}

[SOURCE]

If no channel is provided, the EventSource object will be null and the request will be rejected. Here, MqttEventSource is used to handle the stream of Tweets as they arrive from the Mosquitto message broker.

Cross Site Requests

Since requests to this endpoint can’t be of the JSONP type, it is necessary to allow cross-site requests on this endpoint. This can be done by overriding the doGet method of the servlet -

@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
    response.setHeader("Access-Control-Allow-Origin", "*");
    super.doGet(request, response);
}

[SOURCE]

Adding MQTT Subscriber

When a request for events arrives, the constructor of MqttEventSource is called. At this stage, we need to connect to the stream from Mosquitto for the channel. To achieve this, we can make the class implement MqttCallback and use the appropriate client configuration -

public class MqttEventSource implements MqttCallback {
    ...
    MqttEventSource(String channel) {
        this.channel = channel;
    }
    ...
        this.mqttClient = new MqttClient(address, "loklak_server_subscriber");
        this.mqttClient.connect();
        this.mqttClient.setCallback(this);
        this.mqttClient.subscribe(this.channel);
    ...
}

[SOURCE]

By setting the callback to this, we can override the messageArrived method to handle the arrival of a new message on the channel. Just to…
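The role of MqttEventSource can be summarised as a bridge: each message that arrives on the subscribed channel is forwarded to the connected client. The sketch below models only that forwarding step. Emitter is a hypothetical stand-in for Jetty's EventSource.Emitter, onMessage stands in for MqttCallback.messageArrived, and closing the stream on an IOException is an assumption about how a disconnected client could be handled, not the loklak implementation.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical bridge from MQTT-style message callbacks to an SSE-style emitter.
// No MQTT or Jetty classes are used.
public class ChannelBridge {

    // Stand-in for the emitter that writes events to the HTTP client.
    interface Emitter {
        void data(String data) throws IOException;
        void close();
    }

    private final String channel;
    private final Emitter emitter;
    private boolean open = true;

    ChannelBridge(String channel, Emitter emitter) {
        this.channel = channel;
        this.emitter = emitter;
    }

    // Forwards the payload if it belongs to this channel; stops forwarding and
    // closes the emitter once the client can no longer be written to.
    void onMessage(String topic, byte[] payload) {
        if (!open || !channel.equals(topic)) return;
        try {
            emitter.data(new String(payload, StandardCharsets.UTF_8));
        } catch (IOException e) {
            open = false;
            emitter.close();
        }
    }

    boolean isOpen() {
        return open;
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        ChannelBridge bridge = new ChannelBridge("twitter", new Emitter() {
            public void data(String d) { sent.add(d); }
            public void close() { }
        });
        bridge.onMessage("twitter", "{\"text\":\"hi\"}".getBytes(StandardCharsets.UTF_8));
        bridge.onMessage("other", "ignored".getBytes(StandardCharsets.UTF_8));
        System.out.println(sent);
    }
}
```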


Backend Scraping in Loklak Server

Loklak Server is a peer-to-peer distributed scraping system. It scrapes data from websites and also maintains other sources, like peers, storage and a backend server, to get data. Maintaining different sources has its benefits: no costly requests to the websites, no scraping of data and no cleaning of data. Loklak Server can maintain a secondary Loklak Server (or backend server) tuned for storing large amounts of data. This enables the primary Loklak Server to fetch data in return for pushing all of its scraped data to the backend.

Recently there was a bug in backend search: a new feature for filtering tweets was added to scraping and indexing, but not to backend search. To fix this issue, I traced through the backend search codebase and fixed it. Let us discuss how backend search works:

1) When a query is made from the search endpoint with:

a) source=all

When source is set to all, TwitterScraper and messages from the local search server are preferred first. If not enough messages are scraped, or no output has been returned within a specific amount of time, backend search is initiated.

b) source=backend

SearchServlet scrapes directly from the backend server.

2) Fetching data from the backend server

The input parameters received from the client are fed into the DAO.searchBackend method. The list of backend servers is fetched from the config file. Using these input parameters and backend servers, the required data is fetched and returned to the client. In the DAO.searchOnOtherPeers method, the request is sent to multiple servers, which are ordered so that those with better response rates come first. This method invokes the SearchServlet.search method to send requests to those servers.
List<String> remote = getBackendPeers();
if (remote.size() > 0) { // condition deactivated because we need always at least one peer
    Timeline tt = searchOnOtherPeers(remote, q, filterList, order, count, timezoneOffset, where, SearchServlet.backend_hash, timeout);
    if (tt != null) tt.writeToIndex();
    return tt;
}

3) Creation of request URL and sending requests

The request URL is created according to the input parameters passed to the SearchServlet.search method, and the request is sent to the respective servers to fetch the required messages.

// URL creation
urlstring = protocolhostportstub + "/api/search.json?q="
    + URLEncoder.encode(query.replace(' ', '+'), "UTF-8")
    + "&timezoneOffset=" + timezoneOffset
    + "&maximumRecords=" + count
    + "&source=" + (source == null ? "all" : source)
    + "&minified=true&shortlink=false&timeout=" + timeout;
if(!"".equals(filterString = String.join(", ", filterList))) {
    urlstring = urlstring + "&filter=" + filterString;
}

// Download data
byte[] jsonb = ClientConnection.downloadPeer(urlstring);
if (jsonb == null || jsonb.length == 0) throw new IOException("empty content from " + protocolhostportstub);
String jsons = UTF8.String(jsonb);
JSONObject json = new JSONObject(jsons);
if (json == null || json.length() == 0) return tl;

// Final data fetched to be returned
JSONArray statuses = json.getJSONArray("statuses");

References

Social peer-to-peer processes: https://en.wikipedia.org/wiki/Social_peer-to-peer_processes
Parallel Random-Access Machine: http://pages.cs.wisc.edu/~tvrdik/2/html/Section2.html
Distributed Algorithm (Cole–Vishkin algorithm): http://homepage.divms.uiowa.edu/~ghosh/color.pdf
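The request URL in step 3 can also be assembled with every value percent-encoded. The following Java sketch is a hypothetical helper, not SearchServlet.search itself: it takes the same parameters as the code in step 3, encodes the raw query directly (it does not apply the original's replace(' ', '+') step) and also encodes the filter value, which contains a comma and a space. The backend address in main() is a placeholder.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical helper that builds a peer search URL with encoded parameter values.
public class BackendSearchUrl {

    static String enc(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }

    static String build(String protocolhostportstub, String query, int timezoneOffset,
                        int count, String source, long timeout, List<String> filterList) {
        StringBuilder url = new StringBuilder(protocolhostportstub)
                .append("/api/search.json?q=").append(enc(query))
                .append("&timezoneOffset=").append(timezoneOffset)
                .append("&maximumRecords=").append(count)
                .append("&source=").append(enc(source == null ? "all" : source))
                .append("&minified=true&shortlink=false&timeout=").append(timeout);
        String filterString = String.join(", ", filterList);
        if (!filterString.isEmpty()) {
            // ", " is not URL-safe, so the filter value is encoded as well.
            url.append("&filter=").append(enc(filterString));
        }
        return url.toString();
    }

    public static void main(String[] args) {
        System.out.println(build("http://backend.example:9000", "fossasia loklak",
                0, 100, null, 5000, List.of("image", "video")));
    }
}
```

With the placeholder values, the filter part of the printed URL is `&filter=image%2C+video`.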


Configuring Youtube Scraper with Search Endpoint in Loklak Server

Youtube Scraper is one of the interesting web scrapers of Loklak Server, with a unique implementation of its data scraping and data key creation (using RDF). It couldn't be accessed because it didn't have any URL endpoint. I configured it to be usable both through a separate endpoint (/api/youtubescraper) and through the search endpoint (/api/search.json).

Usage:

YoutubeScraper
Endpoint: /api/youtubescraper
Example: http://api.loklak.org/api/youtubescraper?query=https://www.youtube.com/watch?v=xZ-m55K3FhQ&scraper=youtube

SearchServlet
Endpoint: /api/search.json
Example: http://api.loklak.org/api/search.json?query=https://www.youtube.com/watch?v=xZ-m55K3FhQ&scraper=youtube

The configurations added in Loklak Server are:

1) Endpoint

We can access YoutubeScraper using the /api/youtubescraper endpoint. Like the other scrapers, it uses the BaseScraper class as its superclass.

2) PrepareSearchUrl

The prepareSearchUrl method creates the YouTube URL that is used to scrape the YouTube webpage. YoutubeScraper takes a URL as input, but a YouTube link could also be a shortened link. That is why the video ID is stored as the query. This approach optimizes the scraper and makes it possible to add more scrapers to it: currently YoutubeScraper scrapes the video webpages of YouTube, but scrapers for search and channel webpages can also be added.
URIBuilder url = null;
String midUrl = "search/";
try {
    switch(type) {
        case "search":
            midUrl = "search/";
            url = new URIBuilder(this.baseUrl + midUrl);
            url.addParameter("search_query", this.query);
            break;
        case "video":
            midUrl = "watch/";
            url = new URIBuilder(this.baseUrl + midUrl);
            url.addParameter("v", this.query);
            break;
        case "user":
            midUrl = "channel/";
            url = new URIBuilder(this.baseUrl + midUrl + this.query);
            break;
        default:
            url = new URIBuilder("");
            break;
    }
} catch (URISyntaxException e) {
    DAO.log("Invalid Url: baseUrl = " + this.baseUrl + ", mid-URL = " + midUrl + ", query = " + this.query + ", type = " + type);
    return "";
}

3) Get-Data-From-Connection

The getDataFromConnection method is used to fetch a BufferedReader object and pass it to the scrape method. In YoutubeScraper, this method has been overridden so that the default implementation (i.e. type=all) is not used.

@Override
public Post getDataFromConnection() throws IOException {
    String url = this.prepareSearchUrl(this.type);
    return getDataFromConnection(url, this.type);
}

4) Set scraper parameters input as GET parameters

The scraper reads type and query from the Map of GET parameters. For a URL, the video ID (its last 11 characters, so the URL is assumed to end with the ID) is separated from the URL and then used as the query.

this.query = this.getExtraValue("query");
this.query = this.query.substring(this.query.length() - 11);

5) Scrape Method

The scrape method runs the different scraper methods (in YoutubeScraper, there is only one), iterates over the results using PostTimeline and wraps them in a Post object for the output. This simple function can improve the flexibility of the scraper to scrape different pages concurrently.
Post out = new Post(true);
Timeline2 postList = new Timeline2(this.order);
postList.addPost(this.parseVideo(br, type, url));
out.put("videos", postList.toArray());

References

What is an RDF triple, explained on Stack Overflow: https://stackoverflow.com/questions/273218/whats-a-rdf-triple
Tutorial on Scraping with Regular Expressions: http://stanford.edu/~mgorkove/cgi-bin/rpython_tutorials/Scraping_PDFsText_Files_in_Python_Using_Regular_Expressions.php
Youtube Video-Id Format: https://webapps.stackexchange.com/questions/54443/format-for-id-of-youtube-video
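Step 4 takes the last 11 characters of the input as the video ID, which only works when the URL ends with the ID. As a hedged alternative covering the two link shapes mentioned in step 2 (long watch?v= links and shortened youtu.be links), the hypothetical helper below parses the URL and checks the candidate against the commonly observed 11-character [A-Za-z0-9_-] ID format. It handles only those two shapes.

```java
import java.net.URI;
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the video ID from long and shortened YouTube links.
public class YoutubeVideoId {

    // Video IDs are 11 characters drawn from letters, digits, '_' and '-'.
    private static final Pattern ID = Pattern.compile("[A-Za-z0-9_-]{11}");

    static Optional<String> extract(String link) {
        URI uri = URI.create(link);
        String host = uri.getHost() == null ? "" : uri.getHost();
        String candidate = null;
        if (host.endsWith("youtu.be")) {
            // Short link: https://youtu.be/<id>
            String path = uri.getPath();
            candidate = path != null && path.length() > 1 ? path.substring(1) : null;
        } else if (uri.getQuery() != null) {
            // Long link: https://www.youtube.com/watch?v=<id>&...
            for (String pair : uri.getQuery().split("&")) {
                if (pair.startsWith("v=")) {
                    candidate = pair.substring(2);
                }
            }
        }
        return candidate != null && ID.matcher(candidate).matches()
                ? Optional.of(candidate) : Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(extract("https://www.youtube.com/watch?v=xZ-m55K3FhQ&t=42"));
        System.out.println(extract("https://youtu.be/xZ-m55K3FhQ"));
    }
}
```

Unlike taking the last 11 characters, this still finds the ID when extra parameters such as `&t=42` follow it.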


Optimising Docker Images for loklak Server

The loklak server is in the process of moving to Kubernetes. In order to do so, we needed different Docker images that suit these deployments. In this blog post, I will be discussing the process through which I optimised the size of the Docker image for these deployments.

Initial Image

The image that I started with used Ubuntu as its base. It installed all the components needed and then modified the configurations as required -

FROM ubuntu:latest

# Env Vars
ENV LANG=en_US.UTF-8
ENV JAVA_TOOL_OPTIONS=-Dfile.encoding=UTF8
ENV DEBIAN_FRONTEND noninteractive

WORKDIR /loklak_server
RUN apt-get update
RUN apt-get upgrade -y
RUN apt-get install -y git openjdk-8-jdk
RUN git clone https://github.com/loklak/loklak_server.git /loklak_server
RUN git checkout development
RUN ./gradlew build -x test -x checkstyleTest -x checkstyleMain -x jacocoTestReport
RUN sed -i.bak 's/^\(port.http=\).*/\180/' conf/config.properties
... # More configurations
RUN echo "while true; do sleep 10;done" >> bin/start.sh

# Start
CMD ["bin/start.sh", "-Idn"]

The images built using this Dockerfile were quite large -

REPOSITORY     TAG      IMAGE ID       CREATED              SIZE
loklak_server  latest   a92f506b360d   About a minute ago   1.114 GB
ubuntu         latest   ccc7a11d65b1   3 days ago           120.1 MB

Since this size is not acceptable, we needed to reduce it.

Moving to Alpine

Alpine Linux is an extremely lightweight Linux distro, built mainly for the container environment. It is so tiny that it has hardly any impact on the overall size of images. So, I replaced Ubuntu with Alpine -

FROM alpine:latest

...

RUN apk update
RUN apk add git openjdk8 bash

...
And now we had much smaller images -

REPOSITORY     TAG      IMAGE ID       CREATED          SIZE
loklak_server  latest   54b507ee9187   17 seconds ago   668.8 MB
alpine         latest   7328f6f8b418   6 weeks ago      3.966 MB

As we can see, due to no caching and the small size of Alpine, the image size is reduced to about 60% of the original.

Reducing Content Size

There are many things in a project which are no longer needed while running it, like the .git folder (which is huge in the case of loklak) -

$ du -sh loklak_server/.git
236M loklak_server/.git

We can remove such files from the Docker image and save a lot of space -

rm -rf .[^.] .??*

Optimizing Number of Layers

The number of layers also affects the size of the image: the more layers, the larger the image. This is because each RUN instruction creates a new layer, and files deleted in a later layer still occupy space in the earlier layer that created them. In the Dockerfile, we can club the RUN commands together to get fewer layers.

RUN apk update && apk add openjdk8 git bash && \
    git clone https://github.com/loklak/loklak_server.git /loklak_server && \
    ...

After this, the effective size is again reduced by a major factor -

REPOSITORY     TAG      IMAGE ID       CREATED          SIZE
loklak_server  latest   54b507ee9187   17 seconds ago   422.3 MB
alpine         latest   7328f6f8b418   6 weeks ago      3.966 MB

Conclusion

In this blog post, I discussed the process of optimising the size of the Docker image for Kubernetes deployments of loklak server. The size was reduced from 1.114 GB to 422.3 MB, which provided much faster push/pull times for Docker images and, therefore, faster updates for Kubernetes deployments.…
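Why chaining RUN commands helps can be shown with a simplified model of image layers: each RUN step stores the files it leaves behind, and deleting a file in a later step only hides it, so its bytes stay in the earlier layer. In the Java sketch below, the 236 MB for .git comes from the du output above, while the 50 MB for the rest of the checkout is a made-up figure; the model is hypothetical and ignores compression and metadata.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of Docker layer sizes (an illustration, not Docker's storage format).
public class LayerModel {

    // Size of one layer: files written by the step that still exist when it ends.
    static long layerSize(Map<String, Long> added, List<String> removedInSameStep) {
        long size = 0;
        for (Map.Entry<String, Long> e : added.entrySet()) {
            if (!removedInSameStep.contains(e.getKey())) {
                size += e.getValue();
            }
        }
        return size;
    }

    // Image size: every layer is kept, so the sizes simply add up.
    static long imageSize(List<Long> layerSizes) {
        long total = 0;
        for (long s : layerSizes) {
            total += s;
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Long> checkout = new HashMap<>();
        checkout.put(".git", 236L); // MB, from the du output above
        checkout.put("src", 50L);   // MB, hypothetical size of the rest

        // Separate RUN steps: clone in one layer, "rm -rf .git" in the next,
        // which adds no file data but cannot shrink the first layer.
        long separate = imageSize(List.of(layerSize(checkout, List.of()), 0L));
        // Chained RUN step: clone and delete inside the same layer.
        long chained = imageSize(List.of(layerSize(checkout, List.of(".git"))));

        System.out.println("separate RUN steps: " + separate + " MB");
        System.out.println("chained RUN step:   " + chained + " MB");
    }
}
```

In this model, the separate steps still carry 286 MB, while the chained step stores only 50 MB.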


Persistently Storing loklak Server Dumps on Kubernetes

In an earlier blog post, I discussed the loklak setup on Kubernetes. The deployment mentioned in that post was for testing the development branch. Next, we needed a deployment where all the messages are collected and dumped in text files that can be reused. In this blog post, I will be discussing the challenges with such a deployment and the approach to tackle them.

Volatile Disk in Kubernetes

The pods that hold deployments in Kubernetes have disk storage. Any data that gets written by the application stays only while the same version of the deployment is running. As soon as the deployment is updated or relocated, the data written by the application is cleaned up. Because of this, dumps are written while loklak is running, but they are all lost when the deployment image is updated. We needed a solution to this, as we needed permanent storage for collecting dumps.

Persistent Disk

In order to have storage which can hold data permanently, we can mount persistent disk(s) on a pod at the appropriate location. This ensures that the data that is important to us stays with us, even when the deployment goes down. To add persistent disks, we first need to create one. On Google Cloud Platform, we can use the gcloud CLI to create disks in a given zone -

gcloud compute disks create --size=<required size> --zone=<same as cluster zone> <unique disk name>

After this, we can mount it as a volume defined in the Kubernetes configuration -

...
volumeMounts:
- mountPath: /path/to/mount
  name: volume-name
volumes:
- name: volume-name
  gcePersistentDisk:
    pdName: disk-name
    fsType: fileSystemType

But this setup can’t be used for storing loklak dumps. Let’s see why in the next section.

Rolling Updates and Persistent Disk

The Kubernetes deployment needs to be updated when the master branch of loklak server is updated.
This update of the master deployment creates a new pod and tries to start loklak server on it. During all this, the older deployment is still running and serving requests. Control is not transferred to the newer pod until it is ready and all the probes are passing. The newer deployment then tries to mount the disk mentioned in the configuration, but it fails to do so because the older pod has already mounted the disk. Therefore, all new deployments would simply fail to start due to insufficient resources. To overcome such issues, Kubernetes allows persistent volume claims. Let’s see how we used them for the loklak deployment.

Persistent Volume Claims

Kubernetes provides Persistent Volume Claims, which claim resources (storage) from a Persistent Volume (just like a pod does from a node). The higher-level APIs are provided by Kubernetes (configurations and the kubectl command line). In the loklak deployment, the persistent volume is a Google Compute Engine disk -

apiVersion: v1
kind: PersistentVolume
metadata:
  name: dump
  namespace: web
spec:
  capacity:
    storage:…
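The rolling-update failure described above can be reproduced with a toy model: a disk that only one pod may hold at a time, and a rollout that keeps the old pod until the new one is ready. The Java sketch below is purely illustrative (hypothetical class names, no Kubernetes API); it shows why the rollout stalls, not how Kubernetes implements scheduling or volume attachment.

```java
// Toy model of a rolling update with a single-attach disk. Not Kubernetes code.
public class RollingUpdateModel {

    // A disk that can be attached to at most one pod at a time.
    static class Disk {
        private String attachedTo; // null when free

        boolean attach(String pod) {
            if (attachedTo != null && !attachedTo.equals(pod)) return false;
            attachedTo = pod;
            return true;
        }

        void detach(String pod) {
            if (pod.equals(attachedTo)) attachedTo = null;
        }
    }

    // Returns the pod that serves traffic after the rollout attempt.
    static String rollOut(Disk disk, String oldPod, String newPod) {
        boolean newPodReady = disk.attach(newPod); // new pod needs its volume to start
        if (!newPodReady) {
            return oldPod; // stalls: the old pod keeps the disk and the traffic
        }
        disk.detach(oldPod);
        return newPod;
    }

    public static void main(String[] args) {
        Disk disk = new Disk();
        disk.attach("loklak-v1");
        System.out.println(rollOut(disk, "loklak-v1", "loklak-v2"));
    }
}
```

Because the old pod is only removed after the new one is ready, and the new one cannot become ready without the disk, the model never leaves loklak-v1.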


Using Mosquitto as a Message Broker for MQTT in loklak Server

In loklak server, messages are collected from various sources and indexed using Elasticsearch. To know when a message of interest arrives, users can poll the search endpoint. But this method requires a lot of HTTP requests, most of them redundant. Also, if a user would like to collect messages for a particular topic, they would need to make a lot of requests over a period of time to get enough data.

For GSoC 2017, my proposal was to introduce a stream API in the loklak server so that we could avoid making too many requests and also enable many use cases.

Mosquitto is an Eclipse project which acts as a message broker for the popular MQTT protocol. MQTT, based on the pub-sub model, is a lightweight and IoT-friendly protocol. In this blog post, I will discuss the basic setup of Mosquitto in the loklak server.

Installation and Dependency for Mosquitto

The installation process of Mosquitto is very simple. For Ubuntu, it is available from the default package repositories -

sudo apt-get install mosquitto

Once the message broker is up and running, we can use clients to connect to it and publish/subscribe to channels. To add an MQTT client as a project dependency, we can introduce the following line in the Gradle dependencies file -

compile group: 'net.sf.xenqtt', name: 'xenqtt', version: '0.9.5'

[SOURCE]

After this, we can use the client libraries in the server code base.

The MQTTPublisher Class

The MQTTPublisher class in loklak provides an interface to perform basic operations in MQTT.
The implementation uses AsyncClientListener to connect to the Mosquitto broker -

AsyncClientListener listener = new AsyncClientListener() {
    // Override methods according to needs
};

[SOURCE]

The publish method of the class can be used by other components of the project to publish messages on the desired channel -

public void publish(String channel, String message) {
    this.mqttClient.publish(new PublishMessage(channel, QoS.AT_LEAST_ONCE, message));
}

[SOURCE]

We also have methods which allow publishing multiple messages to multiple channels, to extend the functionality of the class.

Starting Publisher with Server

The flags which enable the streaming service in loklak are located in conf/config.properties. These configurations are read while initializing the Data Access Object, and an MQTTPublisher is created if needed -

String mqttAddress = getConfig("stream.mqtt.address", "tcp://127.0.0.1:1883");
streamEnabled = getConfig("stream.enabled", false);
if (streamEnabled) {
    mqttPublisher = new MQTTPublisher(mqttAddress);
}

[SOURCE]

The mqttPublisher can now be used by other components of loklak to publish messages to the channel they want.

Adding Mosquitto to Kubernetes

Since loklak also has a nice Kubernetes setup, it was very simple to introduce a new deployment for Mosquitto to it.

Changes in Dockerfile

The Dockerfile for the master deployment has to be modified to discover the Mosquitto broker in the Kubernetes cluster. For this purpose, the corresponding flags in config.properties have to be changed to ensure that things work fine -

sed -i.bak 's/^\(stream.enabled\).*/\1=true/' conf/config.properties && \
sed -i.bak 's/^\(stream.mqtt.address\).*/\1=mosquitto.mqtt:1883/' conf/config.properties && \

[SOURCE]

The Mosquitto broker would be available at mosquitto.mqtt:1883 because of the service that is created…
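To make the pub-sub model behind MQTT concrete, here is a toy in-memory broker in Java with the same publish(channel, message) shape as MQTTPublisher. It is a hypothetical illustration only: publishers do not know who listens, and the broker delivers each message to every current subscriber of the channel. There is no networking, QoS, retained messages or topic wildcards, all of which Mosquitto provides.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy in-memory publish/subscribe broker (illustration only, not MQTT).
public class ToyBroker {

    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // Registers a handler for every future message on the channel.
    void subscribe(String channel, Consumer<String> handler) {
        subscribers.computeIfAbsent(channel, c -> new ArrayList<>()).add(handler);
    }

    // Delivers the message to all subscribers of the channel and returns how
    // many received it; messages on channels without subscribers are dropped.
    int publish(String channel, String message) {
        List<Consumer<String>> handlers = subscribers.getOrDefault(channel, List.of());
        handlers.forEach(h -> h.accept(message));
        return handlers.size();
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.subscribe("twitter", m -> System.out.println("client A got " + m));
        broker.subscribe("twitter", m -> System.out.println("client B got " + m));
        broker.publish("twitter", "new tweet");
        System.out.println(broker.publish("youtube", "nobody listens"));
    }
}
```

This fan-out to every subscriber is what lets the stream endpoint replace repeated polling of the search endpoint.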
