Data Indexing in Loklak Server

Loklak Server is a data-scraping system that indexes all the scraped data for the purpose to optimize it. The data fetched by different users is stored as cache. This helps in retrieving of data directly from cache for recurring queries. When users search for the same queries, load on Loklak Server is reduced by outputting indexed data, thus optimizing the operations.

Application

It is dependent on ElasticSearch for indexing of cached data (as JSON). The data that is fetched by different users is stored as cache. This helps in fetching data directly from cache for same queries. When users search for the same queries, load on Loklak Server is reduced and it is optimized by outputting indexed data instead of scraping the same date again.

When is data indexing done?

The indexing of data is done when:

1) Data is scraped:

When data is scraped, data is indexed concurrently while cleaning of data in TwitterTweet data object. For this task, addScheduler static method of IncomingMessageBuffer is used, which acts as

abstract between scraping of data and storing and indexing of data.

The following is the implementation from TwitterScraper (from here). Here writeToIndex is the boolean input to whether index the data or not.

if (this.writeToIndex) IncomingMessageBuffer.addScheduler(this, this.user, true);

2) Data is fetched from backend:

When data is fetched from backend, it is indexed in Timeline iterator. It calls the above method to index data concurrently.

The following is the definition of writeToIndex() method from Timeline.java (from here). When writeToIndex() is called, the fetched data is indexed.

public void writeToIndex() {
    IncomingMessageBuffer.addScheduler(this, true);
}

How?

When addScheduler static method of IncomingMessageBuffer is called, a thread is started that indexes all data. When the messagequeue data structure is filled with some messages, indexing continues.

See here . The DAO method writeMessageBulk is called here to write data. The data is then written to the following streams:

1) Dump: The data fetched is dumped into Import directory in a file. It can also be fetched from other peers.

2) Index: The data fetched is checked if it exists in the index and data that isn’t indexed is indexed.

public static Set<String> writeMessageBulk(Collection<MessageWrapper> mws) {
    List<MessageWrapper> noDump = new ArrayList<>();
    List<MessageWrapper> dump = new ArrayList<>();
    for (MessageWrapper mw: mws) {
        if (mw.t == null) continue;
        if (mw.dump) dump.add(mw);
        else noDump.add(mw);
    }

    Set<String> createdIDs = new HashSet<>();
    createdIDs.addAll(writeMessageBulkNoDump(noDump));
    createdIDs.addAll(writeMessageBulkDump(dump));

    // Does also do an writeMessageBulkNoDump internally
    return createdIDs;
}

 

The above code snippet is from DAO.java, method calls writeMessageBulkNoDump(noDump) indexes the data to ElasticSearch. The definition of this method can be seen here

Whereas for dumping of data writeMessageBulkDump(Dump) is called. It is defined here

Resources:

Published by

Vibhor Verma

Enthusiast | Passionate | on my way