Indexing for multiscrapers in Loklak Server

I recently added multiscraper system which can scrape data from web-scrapers like YoutubeScraper, QuoraScraper, GithubScraper, etc. As scraping is a costly task, it is important to improve it’s efficiency. One of the approach is to index data in cache. TwitterScraper uses multiple sources to optimize the efficiency.

This system uses Post message holder object to store data and PostTimeline (a specialized iterator) to iterate the data objects. This difference in data structures from TwitterScraper leads to the need of different approach to implement indexing of data to ElasticSearch (currently in review process).

These are the following changes I made while implementing ‘indexing of data’ in the project.

1) Writing of data is invoked only using PostTimeline iterator

In TwitterScraper, the data is written in message holder TwitterTweet. So all the tweets are written to index as they are created. Here, when the data is scraped, Writing of the posts is initiated. Scraping of data is considered a heavy process. This approach keeps lower resource usage in average traffic on the server.

protected Post putData(Post typeArray, String key, Timeline2 postList) {
   if(!"cache".equals(this.source)) {
       postList.writeToIndex();
   }
   return this.putData(typeArray, key, postList.toArray());
}

2) One object for holding a message

During the implementation, I kept the same message holder Post and post-iterator PostTimeline from scraping to indexing of data. This helps to keep the structure uniform. Earlier approach involves different types of message wrappers in the way. This approach cuts the processes for looping and transitioning of data structures.

3) Index a list, not a message

In TwitterScraper, as the messages are enqueued in the bulk to be indexed. But in this approach, I have enqueued the complete lists. This approach delays the indexing till the scraper is done with processing the html.

Creating the queue of postlists:

// Add post-lists to queue to be indexed
queueClients.incrementAndGet();
try {
    postQueue.put(postList);
} catch (InterruptedException e) {
DAO.severe(e);
}
queueClients.decrementAndGet();

 

Indexing of the posts in postlists:

// Start indexing of data in post-lists
for (Timeline2 postList: postBulk) {
    if (postList.size() < 1) continue;
    if(postList.dump) {
        // Dumping of data in a file
        writeMessageBulkDump(postList);
    }
    // Indexing of data to ElasticSearch
    writeMessageBulkNoDump(postList);
}

 

4) Categorizing the input parameters

While searching the index, I have divided the query parameters from scraper into 3 categories. The input parameters are added to those categories (implemented using map data structure) and thus data fetched are according to them. These categories are:

// Declaring the QueryBuilder
BoolQueryBuilder query = new BoolQueryBuilder();

 

a) Get the parameter– Get the results for the input fields in map getMap.

// Result must have these fields. Acts as AND operator
if(getMap != null) {
    for(Map.Entry<String, String> field : getMap.entrySet()) {
        query.must(QueryBuilders.termQuery(
field.getKey(), field.getValue()));
    }
}

 

b) Don’t get the parameter- Don’t get the results for the input fields in map notGetMap.

// Result must not have these fields.
if(notGetMap != null) {
    for(Map.Entry<String, String> field : notGetMap.entrySet()) {
        query.mustNot(QueryBuilders.termQuery(
                field.getKey(), field.getValue()));
    }
}

 

c) Get if possible- Get the results with the input fields if they are present in the index.

// Result may preferably also get these fields. Acts as OR operator
if(mayAlsoGetMap != null) {
    for(Map.Entry<String, String> field : mayAlsoGetMap.entrySet()) {
        query.should(QueryBuilders.termQuery(
                field.getKey(), field.getValue()));

    }
}

 

By applying these changes, the scrapers are shifted from a message indexing to list of messages indexing. This way we are keeping load on RAM low, but the aggregation of latest scraped data may be affected. So there will be a need to workaround to solve this issue while scraping itself.

References

Creating A Dockerfile For Yacy Grid MCP

The YaCy Grid is the second-generation implementation of YaCy, a peer-to-peer search engine. A YaCy Grid installation consists of a set of micro-services which communicate with each other using a common infrastructure for data persistence. The task was to deploy the second-generation of YaCy Grid. To do so, we first had created a Dockerfile. This dockerfile should start the micro services such as rabbitmq, Apache ftp and elasticsearch in one docker instance along with MCP. The microservices perform following tasks:

  • Apache ftp server for asset storage.
  • RabbitMQ message queues for the message system.
  • Elasticsearch for database operations.

To launch these microservices using Dockerfile, we referred to following documentations regarding running these services locally: https://github.com/yacy/yacy_grid_mcp/blob/master/README.md

For creating a Dockerfile we proceeded as follows:

FROM ubuntu:latest
MAINTAINER Harshit Prasad# Update
RUN apt-get update
RUN apt-get upgrade -y# add packages
# install jdk package for java
RUN apt-get install -y git openjdk-8-jdk

#install gradle required for build
RUN apt-get update && apt-get install -y software-properties-common
RUN add-apt-repository ppa:cwchien/gradle
RUN apt-get update
RUN apt-get install -y wget
RUN wget https://services.gradle.org/distributions/gradle-3.4.1-bin.zip
RUN mkdir /opt/gradle
RUN apt-get install -y unzip
RUN unzip -d /opt/gradle gradle-3.4.1-bin.zip
RUN PATH=$PATH:/opt/gradle/gradle-3.4.1/bin
ENV GRADLE_HOME=/opt/gradle/gradle-3.4.1
ENV PATH=$PATH:$GRADLE_HOME/bin
RUN gradle -v

# install apache ftp server 1.1.0
RUN wget http://www-eu.apache.org/dist/mina/ftpserver/1.1.0/dist/apache-ftpserver-1.1.0.tar.gz
RUN tar xfz apache-ftpserver-1.1.0.tar.gz

# install RabbitMQ server
RUN wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-generic-unix-3.6.6.tar.xz
RUN tar xf rabbitmq-server-generic-unix-3.6.6.tar.xz

# install erlang language for RabbitMQ
RUN apt-get install -y erlang

# install elasticsearch
RUN wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.5.0.tar.gz
RUN sha1sum elasticsearch-5.5.0.tar.gz
RUN tar -xzf elasticsearch-5.5.0.tar.gz

# clone yacy_grid_mcp repository
RUN git clone https://github.com/nikhilrayaprolu/yacy_grid_mcp.git
WORKDIR /yacy_grid_mcp

RUN cat docker/configftp.properties > ../apacheftpserver1.1.0/res/conf/users.properties

# compile
RUN gradle build
RUN mkdir data/mcp-8100/conf/ -p
RUN cp docker/config-mcp.properties data/mcp-8100/conf/config.properties
RUN chmod +x ./docker/start.sh

# Expose web interface ports
# 2121: ftp, a FTP server to be used for mass data / file storage
# 5672: rabbitmq, a rabbitmq message queue server to be used for global messages, queues and stacks
# 9300: elastic, an elasticsearch server or main cluster address for global database storage
EXPOSE 2121 5672 9300 9200 15672 8100

# Define default command.
ENTRYPOINT [“/bin/bash”, “./docker/start.sh”]

 

We have created a start.sh file to start RabbitMQ and Apache FTP services. At the end, for compilation gradle run will be executed.

adduser –disabled-password –gecos ” r
adduser r sudo
echo ‘%sudo ALL=(ALL) NOPASSWD:ALL’ >> /etc/sudoers
chmod a+rwx /elasticsearch-5.5.0 -R
su -m r -c ‘/elasticsearch-5.5.0/bin/elasticsearch -Ecluster.name=yacygrid &’
cd /apacheftpserver1.1.0
./bin/ftpd.sh res/conf/ftpdtypical.xml &
/rabbitmq_server-3.6.6/sbin/rabbitmq-server -detached
sleep 5s;
/rabbitmq_server-3.6.6/sbin/rabbitmq-plugins enable rabbitmq_management
/rabbitmq_server3.6.6/sbin/rabbitmqctl add_user yacygrid password4account
echo [{rabbit, [{loopback_users, []}]}]. >> /rabbitmq_server-3.6.6/etc/rabbitmq/rabbitmq.config
/rabbitmq_server-3.6.6/sbin/rabbitmqctl set_permissions -p / yacygrid “.*” “.*” “.*”
cd /yacy_grid_mcp
sleep 5s;
gradle run

 

start.sh will first add username and then password. Then it will start RabbitMQ along with Apache FTP.  For username and password, we have created a separate files to configure their properties during Docker run which can be found here:

The logic behind running all the microservices in one docker instance was: creating each container for microservice and then link those containers with the help of docker-compose.yml file.

The Dockerfile which we have created was corresponding to one image. Another image was elasticsearch which was linked to this Dockerfile. The latest version of elasticsearch image was already available on their site: https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html

We configured the docker-compose.yml file according to the reference link provided above. The docker-compose file can be found here: https://github.com/yacy/yacy_grid_mcp/blob/master/docker/docker-compose.yml

The source code for the implementation of whole structure can be found here: https://github.com/yacy/yacy_grid_mcp/tree/master/docker

Resources

 

Deploying loklak Server on Kubernetes with External Elasticsearch

Kubernetes is an open-source system for automating deployment, scaling, and management of containerized applications.

kubernetes.io

Kubernetes is an awesome cloud platform, which ensures that cloud applications run reliably. It runs automated tests, flawless updates, smart roll out and rollbacks, simple scaling and a lot more.

So as a part of GSoC, I worked on taking the loklak server to Kubernetes on Google Cloud Platform. In this blog post, I will be discussing the approach followed to deploy development branch of loklak on Kubernetes.

New Docker Image

Since Kubernetes deployments work on Docker images, we needed one for the loklak project. The existing image would not be up to the mark for Kubernetes as it contained the declaration of volumes and exposing of ports. So I wrote a new Docker image which could be used in Kubernetes.

The image would simply clone loklak server, build the project and trigger the server as CMD

FROM alpine:latest

ENV LANG=en_US.UTF-8
ENV JAVA_TOOL_OPTIONS=-Dfile.encoding=UTF8

WORKDIR /loklak_server

RUN apk update && apk add openjdk8 git bash && \
    git clone https://github.com/loklak/loklak_server.git /loklak_server && \
    git checkout development && \
    ./gradlew build -x test -x checkstyleTest -x checkstyleMain -x jacocoTestReport && \
    # Some Configurations and Cleanups

CMD ["bin/start.sh", "-Idn"]

[SOURCE]

This image wouldn’t have any volumes or exposed ports and we are now free to configure them in the configuration files (discussed in a later section).

Building and Pushing Docker Image using Travis

To automatically build and push on a commit to the master branch, Travis build is used. In the after_success section, a call to push Docker image is made.

Travis environment variables hold the username and password for Docker hub and are used for logging in –

docker login -u $DOCKER_USERNAME -p $DOCKER_PASSWORD

[SOURCE]

We needed checks there to ensure that we are on the right branch for the push and we are not handling a pull request –

# Build and push Kubernetes Docker image
KUBERNETES_BRANCH=loklak/loklak_server:latest-kubernetes-$TRAVIS_BRANCH
KUBERNETES_COMMIT=loklak/loklak_server:kubernetes-$TRAVIS_COMMIT
  
if [ "$TRAVIS_BRANCH" == "development" ]; then
    docker build -t loklak_server_kubernetes kubernetes/images/development
    docker tag loklak_server_kubernetes $KUBERNETES_BRANCH
    docker push $KUBERNETES_BRANCH
    docker tag $KUBERNETES_BRANCH $KUBERNETES_COMMIT
    docker push $KUBERNETES_COMMIT
elif [ "$TRAVIS_BRANCH" == "master" ]; then
    # Build and push master
else
    echo "Skipping Kubernetes image push for branch $TRAVIS_BRANCH"
fi

[SOURCE]

Kubernetes Configurations for loklak

Kubernetes cluster can completely be configured using configurations written in YAML format. The deployment of loklak uses the previously built image. Initially, the image tagged as latest-kubernetes-development is used –

apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: server
  namespace: web
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: server
    spec:
      containers:
      - name: server
        image: loklak/loklak_server:latest-kubernetes-development
        ...

[SOURCE]

Readiness and Liveness Probes

Probes act as the top level tester for the health of a deployment in Kubernetes. The probes are performed periodically to ensure that things are working fine and appropriate steps are taken if they fail.

When a new image is updated, the older pod still runs and servers the requests. It is replaced by the new ones only when the probes are successful, otherwise, the update is rolled back.

In loklak, the /api/status.json endpoint gives information about status of deployment and hence is a good target for probes –

livenessProbe:
  httpGet:
    path: /api/status.json
    port: 80
  initialDelaySeconds: 30
  timeoutSeconds: 3
readinessProbe:
  httpGet:
    path: /api/status.json
    port: 80
  initialDelaySeconds: 30
  timeoutSeconds: 3

[SOURCE]

These probes are performed periodically and the server is restarted if they fail (non-success HTTP status code or takes more than 3 seconds).

Ports and Volumes

In the configurations, port 80 is exposed as this is where Jetty serves inside loklak –

ports:
- containerPort: 80
  protocol: TCP

[SOURCE]

If we notice, this is the port that we used for running the probes. Since the development branch deployment holds no dumps, we didn’t need to specify any explicit volumes for persistence.

Load Balancer Service

While creating the configurations, a new public IP is assigned to the deployment using Google Cloud Platform’s load balancer. It starts listening on port 80 –

ports:
- containerPort: 80
  protocol: TCP

[SOURCE]

Since this service creates a new public IP, it is recommended not to replace/recreate this services as this would result in the creation of new public IP. Other components can be updated individually.

Kubernetes Configurations for Elasticsearch

To maintain a persistent index, this deployment would require an external Elasticsearch cluster. loklak is able to connect itself to external Elasticsearch cluster by changing a few configurations.

Docker Image and Environment Variables

The image used for Elasticsearch is taken from pires/docker-elasticsearch-kubernetes. It allows easy configuration of properties from environment variables in configurations. Here is a list of configurable variables, but we needed just a few of them to do our task –

image: quay.io/pires/docker-elasticsearch-kubernetes:2.0.0
env:
- name: KUBERNETES_CA_CERTIFICATE_FILE
  value: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
- name: NAMESPACE
  valueFrom:
    fieldRef:
      fieldPath: metadata.namespace
- name: "CLUSTER_NAME"
  value: "loklakcluster"
- name: "DISCOVERY_SERVICE"
  value: "elasticsearch"
- name: NODE_MASTER
  value: "true"
- name: NODE_DATA
  value: "true"
- name: HTTP_ENABLE
  value: "true"

[SOURCE]

Persistent Index using Persistent Cloud Disk

To make the index last even after the deployment is stopped, we needed a stable place where we could store all that data. Here, Google Compute Engine’s standard persistent disk was used. The disk can be created using GCP web portal or the gcloud CLI.

Before attaching the disk, we need to declare a volume where we could mount it –

volumeMounts:
- mountPath: /data
  name: storage

[SOURCE]

Now that we have a volume, we can simply mount the persistent disk on it –

volumes:
- name: storage
  gcePersistentDisk:
    pdName: data-index-disk
    fsType: ext4

[SOURCE]

Now, whenever we deploy these configurations, we can reuse the previous index.

Exposing Kubernetes to Cluster

The HTTP and transport clients are enabled on port 9200 and 9300 respectively. They can be exposed to the rest of the cluster using the following service –

apiVersion: v1
kind: Service
...
Spec:
  ...
  ports:
  - name: http
    port: 9200
    protocol: TCP
  - name: transport
    port: 9300
    protocol: TCP

[SOURCE]

Once deployed, other deployments can access the cluster API from ports 9200 and 9300.

Connecting loklak to Kubernetes

To connect loklak to external Elasticsearch cluster, TransportClient Java API is used. In order to enable these settings, we simply need to make some changes in configurations.

Since we enable the service named “elasticsearch” in namespace “elasticsearch”, we can access the cluster at address elasticsearch.elasticsearch:9200 (web) and elasticsearch.elasticsearch:9300 (transport).

To confine these changes only to Kubernetes deployment, we can use sed command while building the image (in Dockerfile) –

sed -i.bak 's/^\(elasticsearch_transport.enabled\).*/\1=true/' conf/config.properties && \
sed -i.bak 's/^\(elasticsearch_transport.addresses\).*/\1=elasticsearch.elasticsearch:9300/' conf/config.properties && \

[SOURCE]

Now when we create the deployments in Kubernetes cluster, loklak auto connects to the external elasticsearch index and creates indices if needed.

Verifying persistence of the Elasticsearch Index

In order to see that the data persists, we can completely delete the deployment or even the cluster if we want. Later, when we recreate the deployment, we can see all the messages already present in the index.

I  [2017-07-29 09:42:51,804][INFO ][node                     ] [Hellion] initializing ...
 
I  [2017-07-29 09:42:52,024][INFO ][plugins                  ] [Hellion] loaded [cloud-kubernetes], sites []
 
I  [2017-07-29 09:42:52,055][INFO ][env                      ] [Hellion] using [1] data paths, mounts [[/data (/dev/sdb)]], net usable_space [84.9gb], net total_space [97.9gb], spins? [possibly], types [ext4]
 
I  [2017-07-29 09:42:53,543][INFO ][node                     ] [Hellion] initialized
 
I  [2017-07-29 09:42:53,543][INFO ][node                     ] [Hellion] starting ...
 
I  [2017-07-29 09:42:53,620][INFO ][transport                ] [Hellion] publish_address {10.8.1.13:9300}, bound_addresses {10.8.1.13:9300}
 
I  [2017-07-29 09:42:53,633][INFO ][discovery                ] [Hellion] loklakcluster/cJtXERHETKutq7nujluJvA
 
I  [2017-07-29 09:42:57,866][INFO ][cluster.service          ] [Hellion] new_master {Hellion}{cJtXERHETKutq7nujluJvA}{10.8.1.13}{10.8.1.13:9300}{master=true}, reason: zen-disco-join(elected_as_master, [0] joins received)
 
I  [2017-07-29 09:42:57,955][INFO ][http                     ] [Hellion] publish_address {10.8.1.13:9200}, bound_addresses {10.8.1.13:9200}
 
I  [2017-07-29 09:42:57,955][INFO ][node                     ] [Hellion] started
 
I  [2017-07-29 09:42:58,082][INFO ][gateway                  ] [Hellion] recovered [8] indices into cluster_state

In the last line from the logs, we can see that indices already present on the disk were recovered. Now if we head to the public IP assigned to the cluster, we can see that the message count is restored.

Conclusion

In this blog post, I discussed how we utilised the Kubernetes setup to shift loklak to Google Cloud Platform. The deployment is active and can be accessed from the link provided under wiki section of loklak/loklak_server repo.

I introduced these changes in pull request loklak/loklak_server#1349 with the help of @niranjan94, @uday96 and @chiragw15.

Resources

Caching Elasticsearch Aggregation Results in loklak Server

To provide aggregated data for various classifiers, loklak uses Elasticsearch aggregations. Aggregated data speaks a lot more than a few instances from it can say. But performing aggregations on each request can be very resource consuming. So we needed to come up with a way to reduce this load.

In this post, I will be discussing how I came up with a caching model for the aggregated data from the Elasticsearch index.

Fields to Consider while Caching

At the classifier endpoint, aggregations can be requested based on the following fields –

  • Classifier Name
  • Classifier Classes
  • Countries
  • Start Date
  • End Date

But to cache results, we can ignore cases where we just require a few classes or countries and store aggregations for all of them instead. So the fields that will define the cache to look for will be –

  • Classifier Name
  • Start Date
  • End Date

Type of Cache

The data structure used for caching was Java’s HashMap. It would be used to map a special string key to a special object discussed in a later section.

Key

The key is built using the fields mentioned previously –

private static String getKey(String index, String classifier, String sinceDate, String untilDate) {
    return index + "::::"
        + classifier + "::::"
        + (sinceDate == null ? "" : sinceDate) + "::::"
        + (untilDate == null ? "" : untilDate);
}

[SOURCE]

In this way, we can handle requests where a user makes a request for every class there is without running the expensive aggregation job every time. This is because the key for such requests will be same as we are not considering country and class for this purpose.

Value

The object used as key in the HashMap is a wrapper containing the following –

  1. json – It is a JSONObject containing the actual data.
  2. expiry – It is the expiry of the object in milliseconds.

class JSONObjectWrapper {
    private JSONObject json; 
    private long expiry;
    ... 
}

Timeout

The timeout associated with a cache is defined in the configuration file of the project as “classifierservlet.cache.timeout”. It defaults to 5 minutes and is used to set the eexpiryof a cached JSONObject –

class JSONObjectWrapper {
    ...
    private static long timeout = DAO.getConfig("classifierservlet.cache.timeout", 300000);

    JSONObjectWrapper(JSONObject json) {
        this.json = json;
        this.expiry = System.currentTimeMillis() + timeout;
    }
    ...
}

 

Cache Hit

For searching in the cache, the previously mentioned string is composed from the parameters requested by the user. Checking for a cache hit can be done in the following manner –

String key = getKey(index, classifier, sinceDate, untilDate);
if (cacheMap.keySet().contains(key)) {
    JSONObjectWrapper jw = cacheMap.get(key);
    if (!jw.isExpired()) {
        // Do something with jw
    }
}
// Calculate the aggregations
...

But since jw here would contain all the data, we would need to filter out the classes and countries which are not needed.

Filtering results

For filtering out the parts which do not contain the information requested by the user, we can perform a simple pass and exclude the results that are not needed.

Since the number of fields to filter out, i.e. classes and countries, would not be that high, this process would not be that resource intensive. And at the same time, would save us from requesting heavy aggregation tasks from the user.

Since the data about classes is nested inside the respective country field, we need to perform two level of filtering –

JSONObject retJson = new JSONObject(true);
for (String key : json.keySet()) {
    JSONArray value = filterInnerClasses(json.getJSONArray(key), classes);
    if ("GLOBAL".equals(key) || countries.contains(key)) {
        retJson.put(key, value);
    }
}

Cache Miss

In the case of a cache miss, the helper functions are called from ElasticsearchClient.java to get results. These results are then parsed from HashMap to JSONObject and stored in the cache for future usages.

JSONObject freshCache = getFromElasticsearch(index, classifier, sinceDate, untilDate);
cacheMap.put(key, new JSONObjectWrapper(freshCache));

The getFromElasticsearch method finds all the possible classes and makes a request to the appropriate method in ElasticsearchClient, getting data for all classifiers and all countries.

Conclusion

In this blog post, I discussed the need for caching of aggregations and the way it is achieved in the loklak server. This feature was introduced in pull request loklak/loklak_server#1333 by @singhpratyush (me).

Resources

Data Indexing in Loklak Server

Loklak Server is a data-scraping system that indexes all the scraped data for the purpose to optimize it. The data fetched by different users is stored as cache. This helps in retrieving of data directly from cache for recurring queries. When users search for the same queries, load on Loklak Server is reduced by outputting indexed data, thus optimizing the operations.

Application

It is dependent on ElasticSearch for indexing of cached data (as JSON). The data that is fetched by different users is stored as cache. This helps in fetching data directly from cache for same queries. When users search for the same queries, load on Loklak Server is reduced and it is optimized by outputting indexed data instead of scraping the same date again.

When is data indexing done?

The indexing of data is done when:

1) Data is scraped:

When data is scraped, data is indexed concurrently while cleaning of data in TwitterTweet data object. For this task, addScheduler static method of IncomingMessageBuffer is used, which acts as

abstract between scraping of data and storing and indexing of data.

The following is the implementation from TwitterScraper (from here). Here writeToIndex is the boolean input to whether index the data or not.

if (this.writeToIndex) IncomingMessageBuffer.addScheduler(this, this.user, true);

2) Data is fetched from backend:

When data is fetched from backend, it is indexed in Timeline iterator. It calls the above method to index data concurrently.

The following is the definition of writeToIndex() method from Timeline.java (from here). When writeToIndex() is called, the fetched data is indexed.

public void writeToIndex() {
    IncomingMessageBuffer.addScheduler(this, true);
}

How?

When addScheduler static method of IncomingMessageBuffer is called, a thread is started that indexes all data. When the messagequeue data structure is filled with some messages, indexing continues.

See here . The DAO method writeMessageBulk is called here to write data. The data is then written to the following streams:

1) Dump: The data fetched is dumped into Import directory in a file. It can also be fetched from other peers.

2) Index: The data fetched is checked if it exists in the index and data that isn’t indexed is indexed.

public static Set<String> writeMessageBulk(Collection<MessageWrapper> mws) {
    List<MessageWrapper> noDump = new ArrayList<>();
    List<MessageWrapper> dump = new ArrayList<>();
    for (MessageWrapper mw: mws) {
        if (mw.t == null) continue;
        if (mw.dump) dump.add(mw);
        else noDump.add(mw);
    }

    Set<String> createdIDs = new HashSet<>();
    createdIDs.addAll(writeMessageBulkNoDump(noDump));
    createdIDs.addAll(writeMessageBulkDump(dump));

    // Does also do an writeMessageBulkNoDump internally
    return createdIDs;
}

 

The above code snippet is from DAO.java, method calls writeMessageBulkNoDump(noDump) indexes the data to ElasticSearch. The definition of this method can be seen here

Whereas for dumping of data writeMessageBulkDump(Dump) is called. It is defined here

Resources:

Some Other Services in Loklak Server

Loklak Server isn’t just a scraper system software, it provides numerous other services to perform other interesting functions like Link Unshortening (reverse of link shortening) and video fetching and administrative tasks like status fetching of the Loklak deployment (for analysis in Loklak development use) and many more. Some of these are internally implemented and rest can be used through http endpoints. Also there are some services which aren’t complete and are in development stage.

Let’s go through some of them to know a bit about them and how they can be used.

1) VideoUrlService

This is the service to extract video from the website that has a streaming video and output the video file link. This service is in development stage and is functional. Presently, It can fetch twitter video links and output them with different video qualities.

Endpoint: /api/videoUrlService.json

Implementation Example:

curl api/loklak.org/api/videoUrlService.json?id=https://twitter.com/EXOGlobal/status/886182766970257409&id=https://twitter.com/KMbappe/status/885963850708865025

2) Link Unshortening Service

This is the service used to unshorten the link. There are shortened URLs which are used to track the Internet Users by Websites. To prevent this, link unshortening service unshortens the link and returns the final untrackable link to the user.

Currently this service is in application in TwitterScraper to unshorten the fetched URLs. It has other methods to get Redirect Link and also a link to get final URL from multiple unshortened link.

Implementation Example from TwitterScraper.java [LINK]:

Matcher m = timeline_link_pattern.matcher(text);

if (m.find()) {
    String expanded = RedirectUnshortener.unShorten(m.group(2));
    text = m.replaceFirst(" " + expanded);
    continue;
}

 

Further it can be used to as a service and can be used directly. New features like fetching featured image from links can be added to this service. Though these stuff are in discussion and enthusiastic contribution is most welcomed.

3) StatusService

This is a service that outputs all data related to to Loklak Server deployment’s configurations. To access this configuration, api endpoint status.json is used.

It outputs the following data:

a) About the number of messages it scrapes in an interval of a second, a minute, an hour, a day, etc.

b) The configuration of the server like RAM, assigned memory, used memory, number of cores of CPU, cpu load, etc.

c) And other configurations related to the application like size of ElasticSearch shards size and their specifications, client request header, number of running threads, etc.

Endpoint: /api/status.json

Implementation Example:

curl api/loklak.org/api/status.json

Resources:

 

Using Elasticsearch Aggregations to Analyse Classifier Data in loklak Server

Loklak uses Elasticsearch to index Tweets and other social media entities. It also houses a classifier that classifies Tweets based on emotion, profanity and language. But earlier, this data was available only with the search API and there was no way to get aggregated data out of it. So as a part of my GSoC project, I proposed to introduce a new API endpoint which would allow users to access aggregated data from these classifiers.

In this blog post, I will be discussing how aggregations are performed on the Elasticsearch index of Tweets in the loklak server.

Structure of index

The ES index for Twitter is called messages and it has 3 fields related to classifiers –

  1. classifier_emotion
  2. classifier_language
  3. classifier_profanity

With each of these classifiers, we also have a probability attached which represents the confidence of the classifier for assigned class to a Tweet. The name of these fields is given by suffixing the emotion field by _probability (e.g. classifier_emotion_probability).

Since I will also be discussing aggregation based on countries in this blog post, there is also a field named place_country_code which saves the ISO 3166-1 alpha-2 code for the country of creation of Tweet.

Requesting aggregations using Elasticsearch Java API

Elasticsearch comes with a simple Java API which can be used to perform any desired task. To work with data, we need an ES client which can be built from a ES Node (if creating a cluster) or directly as a transport client (if connecting remotely to a cluster) –

// Transport client
TransportClient tc = TransportClient.builder()
                        .settings(mySettings)
                        .build();

// From a node
Node elasticsearchNode = NodeBuilder.nodeBuilder()
                            .local(false).settings(mySettings)
                            .node();
Client nc = elasticsearchNode.client();

[SOURCE]

Once we have a client, we can use ES AggregationBuilder to get aggregations from an index –

SearchResponse response = elasticsearchClient.prepareSearch(indexName)
                            .setSearchType(SearchType.QUERY_THEN_FETCH)
                            .setQuery(QueryBuilders.matchAllQuery())  // Consider every row
                            .setFrom(0).setSize(0)  // 0 offset, 0 result size (do not return any rows)
                            .addAggregation(aggr)  // aggr is a AggregatoinBuilder object
                            .execute().actionGet();  // Execute and get results

[SOURCE]

AggregationBuilders are objects that define the properties of an aggregation task using ES’s Java API. This code snippet is applicable for any type of aggregation that we wish to perform on an index, given that we do not want to fetch any rows as a response.

Performing simple aggregation for a classifier

In this section, I will discuss the process to get results from a given classifier in loklak’s ES index. Here, we will be targeting a class-wise count of rows and stats (average and sum) of probabilities.

Writing AggregationBuilder

An AggregationBuilder for this task will be a Terms AggregationBuilder which would dynamically generate buckets for all the different values of fields for a given field in index –

AggregationBuilder getClassifierAggregationBuilder(String classifierName) {
    String probabilityField = classifierName + "_probability";
    return AggregationBuilders.terms("by_class").field(classifierName)
        .subAggregation(
            AggregationBuilders.avg("avg_probability").field(probabilityField)
        )
        .subAggregation(
            AggregationBuilders.sum("sum_probability").field(probabilityField)
        );
}

[SOURCE]

Here, the name of aggregation is passed as by_class. This will be used while processing the results for this aggregation task. Also, sub-aggregation is used to get average and sum probability by the name of avg_probability and sum_probability respectively. There is no need to specify to count rows as this is done by default.

Processing results

Once we have executed the aggregation task and received the SearchResponse as sr (say), we can use the name of top level aggregation to get Terms aggregation object –

Terms aggrs = sr.getAggregations().get("by_class");

After that, we can iterate through the buckets to get results –

for (Terms.Bucket bucket : aggrs.getBuckets()) {
String key = bucket.getKeyAsString();
long docCount = bucket.getDocCount(); // Number of rows
// Use name of sub aggregations to get results
Sum sum = bucket.getAggregations().get("sum_probability");
Avg avg = bucket.getAggregations().get("avg_probability");
// Do something with key, docCount, sum and avg
}

[SOURCE]

So in this manner, results from aggregation response can be processed.

Performing nested aggregations for different countries

The previous section described the process to perform aggregation over a single field. For this section, we’ll aim to get results for each country present in the index given a classifier field.

Writing a nested aggregation builder

To get the aggregation required, AggregationBuilder from previous section can be added as a sub-aggregation for the AggregationBuilder for country code field –

AggregationBuilder aggrs = AggregationBuilders.terms("by_country").field("place_country_code")
    .subAggregation(
        AggregationBuilders.terms("by_class").field(classifierName)
            .subAggregation(
                AggregationBuilders.avg("avg_probability").field(probabilityField)
            )
            .subAggregation(
                AggregationBuilders.sum("sum_probability").field(probabilityField)
            );
    );

[SOURCE]

Processing the results

Again, we can get the results by processing the AggregationBuilders by name in a top-to-bottom fashion –

Terms aggrs = response.getAggregations().get("by_country");
for (Terms.Bucket bucket : aggr.getBuckets()) {
    String countryCode = bucket.getKeyAsString();
    Terms classAggrs = bucket.getAggregations().get("by_class");
    for (Terms.Bucket classBucket : classAggr.getBuckets()) {
        String key = classBucket.getKeyAsString();
        long docCount = classBucket.getDocCount();
        Sum sum = classBucket.getAggregations().get("sum_probability");
        Avg avg = classBucket.getAggregations().get("avg_probability");
        ...
    }
    ...
}

[SOURCE]

And we have the data about classifier for each country present in the index.

Conclusion

This blog post explained about Elasticsearch aggregations and their usage in the loklak server project. The changes discussed here were introduced over a series of patches to ElasticsearchClient.java by @singhpratyush (me).

Resources

Simplifying Scrapers using BaseScraper

Loklak Server‘s main function is to scrape data from websites and other sources and output in different formats like JSON, xml and rss. There are many scrapers in the project that scrape data and output them, but are implemented with different design and libraries which makes them different from each other and a difficult to fix changes.

Due to variation in scrapers’ design, it is difficult to modify them and fix the same issue (any issue, if it appears) in each of them. This issue signals fault in design. To solve this problem, Inheritance can be brought into application. Thus, I created BaseScraper abstract class so that scrapers are more concentrated on fetching data from HTML and all supportive tasks like creating connection with the help of url are defined in BaseScraper.

The concept is pretty easy to implement, but for a perfect implementation, there is a need to go through the complete list of tasks a scraper does.

These are the following tasks with descriptions and how they are implemented using BaseScraper:

  1. Endpoint that triggers the scraper

Every search scraper inherits class AbstractAPIHandler. This is used to fetch get parameters from the endpoint according to which data is scraped from the scraper. The arguments from serviceImpl method is used to generate output and is returned to it as JSONObject.

For this task, the method serviceImpl has been defined in BaseScraper and method getData is implemented to return the output. This method is the driver method of the scraper.

public JSONObject serviceImpl(Query call, HttpServletResponse response, Authorization rights, JSONObjectWithDefault permissions) throws APIException {
    this.setExtra(call);
    return this.getData().toJSON(false, "metadata", "posts");
}

 

  1. Constructor

The constructor of Scraper defines the base URL of the website to be scraped, name of the scraper and data structure to fetch all get parameters input to the scraper. For get parameters, the Map data structure is used to fetch them from Query object.

Since every scraper has it’s own different base URL, scraper name and get parameters used, so it is implemented in respective Scrapers. QuoraProfileScraper is an example which has these variables defined.

  1. Get all input variables

To get all input variables, there are setters and getters defined for fetching them as Map from Query object in BaseScraper. There is also an abstract method getParam(). It is defined in respective scrapers to fetch the useful parameters for scraper and set them to the scraper’s class variables.

// Setter for get parameters from call object
protected void setExtra(Query call) {
    this.extra = call.getMap();
    this.query = call.get("query", "");
    this.setParam();
}

// Getter for get parameter wrt to its key
public String getExtraValue(String key) {
    String value = "";
    if(this.extra.get(key) != null) {
        value = this.extra.get(key).trim();
    }
    return value;
}

// Defination in QuoraProfileScraper
protected void setParam() {
    if(!"".equals(this.getExtraValue("type"))) {
        this.typeList = Arrays.asList(this.getExtraValue("type").trim().split("\\s*,\\s*"));
    } else {
        this.typeList = new ArrayList<String>();
        this.typeList.add("all");
        this.setExtraValue("type", String.join(",", this.typeList));
    }
}

 

  1.  URL creation for web scraper

The URL creation shall be implemented in a separate method as in TwitterScraper. The following is the rough implementation adapted from one of my pull request:

protected String prepareSearchUrl(String type) {
    URIBuilder url = null;
    String midUrl = "search/";

    try {
        switch(type) {
            case "question":
                url = new URIBuilder(this.baseUrl + midUrl);
                url.addParameter("q", this.query);
                url.addParameter("type", "question");
        .
        .
    }
    .
    .
    return url.toString();
}

 

  1. Get BufferedReader object from InputStream

getDataFromConnection method fetches the BufferedReader object from ClientConnection. This object reads the web page line by line by the scrape method to fetch data. See here.

ClientConnection connection = new ClientConnection(url);
BufferedReader br = getHtml(connection);
.
.
.
public BufferedReader getHtml(ClientConnection connection) {

    if (connection.inputStream == null) {
        return null;
    }

    BufferedReader br = new BufferedReader(new InputStreamReader(connection.inputStream, StandardCharsets.UTF_8));
    return br;
}

 

  1. Scraping of data from HTML

The Scraper method for scraping data is declared abstract in BaseScraper and defined in the scraper. This can be a perfect example of implementation for BaseScraper (See code the here) and scraper (here).

  1. Output of data

The output of scrape method is fetched in Post data objects that are implemented for the respective scraper. These Post objects are added to Timeline iterator and which outputs data as JSONArray. Later the objects are output in enclosed Post object wrapper.

This data can be directly output as Post object, but adding it to iterator makes the Post Objects capable to be sorted in an order and be indexed to ElasticSearch.

 

Resources

Iterating the Loklak Server data

Loklak Server is amazing for what it does, but it is more impressive how it does the tasks. Iterators are used for and how to use them, but this project has a customized iterator that iterates Twitter data objects. This iterator is Timeline.java .

Timeline implements an interface iterable (isn’t it iterator?). This interface helps in using Timeline as an iterator and add methods to modify, use or create the data objects. At present, it only iterates Twitter data objects. I am working on it to modify it to iterate data objects from all web scrapers.

The following is a simple example of how an iterator is used.

// Initializing arraylist
List<String> stringsList = Arrays.asList("foo", "bar", "baz");

// Using iterator to display contents of stringsList
System.out.print("Contents of stringsList: ");

Iterator iter = al.iterator();
while(iter.hasNext()) {
    System.out.print(iter.next() + " ");
}

 

This iterator can only iterate data the way array does. (Then why do we need it?) It does the task of iterating objects perfectly, but we can add more functionality to the iterator.

 

Timeline iterator iterates the MessageEntry objects i.e. superclass of TwitterTweet objects. According to Javadocs, “Timeline is a structure which holds tweet for the purpose of presentation, There is no tweet retrieval method here, just an iterator which returns the tweets in reverse appearing order.”

Following are some of the tasks it does:

  1. As an iterator:

This basic use of Timeline is to iterate the MessageEntry objects. It not only iterates the data objects, but also fetches them (See here).

// Declare Timeline object according to order the data object has been created
Timeline tline = new Timeline(Timeline.parseOrder("created_at"));

// Adding data objects to the timeline
tline.add(me1);
tline.add(me2);
.
.
.
// Outputing all data objects as array of JSON objects
for (MessageEntry me: tline) {
    JSONArray postArray = new JSONArray();
    for (MessageEntry post : this) {
        postArray.put(post.toJSON());
    }
}

 

  1. The order of iterating the data objects

Timeline can arrange and iterate the data objects according to the date of creation of the twitter post, number of retweets or number of favourite counts. For this there is an Enum declaration of Order in the Timeline class which is initialized during creation of Timeline object. [link]

    Timeline tline = new Timeline(Timeline.parseOrder("created_at"));

 

  1. Pagination of data objects

There is an object cursor, some methods, including getter and setters to support pagination of the data objects. It is only internally implemented, but can also be used to return a section of the result.

  1. writeToIndex method

This method can be used to write all data fetched by Timeline iterator to ElasticSearch for indexing and to dump that can be used for testing. Thus, indexing of data can concurrently be done while it is iterated. It is implemented here.

  1. Other methods

It also has methods to output all data as JSON and customized method to add data to Timeline keeping user object and Data separate, etc. There are a bit more things in this iterable class which shall be explored instead.

 

Resources:

Using NodeBuilder to instantiate node based Elasticsearch client and Visualising data

As elastic.io mentions, Elasticsearch is a distributed, RESTful search and analytics engine capable of solving a growing number of use cases. But in many setups, it is not possible to manually install an Elasticsearch node on a machine. To handle these type of scenarios, Elasticsearch provides the NodeBuilder module, which can be used to spawn Elasticsearch node programmatically. Let’s see how.

Getting Dependencies

In order to get the ES Java API, we need to add the following line to dependencies.

compile group: 'org.elasticsearch', name: 'securesm', version: '1.0'

The required packages will be fetched the next time we gradle build.

Configuring Settings

In the Elasticsearch Java API, Settings are used to configure the node(s). To create a node, we first need to define its properties.

Settings.Builder settings = new Settings.Builder();

settings.put("cluster.name", "cluster_name");  // The name of the cluster

// Configuring HTTP details
settings.put("http.enabled", "true");
settings.put("http.cors.enabled", "true");
settings.put("http.cors.allow-origin", "https?:\/\/localhost(:[0-9]+)?/");  // Allow requests from localhost
settings.put("http.port", "9200");

// Configuring TCP and host
settings.put("transport.tcp.port", "9300");
settings.put("network.host", "localhost");

// Configuring node details
settings.put("node.data", "true");
settings.put("node.master", "true");

// Configuring index
settings.put("index.number_of_shards", "8");
settings.put("index.number_of_replicas", "2");
settings.put("index.refresh_interval", "10s");
settings.put("index.max_result_window", "10000");

// Defining paths
settings.put("path.conf", "/path/to/conf/");
settings.put("path.data", "/path/to/data/");
settings.put("path.home", "/path/to/data/");

settings.build();  // Buid with the assigned configurations

There are many more settings that can be tuned in order to get desired node configuration.

Building the Node and Getting Clients

The Java API makes it very simple to launch an Elasticsearch node. This example will make use of settings that we just built.

Node elasticsearchNode = NodeBuilder.nodeBuilder().local(false).settings(settings).node();

A piece of cake. Isn’t it? Let’s get a client now, on which we can execute our queries.

Client elasticsearhClient = elasticsearchNode.client();

Shutting Down the Node

elasticsearchNode.close();

A nice implementation of using the module can be seen at ElasticsearchClient.java in the loklak project. It uses the settings from a configuration file and builds the node using it.


Visualisation using elasticsearch-head

So by now, we have an Elasticsearch client which is capable of doing all sorts of operations on the node. But how do we visualise the data that is being stored? Writing code and running it every time to check results is a lengthy thing to do and significantly slows down development/debugging cycle.

To overcome this, we have a web frontend called elasticsearch-head which lets us execute Elasticsearch queries and monitor the cluster.

To run elasticsearch-head, we first need to have grunt-cli installed –

$ sudo npm install -g grunt-cli

Next, we will clone the repository using git and install dependencies –

$ git clone git://github.com/mobz/elasticsearch-head.git
$ cd elasticsearch-head
$ npm install

Next, we simply need to run the server and go to indicated address on a web browser –

$ grunt server

At the top, enter the location at which elasticsearch-head can interact with the cluster and Connect.

Upon connecting, the dashboard appears telling about the status of cluster –

The dashboard shown above is from the loklak project (will talk more about it).

There are 5 major sections in the UI –
1. Overview: The above screenshot, gives details about the indices and shards of the cluster.
2. Index: Gives an overview of all the indices. Also allows to add new from the UI.
3. Browser: Gives a browser window for all the documents in the cluster. It looks something like this –

The left pane allows us to set the filter (index, type and field). The table listed is sortable. But we don’t always get what we are looking for manually. So, we have the following two sections.
4. Structured Query: Gives a dead simple UI that can be used to make a well structured request to Elasticsearch. This is what we need to search for to get Tweets from @gsoc that are indexed –

5. Any Request: Gives an advance console that allows executing any query allowable by Elasticsearch API.

A little about the loklak project and Elasticsearch

loklak is a server application which is able to collect messages from various sources, including twitter. The server contains a search index and a peer-to-peer index sharing interface. All messages are stored in an elasticsearch index.

Source: github/loklak/loklak_server

The project uses Elasticsearch to index all the data that it collects. It uses NodeBuilder to create Elasticsearch node and process the index. It is flexible enough to join an existing cluster instead of creating a new one, just by changing the configuration file.

Conclusion

This blog post tries to explain how NodeBuilder can be used to create Elasticsearch nodes and how they can be configured using Elasticsearch Settings.

It also demonstrates the installation and basic usage of elasticsearch-head, which is a great library to visualise and check queries against an Elasticsearch cluster.

The official Elasticsearch documentation is a good source of reference for its Java API and all other aspects.