Integrating YaCy Grid Locally with Susper

The YaCy Grid is the second-generation implementation of YaCy, a peer-to-peer search engine. Search results can be improved considerably by using YaCy Grid as the new backend for Susper. Legacy YaCy is built for a network that is both decentralised and distributed. Both networks are distributed, but YaCy Grid is centralised while legacy YaCy is decentralised, which makes YaCy Grid a good fit for a distributed search topology. With YaCy Grid, scaling is in our own hands: loading, parsing and indexing can each be scaled with the computing power we choose. Legacy YaCy embeds Solr, whereas YaCy Grid uses an Elasticsearch cluster. Both are built around the Lucene search library, but Elasticsearch makes it much easier to scale out. In this blog post, I will show how to integrate YaCy Grid with Susper locally and how to use it to fetch results.

Implementing YaCy Grid with Susper:

Before using YaCy Grid we first need to set it up and crawl a URL using the crawl start API. More information can be found in the posts Implementing YaCy Grid with Susper and Setting up YaCy Grid locally.

Once setup and crawling are done, we can begin using its APIs in Susper. The following steps show results from YaCy Grid in a separate tab in Susper.

Step 1:

Creating a service to fetch results:

To fetch results from the local YaCy Grid server we need a service. Here is the class in grid-service.ts that fetches the results for us.

export class GridSearchService {
  // Base URL of the local YaCy Grid MCP (left empty here)
  server = '';
  searchURL = this.server + '/yacy/grid/mcp/index/yacysearch.json?query=';

  constructor(private http: Http,
              private jsonp: Jsonp,
              private store: Store<fromRoot.State>) {
  }

  // Query the YaCy Grid search API and parse the JSON response
  getSearchResults(searchquery) {
    return this.http
      .get(this.searchURL + searchquery)
      .map(res => res.json());
  }
}
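As a rough illustration of what the service requests, the search URL can also be assembled outside Angular. The sketch below is in Python rather than TypeScript. The server address `http://127.0.0.1:8100` is only an assumption for a local MCP (the service above leaves `server` empty), and `build_search_url` is a hypothetical helper name. Unlike the plain string concatenation in the service, it URL-encodes the query.

```python
from urllib.parse import quote

# Path taken from the service above; the server address is an assumption.
SEARCH_PATH = '/yacy/grid/mcp/index/yacysearch.json?query='


def build_search_url(server, search_query):
    """Return the full search URL for a (hypothetical) local YaCy Grid MCP."""
    # quote() percent-encodes spaces and other reserved characters.
    return server.rstrip('/') + SEARCH_PATH + quote(search_query, safe='')


print(build_search_url('http://127.0.0.1:8100', 'open source'))
```

Encoding the query keeps characters such as `&` or spaces from being misread as part of the URL itself.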


Step 2:

Modifying results.component.ts file

To use the results from grid-service.ts in results.component.ts, we create an instance of the service, use it to fetch the results, and store them in variables in results.component.ts. These variables are then used to show the results in the results template. The following code does this:

ngOnInit() {
   this.resultDisplay = 'grid';
   this.gridmessage = 'About ' + this.totalgridresults + ' results';
}


Step 3:

Creating a New tab to show results from YaCy Grid:

Now we need to create a tab in the template that uses the local variables in results.component.ts to show the results. Following the current design pattern, here is the code for that:

<li [class.active_view]="Display('grid')" (click)="gridClick()">YaCy_Grid</li>

<!--YaCy Grid-->
<div class="container-fluid">
  <div class="result message-bar" *ngIf="totalgridresults > 0 && Display('grid')">
    <div class="autocorrect">
      <app-auto-correct [hidden]="hideAutoCorrect"></app-auto-correct>
    </div>
  </div>
</div>
<div class="grid-result" *ngIf="Display('grid')">
  <div class="feed container">
    <div *ngFor="let item of gridItems" class="result">
      <div class="title">
        <a class="title-pointer" href="{{}}" [style.color]="themeService.titleColor">{{item.title}}</a>
      </div>
      <div class="link">
        <p [style.color]="themeService.linkColor">{{}}</p>
      </div>
      <div class="description">
        <p [style.color]="themeService.descriptionColor">{{item.pubDate|date:'MMMM d, yyyy'}} - {{item.description}}</p>
      </div>
    </div>
  </div>
</div>
<!-- END -->


Step 4:

Starting YaCy Grid Locally:

Now all we need is to start the YaCy Grid server locally. To start it, go to the yacy_grid_mcp folder and use

python bin/


This will start Elasticsearch from its respective script. Next use

python bin/


This will start the RabbitMQ server with the required configuration. Next use

gradle run


to start YaCy Grid locally.

Now we are all done. We just need to start Susper using the

ng serve


command, type a search query, and move to the YaCy_Grid tab to see results from the YaCy Grid server.

Here is the image which shows results from YaCy Grid in Susper



Setting up YaCy Grid locally

Susper is a search interface that uses the P2P search engine YaCy. Search results are served by the Solr server embedded in YaCy and are retrieved through the YaCy search API. When a search request is made in one of the search templates, an HTTP request is sent to YaCy and the response is returned as JSON. In this blog post I will show how to set up YaCy Grid locally.

What is YaCy Grid?

The YaCy Grid is the second-generation implementation of YaCy, a peer-to-peer search engine. The required storage functions of the YaCy Grid are:

  1.  An asset storage, basically a file sharing environment for YaCy components. An FTP server is used for asset storage.
  2.  A message system providing an Enterprise Integration Framework using message-oriented middleware. RabbitMQ message queues are used for the message system.
  3.  A database system providing search-engine related retrieval functions. Elasticsearch is used for database operations.

How to set up YaCy Grid locally?

YaCy Grid has four components: MCP (Master Connect Program), Loader, Crawler and Parser.

  1. Clone all the components using the --recursive flag.

git clone --recursive
git clone --recursive
git clone --recursive
git clone --recursive
  2.  Starting YaCy Grid requires starting Elasticsearch, RabbitMQ with username `anonymous` and password `yacy`, and an FTP server (this can be omitted, as the MCP can take over).
  3.  All the above steps can also be done in a single step by running a Python script in the `bin` folder ``
  4.  Working of `` in yacy_grid_mcp:

if not checkportopen(9200):
   print "Elasticsearch is not running"
   elasticversion = 'elasticsearch-5.6.5'
   if not os.path.isfile(path_apphome + '/data/mcp-8100/apps/' + elasticversion + '.tar.gz'):
       print('Downloading ' + elasticversion)
       urllib.urlretrieve ('' + elasticversion + '.tar.gz', path_apphome + '/data/mcp-8100/apps/' + elasticversion + '.tar.gz')
   if not os.path.isdir(path_apphome + '/data/mcp-8100/apps/elasticsearch'):
       print('Decompressing' + elasticversion)
       os.system('tar xfz ' + path_apphome + '/data/mcp-8100/apps/' + elasticversion + '.tar.gz -C ' + path_apphome + '/data/mcp-8100/apps/')
       os.rename(path_apphome + '/data/mcp-8100/apps/' + elasticversion, path_apphome + '/data/mcp-8100/apps/elasticsearch')
   # run elasticsearch
   print('Running Elasticsearch')
   os.chdir(path_apphome + '/data/mcp-8100/apps/elasticsearch/bin')
   os.system('nohup ./elasticsearch &')


  • Checks whether Elasticsearch is running or not, if not then runs Elasticsearch.
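The script relies on a `checkportopen` helper whose body is not shown in this post. A minimal sketch of how such a check could look is given below (in Python 3). The `host` and `timeout` parameters and the use of `connect_ex` are assumptions, not the script's actual implementation.

```python
import socket


def checkportopen(port, host='127.0.0.1', timeout=1.0):
    """Return True if something accepts TCP connections on host:port."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        # connect_ex returns 0 on success instead of raising an exception.
        return sock.connect_ex((host, port)) == 0
    finally:
        sock.close()


if __name__ == '__main__':
    # 9200 is the Elasticsearch HTTP port checked by the script above.
    print('Elasticsearch reachable:', checkportopen(9200))
```

A port check like this only tells us that some process is listening, not that it is a correctly configured service, which is why the script asks the user about RabbitMQ separately.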
if checkportopen(15672):
   print "RabbitMQ is Running"
   print "If you have configured it according to YaCy setup press N"
   print "If you have not configured it according to YaCy setup or Do not know what to do press Y"
   n = raw_input()  # read the user's answer (assumed; the excerpt omits this line)
   if(n=='Y' or n=='y'):
       os.system('service rabbitmq-server stop')
if not checkportopen(15672):
   print "rabbitmq is not running"
   os.system('python bin/')
  • Checks whether RabbitMQ is running. If it is, the user is asked whether it should be reconfigured for the YaCy Grid setup (press Y) or left alone (press N). If it is not running, RabbitMQ is started with the required configuration.
  • Updates all the Grid components including MCP.

if not checkportopen(2121):
   print "ftp server is not Running"
  • Checks for an ftp server and prints message accordingly.

# subprocess.call is assumed below; the call name is missing from the excerpt.
def run_mcp():
   subprocess.call(['gnome-terminal', '-e', "gradle run"])

def run_loader():
   os.system('cd ../yacy_grid_loader')
   subprocess.call(['gnome-terminal', '-e', "gradle run"])

def run_crawler():
   os.system('cd ../yacy_grid_crawler')
   subprocess.call(['gnome-terminal', '-e', "gradle run"])

def run_parser():
   os.system('cd ../yacy_grid_parser')
   subprocess.call(['gnome-terminal', '-e', "gradle run"])


  • Runs all components of YaCy Grid in separate terminal.

Once the components are started, the user can work with YaCy Grid from the terminal.

If a YaCy Grid service has used the MCP once, it learns from the MCP to connect to the infrastructure itself. For example:

  • a YaCy Grid service starts up and connects to the MCP
  • the Grid service pushes a message to the message queue using the MCP
  • the MCP fulfils the message send operation and responds with the actual address of the message broker
  • the YaCy Grid service learns the direct connection information
  • whenever the YaCy Grid service wants to connect to the message broker again, it can do so using a direct broker connection

This process is done transparently: the Grid service does not need to handle such communication details itself, and the routing is done automatically. To use the MCP inside other grid components, the git submodule functionality is used.
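The learning step described above can be sketched in a few lines of Python. This is only a toy model: `FakeMCP`, `GridService`, the response field `broker` and the address `amqp://127.0.0.1:5672` are all hypothetical names chosen for illustration, and no real network traffic takes place.

```python
class FakeMCP:
    """Stand-in for the MCP: relays a message and reveals the broker address."""

    def __init__(self, broker_address):
        self.broker_address = broker_address
        self.relayed = []

    def send(self, queue, message):
        self.relayed.append((queue, message))
        # The response carries the actual address of the message broker.
        return {'broker': self.broker_address}


class GridService:
    """A grid service that learns the direct broker connection from the MCP."""

    def __init__(self, mcp):
        self.mcp = mcp
        self.broker_address = None  # unknown until the MCP has been used once
        self.direct_sends = []

    def send(self, queue, message):
        if self.broker_address is None:
            # First contact: go through the MCP and remember what it tells us.
            response = self.mcp.send(queue, message)
            self.broker_address = response['broker']
            return 'via-mcp'
        # Later sends: talk to the broker directly (recorded here, not networked).
        self.direct_sends.append((self.broker_address, queue, message))
        return 'direct'


mcp = FakeMCP('amqp://127.0.0.1:5672')
service = GridService(mcp)
print(service.send('crawler', 'job-1'))  # goes through the MCP
print(service.send('crawler', 'job-2'))  # uses the learned broker address
```

The caller always uses the same `send` method, which mirrors the point that the routing decision stays hidden from the service code.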



Open Event Server: Creating/Rebuilding Elasticsearch Index From Existing Data In a PostgreSQL DB Using Python

The Elasticsearch instance in the current Open Event Server deployment is, due to limited resources, used only to store the events and search through them.

The project uses a PostgreSQL database. This blog post focuses on setting up a job that creates the events index if it does not exist. If the index exists, the job deletes all the previous data and rebuilds the events index.

Although the project uses the Flask framework, the job will be written in pure Python so that it can run in the background while the application continues its work. Celery is used for queueing up these jobs. The first step in building the job is to connect to our database:

from config import Config
import psycopg2
conn = psycopg2.connect(Config.SQLALCHEMY_DATABASE_URI)
cur = conn.cursor()


The next step is to fetch all the events from the database. We will index only those attributes of an event that are useful in search; the rest are not stored in the index. The code given below fetches a collection of tuples containing the attributes named in the query:

cur.execute(
    "SELECT id, name, description, searchable_location_name, organizer_name, organizer_description FROM events WHERE state = 'published' and deleted_at is NULL ;")
events = cur.fetchall()


We will be using the bulk API, which is significantly faster than adding events one by one via the API. Elasticsearch-py, the official Python client for Elasticsearch, provides the necessary functionality to work with the bulk API. The helpers in the client let us use generator expressions to insert the data via the bulk API. The generator expression for events is as follows:

event_data = ({'_type': 'event',
                  '_index': 'events',
                  '_id': event_[0],
                  'name': event_[1],
                  'description': event_[2] or None,
                  'searchable_location_name': event_[3] or None,
                  'organizer_name': event_[4] or None,
                  'organizer_description': event_[5] or None}
                 for event_ in events)
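To see what the generator expression produces without a database or an Elasticsearch instance, here is a small self-contained sketch. The rows in `sample_rows` are made up and stand in for the result of `cur.fetchall()`, and `build_actions` is a hypothetical wrapper name.

```python
def build_actions(events):
    """Lazily turn (id, name, description, ...) tuples into bulk actions."""
    return ({'_type': 'event',
             '_index': 'events',
             '_id': event_[0],
             'name': event_[1],
             # "or None" maps empty strings to None so they are stored as null.
             'description': event_[2] or None,
             'searchable_location_name': event_[3] or None,
             'organizer_name': event_[4] or None,
             'organizer_description': event_[5] or None}
            for event_ in events)


# Made-up rows standing in for cur.fetchall().
sample_rows = [
    (1, 'FOSSASIA Summit', 'Open tech event', 'Singapore', 'FOSSASIA', ''),
    (2, 'Meetup', '', None, 'Community', None),
]

actions = build_actions(sample_rows)
first = next(actions)  # nothing is built until the generator is consumed
print(first['_id'], first['organizer_description'])
```

Because the actions are generated lazily, the full list of dictionaries never has to sit in memory at once, which is useful when many events are indexed in one go.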


We will now delete the events index if it exists. The events index will then be recreated. The generator expression obtained above will be passed to the bulk API helper and the events index will be repopulated. The complete code for the function is as follows:


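from elasticsearch import helpers

def cron_rebuild_events_elasticsearch():
   """
   Re-inserts all eligible events into elasticsearch
   """
   conn = psycopg2.connect(Config.SQLALCHEMY_DATABASE_URI)
   cur = conn.cursor()
   cur.execute(
       "SELECT id, name, description, searchable_location_name, organizer_name, organizer_description FROM events WHERE state = 'published' and deleted_at is NULL ;")
   events = cur.fetchall()
   event_data = ({'_type': 'event',
                  '_index': 'events',
                  '_id': event_[0],
                  'name': event_[1],
                  'description': event_[2] or None,
                  'searchable_location_name': event_[3] or None,
                  'organizer_name': event_[4] or None,
                  'organizer_description': event_[5] or None}
                 for event_ in events)
   # es_store is the application's Elasticsearch client, defined elsewhere.
   # Delete and recreate the index (exact calls assumed; not shown in the excerpt).
   es_store.indices.delete(index='events', ignore=[400, 404])
   es_store.indices.create(index='events', ignore=400)
   # Repopulate the index through the bulk helper
   abc = helpers.bulk(es_store, event_data)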


Currently we run this job every week and also on each new deployment. Rebuilding the index is important because some records may not be indexed while the continuous sync is taking place.



Indexing for multiscrapers in Loklak Server

I recently added a multiscraper system which can scrape data from web scrapers like YoutubeScraper, QuoraScraper, GithubScraper, etc. As scraping is a costly task, it is important to improve its efficiency. One approach is to index the data in a cache. TwitterScraper uses multiple sources to optimize the efficiency.

This system uses the Post message holder object to store data and PostTimeline (a specialized iterator) to iterate over the data objects. This difference in data structures from TwitterScraper calls for a different approach to indexing data to ElasticSearch (currently in the review process).

These are the changes I made while implementing ‘indexing of data’ in the project.

1) Writing of data is invoked only using PostTimeline iterator

In TwitterScraper, the data is written in the message holder TwitterTweet, so all the tweets are written to the index as they are created. Here, writing of the posts is initiated only once the data has been scraped. Scraping of data is considered a heavy process, and this approach keeps resource usage lower under average traffic on the server.

protected Post putData(Post typeArray, String key, Timeline2 postList) {
    if(!"cache".equals(this.source)) {
        // write postList to the index (call omitted in this excerpt)
    }
    return this.putData(typeArray, key, postList.toArray());
}

2) One object for holding a message

During the implementation, I kept the same message holder Post and post-iterator PostTimeline from scraping through to indexing of data. This keeps the structure uniform. The earlier approach involved different types of message wrappers along the way. This approach cuts out the looping and conversion between data structures.

3) Index a list, not a message

In TwitterScraper, the messages are enqueued one by one in the bulk to be indexed. In this approach, I enqueue the complete lists instead. This delays the indexing until the scraper is done processing the HTML.

Creating the queue of postlists:

// Add post-lists to queue to be indexed
try {
    // enqueue the post-list here (call omitted in this excerpt)
} catch (InterruptedException e) {
    // handling omitted in this excerpt
}


Indexing of the posts in postlists:

// Start indexing of data in post-lists
for (Timeline2 postList: postBulk) {
    if (postList.size() < 1) continue;
    if(postList.dump) {
        // Dumping of data in a file
    }
    // Indexing of data to ElasticSearch
}
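The same "queue whole lists, index later" idea can be sketched in Python. Everything here is a simplified stand-in: `post_bulk`, `enqueue_post_list` and `index_pending` are hypothetical names, plain Python lists play the role of post-lists, and the indexing function is passed in rather than talking to ElasticSearch.

```python
import queue

# Hypothetical stand-in for the bulk queue of post-lists.
post_bulk = queue.Queue()


def enqueue_post_list(post_list):
    """Add a complete list of posts to the queue in one step."""
    post_bulk.put(post_list)


def index_pending(index_fn):
    """Drain the queue and index every non-empty post-list; return the count."""
    indexed = 0
    while True:
        try:
            post_list = post_bulk.get_nowait()
        except queue.Empty:
            break
        if len(post_list) < 1:
            continue  # skip empty lists, as in the loop above
        for post in post_list:
            index_fn(post)
            indexed += 1
    return indexed


store = []
enqueue_post_list([{'id': 1}, {'id': 2}])
enqueue_post_list([])
enqueue_post_list([{'id': 3}])
print(index_pending(store.append))
```

The scraper only pays for one queue operation per list, and the per-post work happens later when the queue is drained.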


4) Categorizing the input parameters

While searching the index, I have divided the query parameters from the scraper into 3 categories. The input parameters are added to those categories (implemented using a map data structure) and the data is fetched according to them. These categories are:

// Declaring the QueryBuilder
BoolQueryBuilder query = new BoolQueryBuilder();


a) Get the parameter: Get the results for the input fields in map getMap.

// Result must have these fields. Acts as AND operator
if(getMap != null) {
    for(Map.Entry<String, String> field : getMap.entrySet()) {
        // termQuery is assumed; the original call is truncated
        query.must(QueryBuilders.termQuery(
                field.getKey(), field.getValue()));
    }
}


b) Don’t get the parameter: Don’t get the results for the input fields in map notGetMap.

// Result must not have these fields.
if(notGetMap != null) {
    for(Map.Entry<String, String> field : notGetMap.entrySet()) {
        // termQuery is assumed; the original call is truncated
        query.mustNot(QueryBuilders.termQuery(
                field.getKey(), field.getValue()));
    }
}


c) Get if possible: Get the results with the input fields if they are present in the index.

// Result may preferably also get these fields. Acts as OR operator
if(mayAlsoGetMap != null) {
    for(Map.Entry<String, String> field : mayAlsoGetMap.entrySet()) {
        // termQuery is assumed; the original call is truncated
        query.should(QueryBuilders.termQuery(
                field.getKey(), field.getValue()));
    }
}
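The three categories map naturally onto the `must`, `must_not` and `should` clauses of an Elasticsearch bool query. The Python sketch below builds the equivalent JSON query body from three dictionaries. The function name `build_bool_query` is hypothetical, and the use of `term` clauses is an assumption, since the clause type is not visible in the snippets above.

```python
import json


def build_bool_query(get_map=None, not_get_map=None, may_also_get_map=None):
    """Build an Elasticsearch bool query body from three field maps."""
    def terms(field_map):
        # One term clause per field; term queries are an assumption here.
        return [{'term': {field: value}}
                for field, value in (field_map or {}).items()]

    bool_query = {}
    if get_map:
        bool_query['must'] = terms(get_map)             # all must match (AND)
    if not_get_map:
        bool_query['must_not'] = terms(not_get_map)     # none may match
    if may_also_get_map:
        bool_query['should'] = terms(may_also_get_map)  # preferred (OR)
    return {'query': {'bool': bool_query}}


body = build_bool_query({'user': 'loklak'}, {'lang': 'de'}, {'tag': 'fossasia'})
print(json.dumps(body, indent=2))
```

Empty or missing maps simply leave their clause out, which matches the null checks in the Java snippets.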



By applying these changes, the scrapers are shifted from indexing single messages to indexing lists of messages. This keeps the load on RAM low, but the aggregation of the latest scraped data may be affected, so a workaround will be needed to solve this issue while scraping itself.



Creating A Dockerfile For Yacy Grid MCP

The YaCy Grid is the second-generation implementation of YaCy, a peer-to-peer search engine. A YaCy Grid installation consists of a set of micro-services which communicate with each other using a common infrastructure for data persistence. The task was to deploy the second generation of YaCy Grid. To do so, we first created a Dockerfile. This Dockerfile should start microservices such as RabbitMQ, Apache FTP and Elasticsearch in one Docker instance along with the MCP. The microservices perform the following tasks:

  • Apache ftp server for asset storage.
  • RabbitMQ message queues for the message system.
  • Elasticsearch for database operations.

To launch these microservices using a Dockerfile, we referred to the following documentation on running these services locally:

For creating a Dockerfile we proceeded as follows:

FROM ubuntu:latest
MAINTAINER Harshit Prasad

# Update
RUN apt-get update
RUN apt-get upgrade -y

# add packages
# install jdk package for java
RUN apt-get install -y git openjdk-8-jdk

#install gradle required for build
RUN apt-get update && apt-get install -y software-properties-common
RUN add-apt-repository ppa:cwchien/gradle
RUN apt-get update
RUN apt-get install -y wget
RUN wget
RUN mkdir /opt/gradle
RUN apt-get install -y unzip
RUN unzip -d /opt/gradle
# Use ENV so the PATH change persists into later build steps
ENV PATH=$PATH:/opt/gradle/gradle-3.4.1/bin
ENV GRADLE_HOME=/opt/gradle/gradle-3.4.1
RUN gradle -v

# install apache ftp server 1.1.0
RUN wget
RUN tar xfz apache-ftpserver-1.1.0.tar.gz

# install RabbitMQ server
RUN wget
RUN tar xf rabbitmq-server-generic-unix-3.6.6.tar.xz

# install erlang language for RabbitMQ
RUN apt-get install -y erlang

# install elasticsearch
RUN wget
RUN sha1sum elasticsearch-5.5.0.tar.gz
RUN tar -xzf elasticsearch-5.5.0.tar.gz

# clone yacy_grid_mcp repository
RUN git clone
WORKDIR /yacy_grid_mcp

RUN cat docker/ > ../apacheftpserver1.1.0/res/conf/

# compile
RUN gradle build
RUN mkdir data/mcp-8100/conf/ -p
RUN cp docker/ data/mcp-8100/conf/
RUN chmod +x ./docker/

# Expose web interface ports
# 2121: ftp, a FTP server to be used for mass data / file storage
# 5672: rabbitmq, a rabbitmq message queue server to be used for global messages, queues and stacks
# 9300: elastic, an elasticsearch server or main cluster address for global database storage
EXPOSE 2121 5672 9300 9200 15672 8100

# Define default command.
ENTRYPOINT ["/bin/bash", "./docker/"]


We have created a file to start the RabbitMQ and Apache FTP services. At the end, gradle run is executed for compilation.

adduser --disabled-password --gecos "" r
adduser r sudo
echo '%sudo ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers
chmod a+rwx /elasticsearch-5.5.0 -R
su -m r -c '/elasticsearch-5.5.0/bin/elasticsearch &'
cd /apacheftpserver1.1.0
./bin/ res/conf/ftpdtypical.xml &
/rabbitmq_server-3.6.6/sbin/rabbitmq-server -detached
sleep 5s;
/rabbitmq_server-3.6.6/sbin/rabbitmq-plugins enable rabbitmq_management
/rabbitmq_server-3.6.6/sbin/rabbitmqctl add_user yacygrid password4account
echo '[{rabbit, [{loopback_users, []}]}].' >> /rabbitmq_server-3.6.6/etc/rabbitmq/rabbitmq.config
/rabbitmq_server-3.6.6/sbin/rabbitmqctl set_permissions -p / yacygrid ".*" ".*" ".*"
cd /yacy_grid_mcp
sleep 5s;
gradle run

The script first adds the user name and password, then starts RabbitMQ along with Apache FTP. For the user name and password, we created separate files to configure their properties during Docker run, which can be found here:

The logic behind running all the microservices in one Docker instance was to create a container for each microservice and then link those containers with the help of a docker-compose.yml file.

The Dockerfile which we have created was corresponding to one image. Another image was elasticsearch which was linked to this Dockerfile. The latest version of elasticsearch image was already available on their site:

We configured the docker-compose.yml file according to the reference link provided above. The docker-compose file can be found here:

The source code for the implementation of whole structure can be found here:




Deploying loklak Server on Kubernetes with External Elasticsearch

Kubernetes is an open-source system for automating deployment, scaling, and management of containerized applications.

Kubernetes is an awesome cloud platform, which ensures that cloud applications run reliably. It runs automated tests, flawless updates, smart roll out and rollbacks, simple scaling and a lot more.

So as a part of GSoC, I worked on taking the loklak server to Kubernetes on Google Cloud Platform. In this blog post, I will be discussing the approach followed to deploy development branch of loklak on Kubernetes.

New Docker Image

Since Kubernetes deployments work on Docker images, we needed one for the loklak project. The existing image would not be up to the mark for Kubernetes as it contained the declaration of volumes and exposing of ports. So I wrote a new Docker image which could be used in Kubernetes.

The image would simply clone loklak server, build the project and trigger the server as CMD

FROM alpine:latest


WORKDIR /loklak_server

RUN apk update && apk add openjdk8 git bash && \
    git clone /loklak_server && \
    git checkout development && \
    ./gradlew build -x test -x checkstyleTest -x checkstyleMain -x jacocoTestReport && \
    # Some Configurations and Cleanups

CMD ["bin/", "-Idn"]


This image wouldn’t have any volumes or exposed ports and we are now free to configure them in the configuration files (discussed in a later section).

Building and Pushing Docker Image using Travis

To automatically build and push on a commit to the master branch, Travis build is used. In the after_success section, a call to push Docker image is made.

Travis environment variables hold the username and password for Docker hub and are used for logging in –



We needed checks there to ensure that we are on the right branch for the push and we are not handling a pull request –

# Build and push Kubernetes Docker image
if [ "$TRAVIS_BRANCH" == "development" ]; then
    docker build -t loklak_server_kubernetes kubernetes/images/development
    docker tag loklak_server_kubernetes $KUBERNETES_BRANCH
    docker push $KUBERNETES_BRANCH
    docker tag loklak_server_kubernetes $KUBERNETES_COMMIT
    docker push $KUBERNETES_COMMIT
elif [ "$TRAVIS_BRANCH" == "master" ]; then
    # Build and push master (steps omitted in this excerpt)
    :
else
    echo "Skipping Kubernetes image push for branch $TRAVIS_BRANCH"
fi


Kubernetes Configurations for loklak

Kubernetes cluster can completely be configured using configurations written in YAML format. The deployment of loklak uses the previously built image. Initially, the image tagged as latest-kubernetes-development is used –

apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: server
  namespace: web
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: server
    spec:
      containers:
      - name: server
        image: loklak/loklak_server:latest-kubernetes-development


Readiness and Liveness Probes

Probes act as the top level tester for the health of a deployment in Kubernetes. The probes are performed periodically to ensure that things are working fine and appropriate steps are taken if they fail.

When a new image is updated, the older pod still runs and serves the requests. It is replaced by the new ones only when the probes are successful; otherwise, the update is rolled back.

In loklak, the /api/status.json endpoint gives information about status of deployment and hence is a good target for probes –

livenessProbe:
  httpGet:
    path: /api/status.json
    port: 80
  initialDelaySeconds: 30
  timeoutSeconds: 3
readinessProbe:
  httpGet:
    path: /api/status.json
    port: 80
  initialDelaySeconds: 30
  timeoutSeconds: 3


These probes are performed periodically and the server is restarted if they fail (non-success HTTP status code or takes more than 3 seconds).
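To make the pass/fail rule concrete, here is a small Python sketch of what an HTTP probe checks. It is not how Kubernetes implements probes; `http_probe` is a hypothetical helper that treats any status from 200 to 399 as success and everything else, including timeouts and refused connections, as failure.

```python
import http.client


def http_probe(host, port, path, timeout_seconds=3):
    """Return True if GET path answers with a 2xx/3xx status in time."""
    conn = http.client.HTTPConnection(host, port, timeout=timeout_seconds)
    try:
        conn.request('GET', path)
        status = conn.getresponse().status
        return 200 <= status < 400
    except (OSError, http.client.HTTPException):
        # Refused connections, timeouts and malformed responses all fail.
        return False
    finally:
        conn.close()


if __name__ == '__main__':
    # Host and port are placeholders for a locally running loklak server.
    print(http_probe('127.0.0.1', 80, '/api/status.json'))
```

The 3 second timeout mirrors `timeoutSeconds: 3` in the configuration, so a slow status endpoint counts as a failed probe just like an error response.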

Ports and Volumes

In the configurations, port 80 is exposed as this is where Jetty serves inside loklak –

ports:
- containerPort: 80
  protocol: TCP


If we notice, this is the port that we used for running the probes. Since the development branch deployment holds no dumps, we didn’t need to specify any explicit volumes for persistence.

Load Balancer Service

While creating the configurations, a new public IP is assigned to the deployment using Google Cloud Platform’s load balancer. It starts listening on port 80 –

- containerPort: 80
  protocol: TCP


Since this service creates a new public IP, it is recommended not to replace or recreate this service, as this would result in the creation of a new public IP. Other components can be updated individually.

Kubernetes Configurations for Elasticsearch

To maintain a persistent index, this deployment would require an external Elasticsearch cluster. loklak is able to connect itself to external Elasticsearch cluster by changing a few configurations.

Docker Image and Environment Variables

The image used for Elasticsearch is taken from pires/docker-elasticsearch-kubernetes. It allows easy configuration of properties from environment variables in configurations. Here is a list of configurable variables, but we needed just a few of them to do our task –

  value: /var/run/secrets/
      fieldPath: metadata.namespace
- name: "CLUSTER_NAME"
  value: "loklakcluster"
  value: "elasticsearch"
  value: "true"
- name: NODE_DATA
  value: "true"
  value: "true"


Persistent Index using Persistent Cloud Disk

To make the index last even after the deployment is stopped, we needed a stable place where we could store all that data. Here, Google Compute Engine’s standard persistent disk was used. The disk can be created using GCP web portal or the gcloud CLI.

Before attaching the disk, we need to declare a volume where we could mount it –

volumeMounts:
- mountPath: /data
  name: storage


Now that we have a volume, we can simply mount the persistent disk on it –

volumes:
- name: storage
  gcePersistentDisk:
    pdName: data-index-disk
    fsType: ext4


Now, whenever we deploy these configurations, we can reuse the previous index.

Exposing Elasticsearch to Cluster

The HTTP and transport clients are enabled on port 9200 and 9300 respectively. They can be exposed to the rest of the cluster using the following service –

apiVersion: v1
kind: Service
metadata:
  name: elasticsearch
  namespace: elasticsearch
spec:
  ports:
  - name: http
    port: 9200
    protocol: TCP
  - name: transport
    port: 9300
    protocol: TCP


Once deployed, other deployments can access the cluster API from ports 9200 and 9300.

Connecting loklak to Elasticsearch

To connect loklak to external Elasticsearch cluster, TransportClient Java API is used. In order to enable these settings, we simply need to make some changes in configurations.

Since we enable the service named “elasticsearch” in namespace “elasticsearch”, we can access the cluster at address elasticsearch.elasticsearch:9200 (web) and elasticsearch.elasticsearch:9300 (transport).

To confine these changes only to Kubernetes deployment, we can use sed command while building the image (in Dockerfile) –

sed -i.bak 's/^\(elasticsearch_transport.enabled\).*/\1=true/' conf/ && \
sed -i.bak 's/^\(elasticsearch_transport.addresses\).*/\1=elasticsearch.elasticsearch:9300/' conf/ && \


Now when we create the deployments in Kubernetes cluster, loklak auto connects to the external elasticsearch index and creates indices if needed.
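The effect of the two sed substitutions above can be tried out on a small piece of text. The Python sketch below does the same line rewrite with `re.sub`. The contents of `sample_config` are made up for illustration and `set_property` is a hypothetical helper; the real configuration file is not reproduced here.

```python
import re

# Made-up sample standing in for the real configuration file.
sample_config = (
    'elasticsearch_transport.enabled=false\n'
    'elasticsearch_transport.addresses=localhost:9300\n'
    'unrelated.key=value\n'
)


def set_property(text, key, value):
    """Replace the whole line that starts with key by key=value."""
    pattern = r'^(' + re.escape(key) + r').*'
    # A function replacement avoids backslash surprises in the value.
    return re.sub(pattern, lambda m: m.group(1) + '=' + value,
                  text, flags=re.MULTILINE)


updated = set_property(sample_config, 'elasticsearch_transport.enabled', 'true')
updated = set_property(updated, 'elasticsearch_transport.addresses',
                       'elasticsearch.elasticsearch:9300')
print(updated)
```

As with the sed version, lines that do not start with the given key are left untouched.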

Verifying persistence of the Elasticsearch Index

In order to see that the data persists, we can completely delete the deployment or even the cluster if we want. Later, when we recreate the deployment, we can see all the messages already present in the index.

I  [2017-07-29 09:42:51,804][INFO ][node                     ] [Hellion] initializing ...
I  [2017-07-29 09:42:52,024][INFO ][plugins                  ] [Hellion] loaded [cloud-kubernetes], sites []
I  [2017-07-29 09:42:52,055][INFO ][env                      ] [Hellion] using [1] data paths, mounts [[/data (/dev/sdb)]], net usable_space [84.9gb], net total_space [97.9gb], spins? [possibly], types [ext4]
I  [2017-07-29 09:42:53,543][INFO ][node                     ] [Hellion] initialized
I  [2017-07-29 09:42:53,543][INFO ][node                     ] [Hellion] starting ...
I  [2017-07-29 09:42:53,620][INFO ][transport                ] [Hellion] publish_address {}, bound_addresses {}
I  [2017-07-29 09:42:53,633][INFO ][discovery                ] [Hellion] loklakcluster/cJtXERHETKutq7nujluJvA
I  [2017-07-29 09:42:57,866][INFO ][cluster.service          ] [Hellion] new_master {Hellion}{cJtXERHETKutq7nujluJvA}{}{}{master=true}, reason: zen-disco-join(elected_as_master, [0] joins received)
I  [2017-07-29 09:42:57,955][INFO ][http                     ] [Hellion] publish_address {}, bound_addresses {}
I  [2017-07-29 09:42:57,955][INFO ][node                     ] [Hellion] started
I  [2017-07-29 09:42:58,082][INFO ][gateway                  ] [Hellion] recovered [8] indices into cluster_state

In the last line from the logs, we can see that indices already present on the disk were recovered. Now if we head to the public IP assigned to the cluster, we can see that the message count is restored.


In this blog post, I discussed how we utilised the Kubernetes setup to shift loklak to Google Cloud Platform. The deployment is active and can be accessed from the link provided under wiki section of loklak/loklak_server repo.

I introduced these changes in pull request loklak/loklak_server#1349 with the help of @niranjan94, @uday96 and @chiragw15.



Caching Elasticsearch Aggregation Results in loklak Server

To provide aggregated data for various classifiers, loklak uses Elasticsearch aggregations. Aggregated data speaks a lot more than a few instances from it can say. But performing aggregations on each request can be very resource consuming. So we needed to come up with a way to reduce this load.

In this post, I will be discussing how I came up with a caching model for the aggregated data from the Elasticsearch index.

Fields to Consider while Caching

At the classifier endpoint, aggregations can be requested based on the following fields –

  • Classifier Name
  • Classifier Classes
  • Countries
  • Start Date
  • End Date

But to cache results, we can ignore cases where we just require a few classes or countries and store aggregations for all of them instead. So the fields that will define the cache to look for will be –

  • Classifier Name
  • Start Date
  • End Date

Type of Cache

The data structure used for caching was Java’s HashMap. It would be used to map a special string key to a special object discussed in a later section.


The key is built using the fields mentioned previously –

private static String getKey(String index, String classifier, String sinceDate, String untilDate) {
    return index + "::::"
        + classifier + "::::"
        + (sinceDate == null ? "" : sinceDate) + "::::"
        + (untilDate == null ? "" : untilDate);
}


In this way, we can handle requests where a user makes a request for every class there is without running the expensive aggregation job every time. This is because the key for such requests will be same as we are not considering country and class for this purpose.
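A quick way to see why such requests share a cache entry is to build the key for two requests that differ only in classes or countries. The Python sketch below mirrors the key format above; the index name `messages`, the classifier name `emotion` and the dates are made-up example values.

```python
def get_key(index, classifier, since_date=None, until_date=None):
    """Join the cache-defining fields with the '::::' separator."""
    return '::::'.join([index, classifier, since_date or '', until_date or ''])


# Two requests asking for different countries or classes still produce the
# same key, because neither field is part of it.
key_a = get_key('messages', 'emotion', '2017-07-01', None)
key_b = get_key('messages', 'emotion', '2017-07-01', None)
print(key_a)
print(key_a == key_b)
```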


The object used as the value in the HashMap is a wrapper containing the following –

  1. json – It is a JSONObject containing the actual data.
  2. expiry – It is the expiry of the object in milliseconds.

class JSONObjectWrapper {
    private JSONObject json;
    private long expiry;
}


The timeout associated with a cache is defined in the configuration file of the project as “classifierservlet.cache.timeout”. It defaults to 5 minutes and is used to set the expiry of a cached JSONObject –

class JSONObjectWrapper {
    // Timeout in milliseconds; 300000 ms = 5 minutes
    private static long timeout = DAO.getConfig("classifierservlet.cache.timeout", 300000);

    JSONObjectWrapper(JSONObject json) {
        this.json = json;
        this.expiry = System.currentTimeMillis() + timeout;
    }
    // ...
}
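The isExpired() check that the cache relies on is not shown in the post. A minimal sketch of how such a wrapper could behave, assuming a generic payload in place of JSONObject and an explicit clock value so that the expiry logic is easy to verify (the names ExpiringValue and isExpiredAt are hypothetical and not part of loklak):

```java
/** Hypothetical stand-in for JSONObjectWrapper; holds any payload type. */
class ExpiringValue<T> {
    private final T value;
    private final long expiry; // absolute expiry time in milliseconds

    ExpiringValue(T value, long timeoutMillis, long nowMillis) {
        this.value = value;
        this.expiry = nowMillis + timeoutMillis;
    }

    T get() {
        return value;
    }

    /** True once the given time has passed the stored expiry. */
    boolean isExpiredAt(long nowMillis) {
        return nowMillis > expiry;
    }

    /** Convenience check against the system clock. */
    boolean isExpired() {
        return isExpiredAt(System.currentTimeMillis());
    }
}
```

Storing the absolute expiry time, rather than the creation time, keeps the check down to a single comparison.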


Cache Hit

For searching in the cache, the previously mentioned string is composed from the parameters requested by the user. Checking for a cache hit can be done in the following manner –

String key = getKey(index, classifier, sinceDate, untilDate);
if (cacheMap.containsKey(key)) {
    JSONObjectWrapper jw = cacheMap.get(key);
    if (!jw.isExpired()) {
        // Do something with jw
    }
}
// Calculate the aggregations

But since jw here would contain all the data, we would need to filter out the classes and countries which are not needed.

Filtering results

For filtering out the parts which do not contain the information requested by the user, we can perform a simple pass and exclude the results that are not needed.

Since the number of fields to filter out, i.e. classes and countries, would not be that high, this process would not be that resource intensive. At the same time, it saves us from running heavy aggregation tasks for every user request.

Since the data about classes is nested inside the respective country field, we need to perform two levels of filtering –

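JSONObject retJson = new JSONObject(true);
for (String key : json.keySet()) {
    // Inner level: keep only the requested classes
    JSONArray value = filterInnerClasses(json.getJSONArray(key), classes);
    // Outer level: keep only the global entry and the requested countries
    if ("GLOBAL".equals(key) || countries.contains(key)) {
        retJson.put(key, value);
    }
}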
Cache Miss

In the case of a cache miss, the helper functions are called to get fresh results from Elasticsearch. These results are then converted from HashMap to JSONObject and stored in the cache for future use.

JSONObject freshCache = getFromElasticsearch(index, classifier, sinceDate, untilDate);
cacheMap.put(key, new JSONObjectWrapper(freshCache));

The getFromElasticsearch method finds all the possible classes and makes a request to the appropriate method in ElasticsearchClient, getting data for all classifiers and all countries.
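Putting the hit and miss paths together, the overall flow can be sketched as a small get-or-compute cache. This is an illustration under assumptions, not the loklak implementation: the class AggregationCache, its generic value type and the explicit nowMillis parameter are hypothetical, and the loader stands in for a call like getFromElasticsearch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical get-or-compute cache with a fixed time-to-live per entry. */
class AggregationCache<V> {
    private static final class Entry<V> {
        final V value;
        final long expiry; // absolute expiry time in milliseconds

        Entry(V value, long expiry) {
            this.value = value;
            this.expiry = expiry;
        }
    }

    private final Map<String, Entry<V>> cacheMap = new HashMap<>();
    private final long timeoutMillis;

    AggregationCache(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Same key layout as getKey in the post: null dates become empty strings. */
    static String getKey(String index, String classifier, String sinceDate, String untilDate) {
        return index + "::::" + classifier + "::::"
            + (sinceDate == null ? "" : sinceDate) + "::::"
            + (untilDate == null ? "" : untilDate);
    }

    /** Returns the cached value if present and fresh; otherwise computes and stores it. */
    synchronized V get(String key, long nowMillis, Supplier<V> loader) {
        Entry<V> entry = cacheMap.get(key);
        if (entry != null && nowMillis <= entry.expiry) {
            return entry.value; // cache hit
        }
        V fresh = loader.get(); // cache miss or expired entry: run the expensive job
        cacheMap.put(key, new Entry<>(fresh, nowMillis + timeoutMillis));
        return fresh;
    }
}
```

An expired entry is treated exactly like a miss, so the stale value is simply overwritten by the fresh one.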


In this blog post, I discussed the need for caching of aggregations and the way it is achieved in the loklak server. This feature was introduced in pull request loklak/loklak_server#1333 by @singhpratyush (me).



Data Indexing in Loklak Server

Loklak Server is a data-scraping system that indexes all the scraped data in order to optimize retrieval. It depends on ElasticSearch for indexing the cached data (as JSON). The data fetched by different users is stored as cache, which allows recurring queries to be served directly from the cache. When users search for the same queries, the load on Loklak Server is reduced by returning indexed data instead of scraping the same data again.

When is data indexing done?

The indexing of data is done when:

1) Data is scraped:

When data is scraped, it is indexed concurrently while the data is being cleaned in the TwitterTweet data object. For this task, the addScheduler static method of IncomingMessageBuffer is used, which acts as an abstraction between scraping of data and storing and indexing of data.

The following is the implementation from TwitterScraper. Here, writeToIndex is a boolean that controls whether the data is indexed.

if (this.writeToIndex) IncomingMessageBuffer.addScheduler(this, this.user, true);

2) Data is fetched from backend:

When data is fetched from backend, it is indexed in Timeline iterator. It calls the above method to index data concurrently.

The following is the definition of the writeToIndex() method. When writeToIndex() is called, the fetched data is indexed.

public void writeToIndex() {
    IncomingMessageBuffer.addScheduler(this, true);
}


When addScheduler static method of IncomingMessageBuffer is called, a thread is started that indexes all data. When the messagequeue data structure is filled with some messages, indexing continues.
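The general pattern described here, a queue that is filled by scrapers and drained by a background thread, can be sketched as follows. This is a simplified illustration under assumptions and not the IncomingMessageBuffer code: the class MessageBuffer, its String messages and the in-memory list that stands in for the index are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical buffer: producers enqueue messages, one worker thread writes them in bulk. */
class MessageBuffer {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final List<String> indexed = new ArrayList<>(); // stands in for the index
    private volatile boolean running = true;
    private final Thread worker = new Thread(this::drainLoop);

    void start() {
        worker.start();
    }

    /** Called by the scraping side; returns immediately. */
    void add(String message) {
        queue.add(message);
    }

    /** Keeps indexing as long as the buffer is running or messages are still queued. */
    private void drainLoop() {
        List<String> batch = new ArrayList<>();
        try {
            while (running || !queue.isEmpty()) {
                String first = queue.poll(50, TimeUnit.MILLISECONDS);
                if (first == null) {
                    continue; // nothing queued yet, check again
                }
                batch.add(first);
                queue.drainTo(batch); // pick up everything else that is waiting
                writeBulk(batch);
                batch.clear();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private synchronized void writeBulk(List<String> batch) {
        indexed.addAll(batch);
    }

    /** Stops the worker after the queue is empty and returns what was indexed. */
    List<String> shutdownAndGet() {
        running = false;
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        synchronized (this) {
            return new ArrayList<>(indexed);
        }
    }
}
```

Writing in batches rather than one message at a time is what makes a bulk method such as writeMessageBulk a natural fit for this kind of buffer.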

The DAO method writeMessageBulk is called to write the data. The data is then written to the following streams:

1) Dump: The data fetched is dumped into Import directory in a file. It can also be fetched from other peers.

2) Index: The fetched data is checked against the index, and only data that isn’t already indexed gets indexed.

public static Set<String> writeMessageBulk(Collection<MessageWrapper> mws) {
    // Split the messages into those that should be dumped and those that should not
    List<MessageWrapper> noDump = new ArrayList<>();
    List<MessageWrapper> dump = new ArrayList<>();
    for (MessageWrapper mw: mws) {
        if (mw.t == null) continue;
        if (mw.dump) dump.add(mw);
        else noDump.add(mw);
    }

    Set<String> createdIDs = new HashSet<>();
    createdIDs.addAll(writeMessageBulkNoDump(noDump));
    // Also does a writeMessageBulkNoDump internally
    createdIDs.addAll(writeMessageBulkDump(dump));
    return createdIDs;
}


In the above method, the call to writeMessageBulkNoDump(noDump) indexes the data to ElasticSearch, whereas writeMessageBulkDump(dump) is called for dumping the data.



Some Other Services in Loklak Server

Loklak Server isn’t just a scraper. It provides numerous other services, such as link unshortening (the reverse of link shortening), video fetching, and administrative tasks like fetching the status of a Loklak deployment (used for analysis during Loklak development). Some of these are used internally and the rest can be accessed through HTTP endpoints. There are also some services that aren’t complete and are still in development.

Let’s go through some of them to know a bit about them and how they can be used.

1) VideoUrlService

This service extracts the video from a website that has a streaming video and outputs the link to the video file. It is still in development but is functional. Presently, it can fetch Twitter video links and output them in different video qualities.

Endpoint: /api/videoUrlService.json

Implementation Example:

curl api/

2) Link Unshortening Service

This service unshortens links. Websites use shortened URLs to track Internet users. To prevent this, the link unshortening service unshortens the link and returns the final untrackable link to the user.

Currently, this service is used in TwitterScraper to unshorten the fetched URLs. It has other methods to get the redirect link, and also a method to get the final URL from a link that has been shortened multiple times.

Implementation Example from TwitterScraper:

Matcher m = timeline_link_pattern.matcher(text);

if (m.find()) {
    String expanded = RedirectUnshortener.unShorten(;
    text = m.replaceFirst(" " + expanded);
}
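The idea behind unshortening can be sketched independently of HTTP: keep following redirect targets until none is left. In the sketch below, the redirectTarget function is a hypothetical stand-in for an HTTP request that returns the redirect location of a URL (or null when there is no redirect). It is not the RedirectUnshortener implementation, and the hop limit is an assumption added to guard against redirect loops.

```java
import java.util.function.Function;

class UnshortenSketch {
    /**
     * Follows redirect hops until there is no further redirect
     * or maxHops hops have been followed.
     */
    static String unshorten(String url, Function<String, String> redirectTarget, int maxHops) {
        String current = url;
        for (int hop = 0; hop < maxHops; hop++) {
            String next = redirectTarget.apply(current);
            if (next == null || next.equals(current)) {
                break; // no further redirect: current is the final URL
            }
            current = next;
        }
        return current;
    }
}
```

For a quick check, a java.util.Map of known redirects can serve as the resolver by passing its get method as the function.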


Further, it can be exposed as a service and used directly. New features like fetching the featured image from links can be added to this service. These ideas are under discussion, and enthusiastic contributions are most welcome.

3) StatusService

This service outputs all data related to the Loklak Server deployment’s configuration. To access it, the API endpoint status.json is used.

It outputs the following data:

a) About the number of messages it scrapes in an interval of a second, a minute, an hour, a day, etc.

b) The configuration of the server like RAM, assigned memory, used memory, number of cores of CPU, cpu load, etc.

c) Other configuration related to the application, like the size and specifications of ElasticSearch shards, client request header, number of running threads, etc.

Endpoint: /api/status.json

Implementation Example:

curl api/




Using Elasticsearch Aggregations to Analyse Classifier Data in loklak Server

Loklak uses Elasticsearch to index Tweets and other social media entities. It also houses a classifier that classifies Tweets based on emotion, profanity and language. But earlier, this data was available only with the search API and there was no way to get aggregated data out of it. So as a part of my GSoC project, I proposed to introduce a new API endpoint which would allow users to access aggregated data from these classifiers.

In this blog post, I will be discussing how aggregations are performed on the Elasticsearch index of Tweets in the loklak server.

Structure of index

The ES index for Twitter is called messages and it has 3 fields related to classifiers –

  1. classifier_emotion
  2. classifier_language
  3. classifier_profanity

With each of these classifiers, we also have a probability attached, which represents the confidence of the classifier in the class assigned to a Tweet. The names of these fields are given by suffixing the classifier field with _probability (e.g. classifier_emotion_probability).

Since I will also be discussing aggregation based on countries in this blog post, note that there is also a field named place_country_code, which stores the ISO 3166-1 alpha-2 code of the country where the Tweet was created.

Requesting aggregations using Elasticsearch Java API

Elasticsearch comes with a simple Java API which can be used to perform any desired task. To work with data, we need an ES client, which can be built from an ES node (if creating a cluster) or directly as a transport client (if connecting remotely to a cluster) –

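// Transport client (settings is assumed to be a pre-built Settings object)
TransportClient tc = TransportClient.builder()
                        .settings(settings)
                        .build();

// From a node
Node elasticsearchNode = NodeBuilder.nodeBuilder()
                            .settings(settings)
                            .node();
Client nc = elasticsearchNode.client();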


Once we have a client, we can use ES AggregationBuilder to get aggregations from an index –

SearchResponse response = elasticsearchClient.prepareSearch(indexName)
                            .setQuery(QueryBuilders.matchAllQuery())  // Consider every row
                            .setFrom(0).setSize(0)  // 0 offset, 0 result size (do not return any rows)
                            .addAggregation(aggr)  // aggr is an AggregationBuilder object
                            .execute().actionGet();  // Execute and get results


AggregationBuilders are objects that define the properties of an aggregation task using ES’s Java API. This code snippet is applicable for any type of aggregation that we wish to perform on an index, given that we do not want to fetch any rows as a response.

Performing simple aggregation for a classifier

In this section, I will discuss the process to get results from a given classifier in loklak’s ES index. Here, we will be targeting a class-wise count of rows and stats (average and sum) of probabilities.

Writing AggregationBuilder

An AggregationBuilder for this task will be a Terms AggregationBuilder, which dynamically generates a bucket for each distinct value of a given field in the index –

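AggregationBuilder getClassifierAggregationBuilder(String classifierName) {
    String probabilityField = classifierName + "_probability";
    return AggregationBuilders.terms("by_class").field(classifierName)
        .subAggregation(AggregationBuilders.avg("avg_probability").field(probabilityField))
        .subAggregation(AggregationBuilders.sum("sum_probability").field(probabilityField));
}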


Here, the name of aggregation is passed as by_class. This will be used while processing the results for this aggregation task. Also, sub-aggregation is used to get average and sum probability by the name of avg_probability and sum_probability respectively. There is no need to specify to count rows as this is done by default.
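To make clear what this aggregation computes, here is a plain Java emulation over in-memory rows. It does not use Elasticsearch at all: the Row type and the class ClassAggregationSketch are hypothetical, and DoubleSummaryStatistics plays the role of a bucket carrying the row count together with the sum and average of the probabilities.

```java
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ClassAggregationSketch {
    /** One indexed row: the assigned class and the classifier's confidence. */
    static final class Row {
        final String emotion;
        final double probability;

        Row(String emotion, double probability) {
            this.emotion = emotion;
            this.probability = probability;
        }
    }

    /** Creates one bucket per distinct class; each bucket tracks count, sum and average. */
    static Map<String, DoubleSummaryStatistics> byClass(List<Row> rows) {
        Map<String, DoubleSummaryStatistics> buckets = new TreeMap<>();
        for (Row row : rows) {
            buckets.computeIfAbsent(row.emotion, k -> new DoubleSummaryStatistics())
                   .accept(row.probability);
        }
        return buckets;
    }
}
```

Just as with the Terms aggregation, the buckets are not declared up front. They appear as new class values are encountered in the data.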

Processing results

Once we have executed the aggregation task and received the SearchResponse as sr (say), we can use the name of top level aggregation to get Terms aggregation object –

Terms aggrs = sr.getAggregations().get("by_class");

After that, we can iterate through the buckets to get results –

for (Terms.Bucket bucket : aggrs.getBuckets()) {
    String key = bucket.getKeyAsString();
    long docCount = bucket.getDocCount();  // Number of rows
    // Use name of sub aggregations to get results
    Sum sum = bucket.getAggregations().get("sum_probability");
    Avg avg = bucket.getAggregations().get("avg_probability");
    // Do something with key, docCount, sum and avg
}


So in this manner, results from aggregation response can be processed.

Performing nested aggregations for different countries

The previous section described the process to perform aggregation over a single field. For this section, we’ll aim to get results for each country present in the index given a classifier field.

Writing a nested aggregation builder

To get the aggregation required, AggregationBuilder from previous section can be added as a sub-aggregation for the AggregationBuilder for country code field –

AggregationBuilder aggrs = AggregationBuilders.terms("by_country").field("place_country_code")
    .subAggregation(getClassifierAggregationBuilder(classifierName));


Processing the results

Again, we can get the results by processing the aggregations by name in a top-to-bottom fashion –

Terms aggrs = response.getAggregations().get("by_country");
for (Terms.Bucket bucket : aggrs.getBuckets()) {
    String countryCode = bucket.getKeyAsString();
    Terms classAggrs = bucket.getAggregations().get("by_class");
    for (Terms.Bucket classBucket : classAggrs.getBuckets()) {
        String key = classBucket.getKeyAsString();
        long docCount = classBucket.getDocCount();
        Sum sum = classBucket.getAggregations().get("sum_probability");
        Avg avg = classBucket.getAggregations().get("avg_probability");
        // Do something with countryCode, key, docCount, sum and avg
    }
}


And we have the data about classifier for each country present in the index.
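The shape of the nested result can be pictured as a map of maps: country code first, class second. The following plain Java sketch builds that shape from in-memory rows. It is only an emulation under assumptions (the Row type and the class CountryClassAggregationSketch are hypothetical) and does not call Elasticsearch.

```java
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class CountryClassAggregationSketch {
    /** One indexed row: country code, assigned class and the classifier's confidence. */
    static final class Row {
        final String countryCode;
        final String emotion;
        final double probability;

        Row(String countryCode, String emotion, double probability) {
            this.countryCode = countryCode;
            this.emotion = emotion;
            this.probability = probability;
        }
    }

    /** Outer buckets by country; inside each, buckets by class with count, sum and average. */
    static Map<String, Map<String, DoubleSummaryStatistics>> byCountryThenClass(List<Row> rows) {
        Map<String, Map<String, DoubleSummaryStatistics>> byCountry = new TreeMap<>();
        for (Row row : rows) {
            byCountry.computeIfAbsent(row.countryCode, k -> new TreeMap<>())
                     .computeIfAbsent(row.emotion, k -> new DoubleSummaryStatistics())
                     .accept(row.probability);
        }
        return byCountry;
    }
}
```

Reading the result mirrors the loop above: iterate over the countries, then over the classes inside each country.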


This blog post explained Elasticsearch aggregations and their usage in the loklak server project. The changes discussed here were introduced over a series of patches by @singhpratyush (me).

