Scraping Concurrently with Loklak Server

At Present, SearchScraper in Loklak Server uses numerous threads to scrape Twitter website. The data fetched is cleaned and more data is extracted from it. But just scraping Twitter is under-performance.

Concurrent scraping of other websites like Quora, Youtube, Github, etc can be added to diversify the application. In this way, single endpoint search.json can serve multiple services.

As this Feature is under-refinement, We will discuss only the basic structure of the system with new changes. I tried to implement more abstract way of Scraping by:-

1) Fetching the input data in SearchServlet

Instead of selecting the input get-parameters and referencing them to be used, Now complete Map object is referenced, helping to be able to add more functionality based on input get-parameters. The dataArray object (as JSONArray) is fetched from DAO.scrapeLoklak method and is embedded in output with key results

    // start a scraper
    inputMap.put("query", query);
    DAO.log(request.getServletPath() + " scraping with query: "
           + query + " scraper: " + scraper);
    dataArray = DAO.scrapeLoklak(inputMap, true, true);

 

2) Scraping the selected Scrapers concurrently

In DAO.java, the useful get parameters of inputMap are fetched and cleaned. They are used to choose the scrapers that shall be scraped, using getScraperObjects() method.

Timeline2.Order order= getOrder(inputMap.get("order"));
Timeline2 dataSet = new Timeline2(order);
List<String> scraperList = Arrays.asList(inputMap.get("scraper").trim().split("\\s*,\\s*"));

 

Threads are created to fetch data from different scrapers according to size of list of scraper objects fetched. input map is passed as argument to the scrapers for further get parameters related to them and output data according to them.

List<BaseScraper> scraperObjList = getScraperObjects(scraperList, inputMap);
ExecutorService scraperRunner = Executors.newFixedThreadPool(scraperObjList.size());

try{
    for (BaseScraper scraper : scraperObjList)
    {
        scraperRunner.execute(() -> {
            dataSet.mergePost(scraper.getData());
        });

    }

} finally {
    scraperRunner.shutdown();

    try {
        scraperRunner.awaitTermination(24L, TimeUnit.HOURS);
    } catch (InterruptedException e) { }
}

 

3) Fetching the selected Scraper Objects in DAO.java

Here the variable of abstract class BaseScraper (SuperClass of all search scrapers) is used to create List of scrapers to be scraped. All the scrapers’ constructors are fed with input map to be scraped accordingly.

List<BaseScraper> scraperObjList = new ArrayList<BaseScraper>();
BaseScraper scraperObj = null;

if (scraperList.contains("github") || scraperList.contains("all")) {
    scraperObj = new GithubProfileScraper(inputMap);
    scraperObjList.add(scraperObj);
}
.
.
.

 

References:

Caching Elasticsearch Aggregation Results in loklak Server

To provide aggregated data for various classifiers, loklak uses Elasticsearch aggregations. Aggregated data speaks a lot more than a few instances from it can say. But performing aggregations on each request can be very resource consuming. So we needed to come up with a way to reduce this load.

In this post, I will be discussing how I came up with a caching model for the aggregated data from the Elasticsearch index.

Fields to Consider while Caching

At the classifier endpoint, aggregations can be requested based on the following fields –

  • Classifier Name
  • Classifier Classes
  • Countries
  • Start Date
  • End Date

But to cache results, we can ignore cases where we just require a few classes or countries and store aggregations for all of them instead. So the fields that will define the cache to look for will be –

  • Classifier Name
  • Start Date
  • End Date

Type of Cache

The data structure used for caching was Java’s HashMap. It would be used to map a special string key to a special object discussed in a later section.

Key

The key is built using the fields mentioned previously –

private static String getKey(String index, String classifier, String sinceDate, String untilDate) {
    return index + "::::"
        + classifier + "::::"
        + (sinceDate == null ? "" : sinceDate) + "::::"
        + (untilDate == null ? "" : untilDate);
}

[SOURCE]

In this way, we can handle requests where a user makes a request for every class there is without running the expensive aggregation job every time. This is because the key for such requests will be same as we are not considering country and class for this purpose.

Value

The object used as key in the HashMap is a wrapper containing the following –

  1. json – It is a JSONObject containing the actual data.
  2. expiry – It is the expiry of the object in milliseconds.

class JSONObjectWrapper {
    private JSONObject json; 
    private long expiry;
    ... 
}

Timeout

The timeout associated with a cache is defined in the configuration file of the project as “classifierservlet.cache.timeout”. It defaults to 5 minutes and is used to set the eexpiryof a cached JSONObject –

class JSONObjectWrapper {
    ...
    private static long timeout = DAO.getConfig("classifierservlet.cache.timeout", 300000);

    JSONObjectWrapper(JSONObject json) {
        this.json = json;
        this.expiry = System.currentTimeMillis() + timeout;
    }
    ...
}

 

Cache Hit

For searching in the cache, the previously mentioned string is composed from the parameters requested by the user. Checking for a cache hit can be done in the following manner –

String key = getKey(index, classifier, sinceDate, untilDate);
if (cacheMap.keySet().contains(key)) {
    JSONObjectWrapper jw = cacheMap.get(key);
    if (!jw.isExpired()) {
        // Do something with jw
    }
}
// Calculate the aggregations
...

But since jw here would contain all the data, we would need to filter out the classes and countries which are not needed.

Filtering results

For filtering out the parts which do not contain the information requested by the user, we can perform a simple pass and exclude the results that are not needed.

Since the number of fields to filter out, i.e. classes and countries, would not be that high, this process would not be that resource intensive. And at the same time, would save us from requesting heavy aggregation tasks from the user.

Since the data about classes is nested inside the respective country field, we need to perform two level of filtering –

JSONObject retJson = new JSONObject(true);
for (String key : json.keySet()) {
    JSONArray value = filterInnerClasses(json.getJSONArray(key), classes);
    if ("GLOBAL".equals(key) || countries.contains(key)) {
        retJson.put(key, value);
    }
}

Cache Miss

In the case of a cache miss, the helper functions are called from ElasticsearchClient.java to get results. These results are then parsed from HashMap to JSONObject and stored in the cache for future usages.

JSONObject freshCache = getFromElasticsearch(index, classifier, sinceDate, untilDate);
cacheMap.put(key, new JSONObjectWrapper(freshCache));

The getFromElasticsearch method finds all the possible classes and makes a request to the appropriate method in ElasticsearchClient, getting data for all classifiers and all countries.

Conclusion

In this blog post, I discussed the need for caching of aggregations and the way it is achieved in the loklak server. This feature was introduced in pull request loklak/loklak_server#1333 by @singhpratyush (me).

Resources

Data Indexing in Loklak Server

Loklak Server is a data-scraping system that indexes all the scraped data for the purpose to optimize it. The data fetched by different users is stored as cache. This helps in retrieving of data directly from cache for recurring queries. When users search for the same queries, load on Loklak Server is reduced by outputting indexed data, thus optimizing the operations.

Application

It is dependent on ElasticSearch for indexing of cached data (as JSON). The data that is fetched by different users is stored as cache. This helps in fetching data directly from cache for same queries. When users search for the same queries, load on Loklak Server is reduced and it is optimized by outputting indexed data instead of scraping the same date again.

When is data indexing done?

The indexing of data is done when:

1) Data is scraped:

When data is scraped, data is indexed concurrently while cleaning of data in TwitterTweet data object. For this task, addScheduler static method of IncomingMessageBuffer is used, which acts as

abstract between scraping of data and storing and indexing of data.

The following is the implementation from TwitterScraper (from here). Here writeToIndex is the boolean input to whether index the data or not.

if (this.writeToIndex) IncomingMessageBuffer.addScheduler(this, this.user, true);

2) Data is fetched from backend:

When data is fetched from backend, it is indexed in Timeline iterator. It calls the above method to index data concurrently.

The following is the definition of writeToIndex() method from Timeline.java (from here). When writeToIndex() is called, the fetched data is indexed.

public void writeToIndex() {
    IncomingMessageBuffer.addScheduler(this, true);
}

How?

When addScheduler static method of IncomingMessageBuffer is called, a thread is started that indexes all data. When the messagequeue data structure is filled with some messages, indexing continues.

See here . The DAO method writeMessageBulk is called here to write data. The data is then written to the following streams:

1) Dump: The data fetched is dumped into Import directory in a file. It can also be fetched from other peers.

2) Index: The data fetched is checked if it exists in the index and data that isn’t indexed is indexed.

public static Set<String> writeMessageBulk(Collection<MessageWrapper> mws) {
    List<MessageWrapper> noDump = new ArrayList<>();
    List<MessageWrapper> dump = new ArrayList<>();
    for (MessageWrapper mw: mws) {
        if (mw.t == null) continue;
        if (mw.dump) dump.add(mw);
        else noDump.add(mw);
    }

    Set<String> createdIDs = new HashSet<>();
    createdIDs.addAll(writeMessageBulkNoDump(noDump));
    createdIDs.addAll(writeMessageBulkDump(dump));

    // Does also do an writeMessageBulkNoDump internally
    return createdIDs;
}

 

The above code snippet is from DAO.java, method calls writeMessageBulkNoDump(noDump) indexes the data to ElasticSearch. The definition of this method can be seen here

Whereas for dumping of data writeMessageBulkDump(Dump) is called. It is defined here

Resources:

Some Other Services in Loklak Server

Loklak Server isn’t just a scraper system software, it provides numerous other services to perform other interesting functions like Link Unshortening (reverse of link shortening) and video fetching and administrative tasks like status fetching of the Loklak deployment (for analysis in Loklak development use) and many more. Some of these are internally implemented and rest can be used through http endpoints. Also there are some services which aren’t complete and are in development stage.

Let’s go through some of them to know a bit about them and how they can be used.

1) VideoUrlService

This is the service to extract video from the website that has a streaming video and output the video file link. This service is in development stage and is functional. Presently, It can fetch twitter video links and output them with different video qualities.

Endpoint: /api/videoUrlService.json

Implementation Example:

curl api/loklak.org/api/videoUrlService.json?id=https://twitter.com/EXOGlobal/status/886182766970257409&id=https://twitter.com/KMbappe/status/885963850708865025

2) Link Unshortening Service

This is the service used to unshorten the link. There are shortened URLs which are used to track the Internet Users by Websites. To prevent this, link unshortening service unshortens the link and returns the final untrackable link to the user.

Currently this service is in application in TwitterScraper to unshorten the fetched URLs. It has other methods to get Redirect Link and also a link to get final URL from multiple unshortened link.

Implementation Example from TwitterScraper.java [LINK]:

Matcher m = timeline_link_pattern.matcher(text);

if (m.find()) {
    String expanded = RedirectUnshortener.unShorten(m.group(2));
    text = m.replaceFirst(" " + expanded);
    continue;
}

 

Further it can be used to as a service and can be used directly. New features like fetching featured image from links can be added to this service. Though these stuff are in discussion and enthusiastic contribution is most welcomed.

3) StatusService

This is a service that outputs all data related to to Loklak Server deployment’s configurations. To access this configuration, api endpoint status.json is used.

It outputs the following data:

a) About the number of messages it scrapes in an interval of a second, a minute, an hour, a day, etc.

b) The configuration of the server like RAM, assigned memory, used memory, number of cores of CPU, cpu load, etc.

c) And other configurations related to the application like size of ElasticSearch shards size and their specifications, client request header, number of running threads, etc.

Endpoint: /api/status.json

Implementation Example:

curl api/loklak.org/api/status.json

Resources:

 

Using Elasticsearch Aggregations to Analyse Classifier Data in loklak Server

Loklak uses Elasticsearch to index Tweets and other social media entities. It also houses a classifier that classifies Tweets based on emotion, profanity and language. But earlier, this data was available only with the search API and there was no way to get aggregated data out of it. So as a part of my GSoC project, I proposed to introduce a new API endpoint which would allow users to access aggregated data from these classifiers.

In this blog post, I will be discussing how aggregations are performed on the Elasticsearch index of Tweets in the loklak server.

Structure of index

The ES index for Twitter is called messages and it has 3 fields related to classifiers –

  1. classifier_emotion
  2. classifier_language
  3. classifier_profanity

With each of these classifiers, we also have a probability attached which represents the confidence of the classifier for assigned class to a Tweet. The name of these fields is given by suffixing the emotion field by _probability (e.g. classifier_emotion_probability).

Since I will also be discussing aggregation based on countries in this blog post, there is also a field named place_country_code which saves the ISO 3166-1 alpha-2 code for the country of creation of Tweet.

Requesting aggregations using Elasticsearch Java API

Elasticsearch comes with a simple Java API which can be used to perform any desired task. To work with data, we need an ES client which can be built from a ES Node (if creating a cluster) or directly as a transport client (if connecting remotely to a cluster) –

// Transport client
TransportClient tc = TransportClient.builder()
                        .settings(mySettings)
                        .build();

// From a node
Node elasticsearchNode = NodeBuilder.nodeBuilder()
                            .local(false).settings(mySettings)
                            .node();
Client nc = elasticsearchNode.client();

[SOURCE]

Once we have a client, we can use ES AggregationBuilder to get aggregations from an index –

SearchResponse response = elasticsearchClient.prepareSearch(indexName)
                            .setSearchType(SearchType.QUERY_THEN_FETCH)
                            .setQuery(QueryBuilders.matchAllQuery())  // Consider every row
                            .setFrom(0).setSize(0)  // 0 offset, 0 result size (do not return any rows)
                            .addAggregation(aggr)  // aggr is a AggregatoinBuilder object
                            .execute().actionGet();  // Execute and get results

[SOURCE]

AggregationBuilders are objects that define the properties of an aggregation task using ES’s Java API. This code snippet is applicable for any type of aggregation that we wish to perform on an index, given that we do not want to fetch any rows as a response.

Performing simple aggregation for a classifier

In this section, I will discuss the process to get results from a given classifier in loklak’s ES index. Here, we will be targeting a class-wise count of rows and stats (average and sum) of probabilities.

Writing AggregationBuilder

An AggregationBuilder for this task will be a Terms AggregationBuilder which would dynamically generate buckets for all the different values of fields for a given field in index –

AggregationBuilder getClassifierAggregationBuilder(String classifierName) {
    String probabilityField = classifierName + "_probability";
    return AggregationBuilders.terms("by_class").field(classifierName)
        .subAggregation(
            AggregationBuilders.avg("avg_probability").field(probabilityField)
        )
        .subAggregation(
            AggregationBuilders.sum("sum_probability").field(probabilityField)
        );
}

[SOURCE]

Here, the name of aggregation is passed as by_class. This will be used while processing the results for this aggregation task. Also, sub-aggregation is used to get average and sum probability by the name of avg_probability and sum_probability respectively. There is no need to specify to count rows as this is done by default.

Processing results

Once we have executed the aggregation task and received the SearchResponse as sr (say), we can use the name of top level aggregation to get Terms aggregation object –

Terms aggrs = sr.getAggregations().get("by_class");

After that, we can iterate through the buckets to get results –

for (Terms.Bucket bucket : aggrs.getBuckets()) {
String key = bucket.getKeyAsString();
long docCount = bucket.getDocCount(); // Number of rows
// Use name of sub aggregations to get results
Sum sum = bucket.getAggregations().get("sum_probability");
Avg avg = bucket.getAggregations().get("avg_probability");
// Do something with key, docCount, sum and avg
}

[SOURCE]

So in this manner, results from aggregation response can be processed.

Performing nested aggregations for different countries

The previous section described the process to perform aggregation over a single field. For this section, we’ll aim to get results for each country present in the index given a classifier field.

Writing a nested aggregation builder

To get the aggregation required, AggregationBuilder from previous section can be added as a sub-aggregation for the AggregationBuilder for country code field –

AggregationBuilder aggrs = AggregationBuilders.terms("by_country").field("place_country_code")
    .subAggregation(
        AggregationBuilders.terms("by_class").field(classifierName)
            .subAggregation(
                AggregationBuilders.avg("avg_probability").field(probabilityField)
            )
            .subAggregation(
                AggregationBuilders.sum("sum_probability").field(probabilityField)
            );
    );

[SOURCE]

Processing the results

Again, we can get the results by processing the AggregationBuilders by name in a top-to-bottom fashion –

Terms aggrs = response.getAggregations().get("by_country");
for (Terms.Bucket bucket : aggr.getBuckets()) {
    String countryCode = bucket.getKeyAsString();
    Terms classAggrs = bucket.getAggregations().get("by_class");
    for (Terms.Bucket classBucket : classAggr.getBuckets()) {
        String key = classBucket.getKeyAsString();
        long docCount = classBucket.getDocCount();
        Sum sum = classBucket.getAggregations().get("sum_probability");
        Avg avg = classBucket.getAggregations().get("avg_probability");
        ...
    }
    ...
}

[SOURCE]

And we have the data about classifier for each country present in the index.

Conclusion

This blog post explained about Elasticsearch aggregations and their usage in the loklak server project. The changes discussed here were introduced over a series of patches to ElasticsearchClient.java by @singhpratyush (me).

Resources

Making loklak Server’s Kaizen Harvester Extendable

Harvesting strategies in loklak are something that the end users can’t see, but they play a vital role in deciding the way in which messages are collected with loklak. One of the strategies in loklak is defined by the Kaizen Harvester, which generates queries from collected messages.

The original strategy used a simple hash queue which drops queries once it is full. This effect is not desirable as we tend to lose important queries in this process if they come up late while harvesting. To overcome this behaviour without losing important search queries, we needed to come up with new harvesting strategy(ies) that would provide a better approach for harvesting. In this blog post, I am discussing the changes made in the kaizen harvester so it can be extended to create different flavors of harvesters.

What can be different in extended harvesters?

To make the Kaizen harvester extendable, we first needed to decide that what are the parts in the original Kaizen harvester that can be changed to make the strategy different (and probably better).

Since one of the most crucial part of the Kaizen harvester was the way it stores queries to be processed, it was one of the most obvious things to change. Another thing that should be allowed to configure across various strategies was the decision of whether to go for harvesting the queries from the query list.

Query storage with KaizenQueries

To allow different methods of storing the queries, KaizenQueries class was introduced in loklak. It was configured to provide basic methods that would be required for a query storing technique to work. A query storing technique can be any data structure that we can use to store search queries for Kaizen harvester.

public abstract class KaizenQueries {

     public abstract boolean addQuery(String query);

     public abstract String getQuery();

     public abstract int getSize();

     public abstract int getMaxSize();

     public boolean isEmpty() {
         return this.getSize() == 0;
     }
}

[SOURCE]

Also, a default type of KaizenQueries was introduced to use in the original Kaizen harvester. This allowed the same interface as the original queue which was used in the harvester.

Another constructor was introduced in Kaizen harvester which allowed setting the KaizenQueries for an instance of its derived classes. It solved the problem of providing an interface of KaizenQueries inside the Kaizen harvester which can be used by any inherited strategy –

private KaizenQueries queries = null;

public KaizenHarvester(KaizenQueries queries) {
    ...
    this.queries = queries;
    ...
}

public void someMethod() {
    ...
    this.queries.addQuery(someQuery);
    ...
}

[SOURCE]

This being added, getting new queries or adding new queries was a simple. We just need to use getQuery() and addQuery() methods without worrying about the internal implementations.

Configurable decision for harvesting

As mentioned earlier, the decision taken for harvesting should also be configurable. For this, a protected method was implemented and used in harvest() method –

protected boolean shallHarvest() {
    float targetProb = random.nextFloat();
    float prob = 0.5F;
    if (this.queries.getMaxSize() > 0) {
        prob = queries.getSize() / (float)queries.getMaxSize();
    }
    return !this.queries.isEmpty() && targetProb < prob;
}

@Override
public int harvest() {
    if (this.shallHarvest()) {
        return harvestMessages();
    }

    grabSuggestions();

    return 0;
}

[SOURCE]

The shallHarvest() method can be overridden by derived classes to allow any type of harvesting decision that they want. For example, we can configure it in such a way that it harvests if there are any queries in the queue, or to use a different probability distribution instead of linear (maybe gaussian around 1).

Conclusion

This blog post explained about the changes made in loklak’s Kaizen harvester which allowed other harvesting strategies to be built on its top. It discussed the two components changed and how they allowed ease of inheriting the original Kaizen harvester. These changes were proposed in PR loklak/loklak_server#1203 by @singhpratyush (me).

Resources

Utilizing Readiness Probes for loklak Dependencies in Kubernetes

When we use any application and fail to connect to it, we do not give up and retry connecting to it again and again. But in the reality we often face this kind of obstacles like application that break instantly or when connecting to an API or database that is not ready yet, the app gets upset and refuses to continue to work. So, something similar to this was happening with api.loklak.org.
In such cases we can’t really re-write the whole application again every time the problem occurs.So for this we need to define dependencies of some kind that can handle the situation rather than disappointing the users of loklak app.

Solution:

We will just wait until a dependent API or backend of loklak is ready and then only start the loklak app. For this to be done, we used Kubernetes Health Checks.

Kubernetes health checks are divided into liveness and readiness probes.
The purpose of liveness probes are to indicate that your application is running.
Readiness probes are meant to check if your application is ready to serve traffic.
The right combination of liveness and readiness probes used with Kubernetes deployments can:
Enable zero downtime deploys
Prevent deployment of broken images
Ensure that failed containers are automatically restarted

Pod Ready to be Used?

A Pod with defined readiness probe won’t receive any traffic until a defined request can be successfully fulfilled. This health checks are defined through the Kubernetes, so we don’t need any changes to be made in our services (APIs). We just need to setup a readiness probe for the APIs that loklak server is depending on. Here you can see the relevant part of the container spec you need to add (in this example we want to know when loklak is ready):

        readinessProbe:
          httpGet:
            path: /api/status.json
            port: 80
          initialDelaySeconds: 30
          timeoutSeconds: 3

 

Readiness Probes

Updating deployments of loklak when something pushed into development without readiness probes can result in downtime as old pods are replaced by new pods in case of Kubernetes deployment. If the new pods are misconfigured or somehow broken, that downtime extends until you detect the problem and rollback.
With readiness probes, Kubernetes will not send traffic to a pod until the readiness probe is successful. When updating a loklak deployment, it will also leave old one’s running until probes have been successful on new copy. That means that if loklak server new pods are broken in some way, they’ll never see traffic, instead old pods of loklak server will continue to serve all traffic for the deployment.

Conclusion

Readiness probes is a simple solution to ensure that Pods with dependencies do not get started before their dependencies are ready (in this case for loklak server). This also works with more than one dependency.

Resources

Simplifying Scrapers using BaseScraper

Loklak Server‘s main function is to scrape data from websites and other sources and output in different formats like JSON, xml and rss. There are many scrapers in the project that scrape data and output them, but are implemented with different design and libraries which makes them different from each other and a difficult to fix changes.

Due to variation in scrapers’ design, it is difficult to modify them and fix the same issue (any issue, if it appears) in each of them. This issue signals fault in design. To solve this problem, Inheritance can be brought into application. Thus, I created BaseScraper abstract class so that scrapers are more concentrated on fetching data from HTML and all supportive tasks like creating connection with the help of url are defined in BaseScraper.

The concept is pretty easy to implement, but for a perfect implementation, there is a need to go through the complete list of tasks a scraper does.

These are the following tasks with descriptions and how they are implemented using BaseScraper:

  1. Endpoint that triggers the scraper

Every search scraper inherits class AbstractAPIHandler. This is used to fetch get parameters from the endpoint according to which data is scraped from the scraper. The arguments from serviceImpl method is used to generate output and is returned to it as JSONObject.

For this task, the method serviceImpl has been defined in BaseScraper and method getData is implemented to return the output. This method is the driver method of the scraper.

public JSONObject serviceImpl(Query call, HttpServletResponse response, Authorization rights, JSONObjectWithDefault permissions) throws APIException {
    this.setExtra(call);
    return this.getData().toJSON(false, "metadata", "posts");
}

 

  1. Constructor

The constructor of Scraper defines the base URL of the website to be scraped, name of the scraper and data structure to fetch all get parameters input to the scraper. For get parameters, the Map data structure is used to fetch them from Query object.

Since every scraper has it’s own different base URL, scraper name and get parameters used, so it is implemented in respective Scrapers. QuoraProfileScraper is an example which has these variables defined.

  1. Get all input variables

To get all input variables, there are setters and getters defined for fetching them as Map from Query object in BaseScraper. There is also an abstract method getParam(). It is defined in respective scrapers to fetch the useful parameters for scraper and set them to the scraper’s class variables.

// Setter for get parameters from call object
protected void setExtra(Query call) {
    this.extra = call.getMap();
    this.query = call.get("query", "");
    this.setParam();
}

// Getter for get parameter wrt to its key
public String getExtraValue(String key) {
    String value = "";
    if(this.extra.get(key) != null) {
        value = this.extra.get(key).trim();
    }
    return value;
}

// Defination in QuoraProfileScraper
protected void setParam() {
    if(!"".equals(this.getExtraValue("type"))) {
        this.typeList = Arrays.asList(this.getExtraValue("type").trim().split("\\s*,\\s*"));
    } else {
        this.typeList = new ArrayList<String>();
        this.typeList.add("all");
        this.setExtraValue("type", String.join(",", this.typeList));
    }
}

 

  1.  URL creation for web scraper

The URL creation shall be implemented in a separate method as in TwitterScraper. The following is the rough implementation adapted from one of my pull request:

protected String prepareSearchUrl(String type) {
    URIBuilder url = null;
    String midUrl = "search/";

    try {
        switch(type) {
            case "question":
                url = new URIBuilder(this.baseUrl + midUrl);
                url.addParameter("q", this.query);
                url.addParameter("type", "question");
        .
        .
    }
    .
    .
    return url.toString();
}

 

  1. Get BufferedReader object from InputStream

getDataFromConnection method fetches the BufferedReader object from ClientConnection. This object reads the web page line by line by the scrape method to fetch data. See here.

ClientConnection connection = new ClientConnection(url);
BufferedReader br = getHtml(connection);
.
.
.
public BufferedReader getHtml(ClientConnection connection) {

    if (connection.inputStream == null) {
        return null;
    }

    BufferedReader br = new BufferedReader(new InputStreamReader(connection.inputStream, StandardCharsets.UTF_8));
    return br;
}

 

  1. Scraping of data from HTML

The Scraper method for scraping data is declared abstract in BaseScraper and defined in the scraper. This can be a perfect example of implementation for BaseScraper (See code the here) and scraper (here).

  1. Output of data

The output of scrape method is fetched in Post data objects that are implemented for the respective scraper. These Post objects are added to Timeline iterator and which outputs data as JSONArray. Later the objects are output in enclosed Post object wrapper.

This data can be directly output as Post object, but adding it to iterator makes the Post Objects capable to be sorted in an order and be indexed to ElasticSearch.

 

Resources

Iterating the Loklak Server data

Loklak Server is amazing for what it does, but it is more impressive how it does the tasks. Iterators are used for and how to use them, but this project has a customized iterator that iterates Twitter data objects. This iterator is Timeline.java .

Timeline implements an interface iterable (isn’t it iterator?). This interface helps in using Timeline as an iterator and add methods to modify, use or create the data objects. At present, it only iterates Twitter data objects. I am working on it to modify it to iterate data objects from all web scrapers.

The following is a simple example of how an iterator is used.

// Initializing arraylist
List<String> stringsList = Arrays.asList("foo", "bar", "baz");

// Using iterator to display contents of stringsList
System.out.print("Contents of stringsList: ");

Iterator iter = al.iterator();
while(iter.hasNext()) {
    System.out.print(iter.next() + " ");
}

 

This iterator can only iterate data the way array does. (Then why do we need it?) It does the task of iterating objects perfectly, but we can add more functionality to the iterator.

 

Timeline iterator iterates the MessageEntry objects i.e. superclass of TwitterTweet objects. According to Javadocs, “Timeline is a structure which holds tweet for the purpose of presentation, There is no tweet retrieval method here, just an iterator which returns the tweets in reverse appearing order.”

Following are some of the tasks it does:

  1. As an iterator:

This basic use of Timeline is to iterate the MessageEntry objects. It not only iterates the data objects, but also fetches them (See here).

// Declare Timeline object according to order the data object has been created
Timeline tline = new Timeline(Timeline.parseOrder("created_at"));

// Adding data objects to the timeline
tline.add(me1);
tline.add(me2);
.
.
.
// Outputing all data objects as array of JSON objects
for (MessageEntry me: tline) {
    JSONArray postArray = new JSONArray();
    for (MessageEntry post : this) {
        postArray.put(post.toJSON());
    }
}

 

  1. The order of iterating the data objects

Timeline can arrange and iterate the data objects according to the date of creation of the twitter post, number of retweets or number of favourite counts. For this there is an Enum declaration of Order in the Timeline class which is initialized during creation of Timeline object. [link]

    Timeline tline = new Timeline(Timeline.parseOrder("created_at"));

 

  1. Pagination of data objects

There is an object cursor, some methods, including getter and setters to support pagination of the data objects. It is only internally implemented, but can also be used to return a section of the result.

  1. writeToIndex method

This method can be used to write all data fetched by Timeline iterator to ElasticSearch for indexing and to dump that can be used for testing. Thus, indexing of data can concurrently be done while it is iterated. It is implemented here.

  1. Other methods

It also has methods to output all data as JSON and customized method to add data to Timeline keeping user object and Data separate, etc. There are a bit more things in this iterable class which shall be explored instead.

 

Resources:

Multithreading implementation in Loklak Server

Loklak Server is a near-realtime system. It performs a large number of tasks and are very costly in terms of resources. Its basic function is to scrape all data from websites and output it at the endpoint. In addition to scraping data, there is also a need to perform other tasks like refining and cleaning of data. That is why, multiple threads are instantiated. They perform other tasks like:

  1. Refining of data and extract more data

The data fetched needs to be cleaned and refined before outputting it. Some of the examples are:

a) Removal of html tags from tweet text:

After extracting text from html data and feeding to TwitterTweet object, it concurrently runs threads to remove all html from text.

b) Unshortening of url links:

The url links embedded in the tweet text may track the users with the help of shortened urls. To prevent this issue, a thread is instantiated to unshorten the url links concurrently while cleaning of tweet text.

  1. Indexing all JSON output data to ElasticSearch

While extracting JSON data as output, there is a method here in Timeline.java that indexes data to ElasticSearch.

Managing multithreading

To manage multithreading, Loklak Server applies following objects:

1. ExecutorService

To deal with large numbers of threads ExecutorService object is used to handle threads as it helps JVM to prevent any resource overflow. Thread’s lifecycle can be controlled and its creation cost can be optimized. This is the best example of ExecutorService application is here:

.
.
public class TwitterScraper {
    // Creation of at max 40 threads. This sets max number of threads to 40 at a time
    public static final ExecutorService executor = Executors.newFixedThreadPool(40);
    .
    .
    .
    .
    // Feeding of TwitterTweet object with data
    TwitterTweet tweet = new TwitterTweet(
        user.getScreenName(),
        Long.parseLong(tweettimems.value),
        props.get("tweettimename").value,
        props.get("tweetstatusurl").value,
        props.get("tweettext").value,
        Long.parseLong(tweetretweetcount.value),
        Long.parseLong(tweetfavouritecount.value),
        imgs, vids, place_name, place_id,
        user, writeToIndex,  writeToBackend
    );
    // Starting thread to refine TwitterTweet data
    if (tweet.willBeTimeConsuming()) {
       executor.execute(tweet);
    }    .
    .
    .

 

2. basic Thread class

Thread class can also be used instead of ExecutorService in cases where there is no resource crunch. But it is always suggested to use ExecutorService due to its benefits. Thread implementation can be used as an anonymous class like here.

3. Runnable interface

Runnable interface can be used to create an anonymous class or classes which does more task than just a task concurrently. In Loklak Server, TwitterScraper concurrently indexes the data to ElasticSearch, unshortens link and cleans data. Have a look at implementation here.

Resources: