Integrating YaCy Grid Locally with Susper

The YaCy Grid is the second-generation implementation of YaCy, a peer-to-peer search engine. Search results can be improved to a great extent by using YaCy Grid as the new backend for SUSPER. YaCy Grid is a strong choice for a distributed search topology. While both networks are distributed, YaCy Grid is centralized whereas the legacy YaCy is decentralized. YaCy Grid puts scaling in our hands: every aspect (loading, parsing, indexing) can be scaled with the computing power we choose. In YaCy, Solr is embedded, but in YaCy Grid we get an Elasticsearch cluster. Both are built around the same underlying search library, Lucene, but Elasticsearch helps us scale almost indefinitely. In this blog, I will show how to integrate YaCy Grid with Susper locally and how to use it to fetch results.

Implementing YaCy Grid with Susper: Before using YaCy Grid we first need to set it up and crawl a URL using the crawl start API; more information can be found in the posts Implementing YaCy Grid with Susper and Setting up YaCy Grid locally. Once we are done with setup and crawling, we can begin using its APIs in Susper. Following are some easy steps to show results from YaCy Grid in a separate tab in Susper.

Step 1: Creating a service to fetch results: In order to fetch results from the local YaCy Grid server, we create a service. Here is the class in grid-service.ts which fetches results for us.
export class GridSearchService {
  server = 'http://127.0.0.1:8100';
  searchURL = this.server + '/yacy/grid/mcp/index/yacysearch.json?query=';

  constructor(private http: Http, private jsonp: Jsonp, private store: Store<fromRoot.State>) { }

  getSearchResults(searchquery) {
    return this.http
      .get(this.searchURL + searchquery).map(res => res.json())
      .catch(this.handleError);
  }
}

Step 2: Modifying the results.component.ts file: In order to get results from grid-service.ts in results.component.ts, we need to create an instance of the service, use it to get the results, store them in variables of results.component.ts, and then use these variables to show results in the results template. Following is the code that does this for us:

ngOnInit() {
  this.grid.getSearchResults(this.searchdata.query).subscribe(res => {
    this.gridResult = res.channels;
  });
}

gridClick() {
  this.getPresentPage(1);
  this.resultDisplay = 'grid';
  this.totalgridresults = this.gridResult[0].totalResults;
  this.gridmessage = 'About ' + this.totalgridresults + ' results';
  this.gridItems = this.gridResult[0].items;
  console.log(this.gridItems);
}

Step 3: Creating a new tab to show results from YaCy Grid: Now we need to create a tab in the template where we can use the local variables in results.component.ts to show the results, following the current design pattern. Here is the code for that:

<li [class.active_view]="Display('grid')" (click)="gridClick()">YaCy_Grid</li>
<!--YaCy Grid-->
<div class="container-fluid">
  <div class="result message-bar" *ngIf="totalgridresults > 0 && Display('grid')">
    {{gridmessage}}
  </div>
  <div class="autocorrect">
    <app-auto-correct [hidden]="hideAutoCorrect"></app-auto-correct>
  </div>
</div>
<div class="grid-result" *ngIf="Display('grid')">
  <div class="feed container">
    <div *ngFor="let item of gridItems" class="result">
      <div class="title">
        <a class="title-pointer" href="{{item.link}}" [style.color]="themeService.titleColor">{{item.title}}</a>
      </div>
      <div class="link">
        <p [style.color]="themeService.linkColor">{{item.link}}</p>
      </div>
      <div class="description">
        <p [style.color]="themeService.descriptionColor">{{item.pubDate|date:'MMMM d, yyyy'}} - {{item.description}}</p>
      </div>
    </div>
  </div>
</div>
<!-- END -->

Step 4: Starting YaCy Grid Locally:…
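The search URL built by the service can also be exercised outside Angular. A minimal Python sketch (the server address and endpoint path come from the service above; the use of urlencode for escaping is my own addition) of how the request URL is composed:

```python
from urllib.parse import urlencode

SERVER = "http://127.0.0.1:8100"
SEARCH_PATH = "/yacy/grid/mcp/index/yacysearch.json"

def build_search_url(query: str) -> str:
    """Build the MCP yacysearch.json URL for a query string,
    percent-encoding the query so spaces and special characters are safe."""
    return SERVER + SEARCH_PATH + "?" + urlencode({"query": query})

url = build_search_url("fossasia susper")
```

Fetching this URL with any HTTP client returns the same JSON the Angular service parses with `res.json()`.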

Continue Reading: Integrating YaCy Grid Locally with Susper

Setting up YaCy Grid locally

SUSPER is a search interface that uses the P2P search engine YaCy. Search results are displayed using a Solr server which is embedded into YaCy. The retrieval of search results is done using the YaCy search API. When a search request is made in one of the search templates, an HTTP request is made to YaCy and the response is returned in JSON. In this blog post I will show how to set up YaCy Grid locally.

What is YaCy Grid? The YaCy Grid is the second-generation implementation of YaCy, a peer-to-peer search engine. The required storage functions of the YaCy Grid are: an asset storage, basically a file-sharing environment for YaCy components (an FTP server is used for asset storage); a message system providing an Enterprise Integration Framework using message-oriented middleware (RabbitMQ message queues are used for the message system); and a database system providing search-engine-related retrieval functions (Elasticsearch is used for database operations).

How to set up YaCy Grid locally? YaCy Grid has four components: MCP (Master Connect Program), Loader, Crawler and Parser. Clone all the components using the --recursive flag.

git clone --recursive https://github.com/yacy/yacy_grid_mcp.git
git clone --recursive https://github.com/yacy/yacy_grid_parser.git
git clone --recursive https://github.com/yacy/yacy_grid_crawler.git
git clone --recursive https://github.com/yacy/yacy_grid_loader.git

Starting YaCy Grid then requires starting Elasticsearch, RabbitMQ (with username `anonymous` and password `yacy`) and an FTP server (it can be omitted, as the MCP can take over).
All the above steps can be done in a single step by running the Python script `run_all.py` in the `bin` folder. Working of `run_all.py` in yacy_grid_mcp:

if not checkportopen(9200):
    print "Elasticsearch is not running"
    mkapps()
    elasticversion = 'elasticsearch-5.6.5'
    if not os.path.isfile(path_apphome + '/data/mcp-8100/apps/' + elasticversion + '.tar.gz'):
        print('Downloading ' + elasticversion)
        urllib.urlretrieve('https://artifacts.elastic.co/downloads/elasticsearch/' + elasticversion + '.tar.gz', path_apphome + '/data/mcp-8100/apps/' + elasticversion + '.tar.gz')
    if not os.path.isdir(path_apphome + '/data/mcp-8100/apps/elasticsearch'):
        print('Decompressing ' + elasticversion)
        os.system('tar xfz ' + path_apphome + '/data/mcp-8100/apps/' + elasticversion + '.tar.gz -C ' + path_apphome + '/data/mcp-8100/apps/')
        os.rename(path_apphome + '/data/mcp-8100/apps/' + elasticversion, path_apphome + '/data/mcp-8100/apps/elasticsearch')
    # run elasticsearch
    print('Running Elasticsearch')
    os.chdir(path_apphome + '/data/mcp-8100/apps/elasticsearch/bin')
    os.system('nohup ./elasticsearch &')

This checks whether Elasticsearch is running; if not, it downloads, unpacks and runs it.

if checkportopen(15672):
    print "RabbitMQ is Running"
    print "If you have configured it according to YaCy setup press N"
    print "If you have not configured it according to YaCy setup or Do not know what to do press Y"
    n = raw_input()
    if (n == 'Y' or n == 'y'):
        os.system('service rabbitmq-server stop')
if not checkportopen(15672):
    print "rabbitmq is not running"
    os.system('python bin/start_rabbitmq.py')

This checks whether RabbitMQ is running: if it is, the user is asked whether it is configured according to the YaCy Grid setup (press Y to reconfigure or if unsure, N otherwise); if it is not running, RabbitMQ is started with the required configuration. subprocess.call('bin/update_all.sh') updates all the Grid components including the MCP.
if not checkportopen(2121):
    print "ftp server is not Running"

This checks for an FTP server and prints a message accordingly.

def run_mcp():
    subprocess.call(['gnome-terminal', '-e', "gradle run"])

def run_loader():
    os.system('cd ../yacy_grid_loader')
    subprocess.call(['gnome-terminal', '-e', "gradle run"])

def run_crawler():
    os.system('cd ../yacy_grid_crawler')
    subprocess.call(['gnome-terminal', '-e', "gradle run"])

def run_parser():
    os.system('cd ../yacy_grid_parser')
    subprocess.call(['gnome-terminal', '-e', "gradle run"])

These functions run all components of YaCy Grid in separate terminals. Once the user starts it,…
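The script's `checkportopen` helper is not shown in the excerpt. A minimal sketch of how such a check can be implemented (the function name comes from the script; the socket-based body is an assumption):

```python
import socket

def checkportopen(port, host="127.0.0.1", timeout=1.0):
    """Return True if something is listening on host:port.
    connect_ex returns 0 on a successful TCP connect."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(timeout)
        return s.connect_ex((host, port)) == 0
```

With this, `checkportopen(9200)` answers whether Elasticsearch is up and `checkportopen(15672)` whether the RabbitMQ management interface is reachable.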

Continue Reading: Setting up YaCy Grid locally

Open Event Server: Creating/Rebuilding Elasticsearch Index From Existing Data In a PostgreSQL DB Using Python

The Elasticsearch instance in the current Open Event Server deployment is used only to store events and search through them, due to limited resources. The project uses a PostgreSQL database; this blog will focus on setting up a job to create the events index if it does not exist. If the index exists, the job will delete all the previous data and rebuild the events index. Although the project uses the Flask framework, the job is in pure Python so that it can run properly in the background while the application continues its work. Celery is used for queueing up the aforementioned jobs.

For building the job, the first step is to connect to our database:

from config import Config
import psycopg2

conn = psycopg2.connect(Config.SQLALCHEMY_DATABASE_URI)
cur = conn.cursor()

The next step is to fetch all the events from the database. We will only be indexing certain attributes of the event which are useful in search; the rest are not stored in the index. The code below fetches a collection of tuples containing the attributes mentioned in the query:

cur.execute(
    "SELECT id, name, description, searchable_location_name, organizer_name, organizer_description FROM events WHERE state = 'published' and deleted_at is NULL ;")
events = cur.fetchall()

We will be using the bulk API, which is significantly faster than adding events one by one via the API. Elasticsearch-py, the official Python client for Elasticsearch, provides the necessary functionality to work with the bulk API. The helpers present in the client enable us to use generator expressions to insert the data via the bulk API.
The generator expression for events will be as follows:

event_data = ({'_type': 'event',
               '_index': 'events',
               '_id': event_[0],
               'name': event_[1],
               'description': event_[2] or None,
               'searchable_location_name': event_[3] or None,
               'organizer_name': event_[4] or None,
               'organizer_description': event_[5] or None}
              for event_ in events)

We will now delete the events index if it exists, recreate it, and pass the generator expression obtained above to the bulk API helper so that the events index is repopulated. The complete code for the function is as follows:

@celery.task(name='rebuild.events.elasticsearch')
def cron_rebuild_events_elasticsearch():
    """
    Re-inserts all eligible events into elasticsearch
    :return:
    """
    conn = psycopg2.connect(Config.SQLALCHEMY_DATABASE_URI)
    cur = conn.cursor()
    cur.execute(
        "SELECT id, name, description, searchable_location_name, organizer_name, organizer_description FROM events WHERE state = 'published' and deleted_at is NULL ;")
    events = cur.fetchall()
    event_data = ({'_type': 'event',
                   '_index': 'events',
                   '_id': event_[0],
                   'name': event_[1],
                   'description': event_[2] or None,
                   'searchable_location_name': event_[3] or None,
                   'organizer_name': event_[4] or None,
                   'organizer_description': event_[5] or None}
                  for event_ in events)
    es_store.indices.delete('events')
    es_store.indices.create('events')
    abc = helpers.bulk(es_store, event_data)

Currently we run this job every week and also on each new deployment. Rebuilding the index is important because some records may not be indexed while the continuous sync is taking place.
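The generator pattern can be checked in isolation before wiring it to the bulk helper. A small sketch (the sample tuples are made up; the field layout mirrors the SELECT above) showing the action dicts the bulk API would receive, including the `or None` normalization of empty values:

```python
# Sample rows shaped like the SELECT statement above:
# (id, name, description, searchable_location_name, organizer_name, organizer_description)
events = [
    (1, "PyCon", "A Python conference", "Berlin", "PSF", None),
    (2, "FOSSASIA Summit", None, "Singapore", None, ""),
]

# Same generator expression as in the job: lazy, so nothing is built
# until the bulk helper (or list() here) consumes it.
event_data = ({'_type': 'event',
               '_index': 'events',
               '_id': event_[0],
               'name': event_[1],
               'description': event_[2] or None,
               'searchable_location_name': event_[3] or None,
               'organizer_name': event_[4] or None,
               'organizer_description': event_[5] or None}
              for event_ in events)

actions = list(event_data)
```

Note how the empty string in the second row becomes None: `"" or None` evaluates to None, so the index never stores empty strings.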
To know more about it please visit https://gocardless.com/blog/syncing-postgres-to-elasticsearch-lessons-learned/

Related links:
Syncing Postgres to Elasticsearch, lessons learned: https://gocardless.com/blog/syncing-postgres-to-elasticsearch-lessons-learned/
Elasticsearch Python Client: https://github.com/elastic/elasticsearch-py

Continue Reading: Open Event Server: Creating/Rebuilding Elasticsearch Index From Existing Data In a PostgreSQL DB Using Python

Indexing for multiscrapers in Loklak Server

I recently added a multiscraper system which can scrape data from web scrapers like YoutubeScraper, QuoraScraper, GithubScraper, etc. As scraping is a costly task, it is important to improve its efficiency. One approach is to index data in a cache. TwitterScraper uses multiple sources to optimize efficiency. This system uses the Post message-holder object to store data and PostTimeline (a specialized iterator) to iterate over the data objects. This difference in data structures from TwitterScraper leads to the need for a different approach to implement indexing of data to ElasticSearch (currently in the review process). These are the changes I made while implementing 'indexing of data' in the project.

1) Writing of data is invoked only using the PostTimeline iterator. In TwitterScraper, the data is written in the message holder TwitterTweet, so all the tweets are written to the index as they are created. Here, writing of the posts is initiated once the data is scraped. Scraping of data is considered a heavy process, so this approach keeps resource usage lower under average traffic on the server.

protected Post putData(Post typeArray, String key, Timeline2 postList) {
    if (!"cache".equals(this.source)) {
        postList.writeToIndex();
    }
    return this.putData(typeArray, key, postList.toArray());
}

2) One object for holding a message. During the implementation, I kept the same message holder Post and post-iterator PostTimeline from scraping to indexing of data. This helps keep the structure uniform. The earlier approach involved different types of message wrappers along the way; this approach cuts out the extra looping and transitions between data structures.

3) Index a list, not a message. In TwitterScraper, the messages are enqueued individually to be indexed in bulk. In this approach, I enqueue the complete lists instead, which delays indexing until the scraper is done processing the HTML.
Creating the queue of postlists:

// Add post-lists to queue to be indexed
queueClients.incrementAndGet();
try {
    postQueue.put(postList);
} catch (InterruptedException e) {
    DAO.severe(e);
}
queueClients.decrementAndGet();

Indexing of the posts in postlists:

// Start indexing of data in post-lists
for (Timeline2 postList : postBulk) {
    if (postList.size() < 1) continue;
    if (postList.dump) {
        // Dumping of data in a file
        writeMessageBulkDump(postList);
    }
    // Indexing of data to ElasticSearch
    writeMessageBulkNoDump(postList);
}

4) Categorizing the input parameters. While searching the index, I divided the query parameters from the scraper into three categories. The input parameters are added to those categories (implemented using the map data structure) and data is fetched according to them. These categories are:

// Declaring the QueryBuilder
BoolQueryBuilder query = new BoolQueryBuilder();

a) Get the parameter: get the results matching the input fields in the map getMap.

// Result must have these fields. Acts as AND operator
if (getMap != null) {
    for (Map.Entry<String, String> field : getMap.entrySet()) {
        query.must(QueryBuilders.termQuery(field.getKey(), field.getValue()));
    }
}

b) Don't get the parameter: exclude results matching the input fields in the map notGetMap.

// Result must not have these fields.
if (notGetMap != null) {
    for (Map.Entry<String, String> field : notGetMap.entrySet()) {
        query.mustNot(QueryBuilders.termQuery(field.getKey(), field.getValue()));
    }
}

c) Get if possible: get the results with the…
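The must/mustNot semantics above can be illustrated without an Elasticsearch cluster. A Python sketch (the in-memory filter and sample posts are my own illustration, not loklak's code) of the same AND / NOT logic applied to plain dicts:

```python
def filter_posts(posts, get_map=None, not_get_map=None):
    """Keep posts matching every field in get_map (must / AND)
    and no field in not_get_map (mustNot)."""
    result = []
    for post in posts:
        if get_map and any(post.get(k) != v for k, v in get_map.items()):
            continue  # a required field is missing or different (must)
        if not_get_map and any(post.get(k) == v for k, v in not_get_map.items()):
            continue  # a forbidden field matches (mustNot)
        result.append(post)
    return result

posts = [
    {"screen_name": "fossasia", "lang": "en"},
    {"screen_name": "fossasia", "lang": "de"},
    {"screen_name": "other", "lang": "en"},
]
hits = filter_posts(posts,
                    get_map={"screen_name": "fossasia"},
                    not_get_map={"lang": "de"})
```

Only the first post survives: the second is excluded by notGetMap and the third fails the getMap condition.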

Continue Reading: Indexing for multiscrapers in Loklak Server

Creating A Dockerfile For Yacy Grid MCP

The YaCy Grid is the second-generation implementation of YaCy, a peer-to-peer search engine. A YaCy Grid installation consists of a set of microservices which communicate with each other using a common infrastructure for data persistence. The task was to deploy the second generation of YaCy Grid. To do so, we first created a Dockerfile. This Dockerfile should start the microservices such as RabbitMQ, Apache FTP and Elasticsearch in one Docker instance along with the MCP. The microservices perform the following tasks: an Apache FTP server for asset storage; RabbitMQ message queues for the message system; Elasticsearch for database operations. To launch these microservices using the Dockerfile, we referred to the following documentation on running these services locally: https://github.com/yacy/yacy_grid_mcp/blob/master/README.md

For creating the Dockerfile we proceeded as follows:

FROM ubuntu:latest
MAINTAINER Harshit Prasad

# Update
RUN apt-get update
RUN apt-get upgrade -y

# add packages
# install jdk package for java
RUN apt-get install -y git openjdk-8-jdk

# install gradle required for build
RUN apt-get update && apt-get install -y software-properties-common
RUN add-apt-repository ppa:cwchien/gradle
RUN apt-get update
RUN apt-get install -y wget
RUN wget https://services.gradle.org/distributions/gradle-3.4.1-bin.zip
RUN mkdir /opt/gradle
RUN apt-get install -y unzip
RUN unzip -d /opt/gradle gradle-3.4.1-bin.zip
RUN PATH=$PATH:/opt/gradle/gradle-3.4.1/bin
ENV GRADLE_HOME=/opt/gradle/gradle-3.4.1
ENV PATH=$PATH:$GRADLE_HOME/bin
RUN gradle -v

# install apache ftp server 1.1.0
RUN wget http://www-eu.apache.org/dist/mina/ftpserver/1.1.0/dist/apache-ftpserver-1.1.0.tar.gz
RUN tar xfz apache-ftpserver-1.1.0.tar.gz

# install RabbitMQ server
RUN wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-generic-unix-3.6.6.tar.xz
RUN tar xf rabbitmq-server-generic-unix-3.6.6.tar.xz

# install erlang language for RabbitMQ
RUN apt-get install -y erlang

# install elasticsearch
RUN wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.5.0.tar.gz
RUN sha1sum elasticsearch-5.5.0.tar.gz
RUN tar -xzf elasticsearch-5.5.0.tar.gz

# clone yacy_grid_mcp repository
RUN git clone https://github.com/nikhilrayaprolu/yacy_grid_mcp.git
WORKDIR /yacy_grid_mcp
RUN cat docker/config-ftp.properties > ../apache-ftpserver-1.1.0/res/conf/users.properties

# compile
RUN gradle build
RUN mkdir data/mcp-8100/conf/ -p
RUN cp docker/config-mcp.properties data/mcp-8100/conf/config.properties
RUN chmod +x ./docker/start.sh

# Expose web interface ports
# 2121: ftp, a FTP server to be used for mass data / file storage
# 5672: rabbitmq, a rabbitmq message queue server to be used for global messages, queues and stacks
# 9300: elastic, an elasticsearch server or main cluster address for global database storage
EXPOSE 2121 5672 9300 9200 15672 8100

# Define default command.
ENTRYPOINT ["/bin/bash", "./docker/start.sh"]

We have created a start.sh file to start the RabbitMQ and Apache FTP services; at the end, gradle run is executed.

adduser --disabled-password --gecos '' r
adduser r sudo
echo '%sudo ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers
chmod a+rwx /elasticsearch-5.5.0 -R
su -m r -c '/elasticsearch-5.5.0/bin/elasticsearch -Ecluster.name=yacygrid &'
cd /apache-ftpserver-1.1.0
./bin/ftpd.sh res/conf/ftpd-typical.xml &
/rabbitmq_server-3.6.6/sbin/rabbitmq-server -detached
sleep 5s;
/rabbitmq_server-3.6.6/sbin/rabbitmq-plugins enable rabbitmq_management
/rabbitmq_server-3.6.6/sbin/rabbitmqctl add_user yacygrid password4account
echo [{rabbit, [{loopback_users, []}]}]. >> /rabbitmq_server-3.6.6/etc/rabbitmq/rabbitmq.config
/rabbitmq_server-3.6.6/sbin/rabbitmqctl set_permissions -p / yacygrid ".*" ".*" ".*"
cd /yacy_grid_mcp
sleep 5s;
gradle run

start.sh first adds the user and password, then starts Elasticsearch, RabbitMQ and the Apache FTP server.
For the username and password, we have created separate files to configure their properties during docker run, which can be found here:

Configuration of the FTP server: https://github.com/yacy/yacy_grid_mcp/blob/master/docker/config-ftp.properties
Configuration of the MCP service: https://github.com/yacy/yacy_grid_mcp/blob/master/docker/config-mcp.properties

The logic behind running all the microservices in one Docker instance was: creating a container for each microservice and then linking those containers…

Continue Reading: Creating A Dockerfile For Yacy Grid MCP

Deploying loklak Server on Kubernetes with External Elasticsearch

Kubernetes is an open-source system for automating deployment, scaling, and management of containerized applications. - kubernetes.io

Kubernetes is an awesome cloud platform which ensures that cloud applications run reliably. It provides automated tests, flawless updates, smart rollouts and rollbacks, simple scaling and a lot more. So as a part of GSoC, I worked on taking the loklak server to Kubernetes on the Google Cloud Platform. In this blog post, I will be discussing the approach followed to deploy the development branch of loklak on Kubernetes.

New Docker Image: Since Kubernetes deployments work on Docker images, we needed one for the loklak project. The existing image would not be up to the mark for Kubernetes as it contained the declaration of volumes and exposing of ports. So I wrote a new Docker image which could be used in Kubernetes. The image simply clones loklak server, builds the project and triggers the server as CMD -

FROM alpine:latest
ENV LANG=en_US.UTF-8
ENV JAVA_TOOL_OPTIONS=-Dfile.encoding=UTF8
WORKDIR /loklak_server
RUN apk update && apk add openjdk8 git bash && \
    git clone https://github.com/loklak/loklak_server.git /loklak_server && \
    git checkout development && \
    ./gradlew build -x test -x checkstyleTest -x checkstyleMain -x jacocoTestReport && \
    # Some Configurations and Cleanups
CMD ["bin/start.sh", "-Idn"]

[SOURCE]

This image doesn't have any volumes or exposed ports, so we are free to configure them in the configuration files (discussed in a later section).

Building and Pushing the Docker Image using Travis: To automatically build and push on a commit to the master branch, a Travis build is used. In the after_success section, a call to push the Docker image is made.
Travis environment variables hold the username and password for Docker Hub and are used for logging in -

docker login -u $DOCKER_USERNAME -p $DOCKER_PASSWORD

[SOURCE]

We needed checks there to ensure that we are on the right branch for the push and that we are not handling a pull request -

# Build and push Kubernetes Docker image
KUBERNETES_BRANCH=loklak/loklak_server:latest-kubernetes-$TRAVIS_BRANCH
KUBERNETES_COMMIT=loklak/loklak_server:kubernetes-$TRAVIS_COMMIT

if [ "$TRAVIS_BRANCH" == "development" ]; then
    docker build -t loklak_server_kubernetes kubernetes/images/development
    docker tag loklak_server_kubernetes $KUBERNETES_BRANCH
    docker push $KUBERNETES_BRANCH
    docker tag $KUBERNETES_BRANCH $KUBERNETES_COMMIT
    docker push $KUBERNETES_COMMIT
elif [ "$TRAVIS_BRANCH" == "master" ]; then
    # Build and push master
else
    echo "Skipping Kubernetes image push for branch $TRAVIS_BRANCH"
fi

[SOURCE]

Kubernetes Configurations for loklak: A Kubernetes cluster can be completely configured using configurations written in YAML format. The deployment of loklak uses the previously built image. Initially, the image tagged as latest-kubernetes-development is used -

apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: server
  namespace: web
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: server
    spec:
      containers:
      - name: server
        image: loklak/loklak_server:latest-kubernetes-development
        ...

[SOURCE]

Readiness and Liveness Probes: Probes act as the top-level tester for the health of a deployment in Kubernetes. The probes are performed periodically to ensure that things are working fine, and appropriate steps are taken if they fail. When a new image is updated, the older pod still runs and serves the requests. It is replaced by the new ones only when the probes are…
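The tag-naming scheme used by the Travis script can be expressed as a small helper. A Python sketch (the function name and structure are my own; the tag formats come from the shell variables above):

```python
def kubernetes_tags(branch: str, commit: str, repo: str = "loklak/loklak_server"):
    """Return (branch_tag, commit_tag) mirroring KUBERNETES_BRANCH and
    KUBERNETES_COMMIT from the Travis script: the branch tag always points
    at the latest build of a branch, the commit tag pins an exact build."""
    branch_tag = f"{repo}:latest-kubernetes-{branch}"
    commit_tag = f"{repo}:kubernetes-{commit}"
    return branch_tag, commit_tag

branch_tag, commit_tag = kubernetes_tags("development", "abc1234")
```

Pushing both tags means the Kubernetes deployment can track `latest-kubernetes-development` while any specific commit's image remains retrievable for rollbacks.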

Continue Reading: Deploying loklak Server on Kubernetes with External Elasticsearch

Caching Elasticsearch Aggregation Results in loklak Server

To provide aggregated data for various classifiers, loklak uses Elasticsearch aggregations. Aggregated data says a lot more than a few instances from it can. But performing aggregations on each request can be very resource-consuming, so we needed a way to reduce this load. In this post, I will be discussing how I came up with a caching model for the aggregated data from the Elasticsearch index.

Fields to Consider while Caching: At the classifier endpoint, aggregations can be requested based on the following fields: classifier name, classifier classes, countries, start date and end date. But to cache results, we can ignore cases where we require just a few classes or countries and store aggregations for all of them instead. So the fields that define a cache entry are: classifier name, start date and end date.

Type of Cache: The data structure used for caching is Java's HashMap. It maps a special string key to a wrapper object discussed in a later section.

Key: The key is built using the fields mentioned previously -

private static String getKey(String index, String classifier, String sinceDate, String untilDate) {
    return index + "::::"
        + classifier + "::::"
        + (sinceDate == null ? "" : sinceDate) + "::::"
        + (untilDate == null ? "" : untilDate);
}

[SOURCE]

In this way, we can handle requests where a user asks for every class there is without running the expensive aggregation job every time, because the key for such requests is the same: we do not consider country and class for this purpose.

Value: The object stored as the value in the HashMap is a wrapper containing the following: json, a JSONObject containing the actual data; and expiry, the expiry of the object in milliseconds.

class JSONObjectWrapper {
    private JSONObject json;
    private long expiry;
    ...
}

Timeout: The timeout associated with a cache entry is defined in the configuration file of the project as "classifierservlet.cache.timeout".
It defaults to 5 minutes and is used to set the expiry of a cached JSONObject -

class JSONObjectWrapper {
    ...
    private static long timeout = DAO.getConfig("classifierservlet.cache.timeout", 300000);

    JSONObjectWrapper(JSONObject json) {
        this.json = json;
        this.expiry = System.currentTimeMillis() + timeout;
    }
    ...
}

Cache Hit: For searching in the cache, the previously mentioned string key is composed from the parameters requested by the user. Checking for a cache hit can be done in the following manner -

String key = getKey(index, classifier, sinceDate, untilDate);
if (cacheMap.keySet().contains(key)) {
    JSONObjectWrapper jw = cacheMap.get(key);
    if (!jw.isExpired()) {
        // Do something with jw
    }
}
// Calculate the aggregations
...

But since jw here contains all the data, we need to filter out the classes and countries which are not needed.

Filtering results: For filtering out the parts which do not contain the information requested by the user, we can perform a simple pass and exclude the results that are not needed. Since…
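The whole caching model (key construction plus TTL wrapper) fits in a few lines of Python. A sketch (the key format and the 5-minute default mirror the Java code above; names like `is_expired` are my own transliteration):

```python
import time

TIMEOUT_MS = 300000  # mirrors the classifierservlet.cache.timeout default (5 minutes)

def get_key(index, classifier, since_date=None, until_date=None):
    """Build the cache key the same way as the Java getKey above."""
    return "::::".join([index, classifier, since_date or "", until_date or ""])

class JSONObjectWrapper:
    """Cache value: the payload plus an absolute expiry timestamp in ms."""
    def __init__(self, json, timeout_ms=TIMEOUT_MS):
        self.json = json
        self.expiry = time.time() * 1000 + timeout_ms

    def is_expired(self):
        return time.time() * 1000 > self.expiry

cache = {}
key = get_key("messages", "emotion", "2018-01-01", None)
cache[key] = JSONObjectWrapper({"aggregations": {}})
```

A lookup with the same index, classifier and dates produces the same key, so repeated requests hit the cache until the wrapper expires.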

Continue Reading: Caching Elasticsearch Aggregation Results in loklak Server

Data Indexing in Loklak Server

Loklak Server is a data-scraping system that indexes all the scraped data in order to optimize retrieval. The data fetched by different users is stored as cache, which helps in retrieving data directly from the cache for recurring queries. When users search for the same queries, load on Loklak Server is reduced by returning indexed data, thus optimizing operations.

Application: It depends on ElasticSearch for indexing of cached data (as JSON). When users search for the same queries, load on Loklak Server is reduced by outputting indexed data instead of scraping the same data again.

When is data indexing done? The indexing of data is done when:

1) Data is scraped: When data is scraped, it is indexed concurrently while cleaning the data in the TwitterTweet data object. For this task, the addScheduler static method of IncomingMessageBuffer is used, which acts as an abstraction between scraping of data and storing and indexing of data. The following is the implementation from TwitterScraper (from here). Here writeToIndex is the boolean input controlling whether to index the data or not.

if (this.writeToIndex) IncomingMessageBuffer.addScheduler(this, this.user, true);

2) Data is fetched from the backend: When data is fetched from the backend, it is indexed in the Timeline iterator, which calls the above method to index data concurrently. The following is the definition of the writeToIndex() method from Timeline.java (from here). When writeToIndex() is called, the fetched data is indexed.

public void writeToIndex() {
    IncomingMessageBuffer.addScheduler(this, true);
}

How? When the addScheduler static method of IncomingMessageBuffer is called, a thread is started that indexes all data. Indexing continues as long as the messagequeue data structure holds messages. See here. The DAO method writeMessageBulk is called here to write data.
The data is then written to the following streams:

1) Dump: The fetched data is dumped into a file in the Import directory. It can also be fetched from other peers.

2) Index: The fetched data is checked against the index and the data that isn't indexed yet gets indexed.

public static Set<String> writeMessageBulk(Collection<MessageWrapper> mws) {
    List<MessageWrapper> noDump = new ArrayList<>();
    List<MessageWrapper> dump = new ArrayList<>();
    for (MessageWrapper mw: mws) {
        if (mw.t == null) continue;
        if (mw.dump) dump.add(mw);
        else noDump.add(mw);
    }
    Set<String> createdIDs = new HashSet<>();
    createdIDs.addAll(writeMessageBulkNoDump(noDump));
    createdIDs.addAll(writeMessageBulkDump(dump)); // also does a writeMessageBulkNoDump internally
    return createdIDs;
}

The above snippet is from DAO.java. The call writeMessageBulkNoDump(noDump) indexes the data to ElasticSearch (its definition can be seen here), whereas for dumping of data writeMessageBulkDump(dump) is called (defined here).

Resources:
Iterable: https://docs.oracle.com/javase/8/docs/api/java/lang/Iterable.html
Use of Iterable: https://stackoverflow.com/questions/1059127/what-is-the-iterable-interface-used-for
ElasticSearch Webinar: https://www.elastic.co/webinars/getting-started-elasticsearch?elektra=home&storm=sub1
Ways to iterate through loop: https://crunchify.com/how-to-iterate-through-java-list-4-way-to-iterate-through-loop/
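The partitioning done by writeMessageBulk can be sketched in a few lines. A Python illustration (the dicts are simplified stand-ins for MessageWrapper; `t` and `dump` mirror the fields checked in the Java code):

```python
def write_message_bulk(messages):
    """Split messages into dump / no-dump batches, skipping wrappers
    without a payload, mirroring the branching in DAO.writeMessageBulk."""
    dump, no_dump = [], []
    for mw in messages:
        if mw.get("t") is None:  # skip wrappers without a message (mw.t == null)
            continue
        (dump if mw.get("dump") else no_dump).append(mw)
    return dump, no_dump

msgs = [
    {"t": "tweet-1", "dump": True},
    {"t": "tweet-2", "dump": False},
    {"t": None, "dump": True},  # dropped: no message payload
]
dump, no_dump = write_message_bulk(msgs)
```

Both batches are then indexed, and the dump batch is additionally written to the Import-directory file, matching the two streams described above.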

Continue Reading: Data Indexing in Loklak Server

Some Other Services in Loklak Server

Loklak Server isn't just scraper system software; it provides numerous other services to perform interesting functions like link unshortening (the reverse of link shortening) and video fetching, and administrative tasks like status fetching of the Loklak deployment (for analysis during Loklak development) and many more. Some of these are implemented internally and the rest can be used through HTTP endpoints. There are also some services which aren't complete yet and are in the development stage. Let's go through some of them to learn a bit about them and how they can be used.

1) VideoUrlService: This is the service to extract the video from a website that has a streaming video and output the video file link. This service is in the development stage but is functional. Presently, it can fetch Twitter video links and output them in different video qualities.

Endpoint: /api/videoUrlService.json

Implementation example: curl api/loklak.org/api/videoUrlService.json?id=https://twitter.com/EXOGlobal/status/886182766970257409&id=https://twitter.com/KMbappe/status/885963850708865025

2) Link Unshortening Service: This is the service used to unshorten links. Shortened URLs are used by websites to track Internet users. To prevent this, the link unshortening service unshortens the link and returns the final, untrackable link to the user. Currently this service is applied in TwitterScraper to unshorten the fetched URLs. It has other methods to get the redirect link and also to get the final URL from a chain of shortened links.

Implementation example from TwitterScraper.java [LINK]:

Matcher m = timeline_link_pattern.matcher(text);
if (m.find()) {
    String expanded = RedirectUnshortener.unShorten(m.group(2));
    text = m.replaceFirst(" " + expanded);
    continue;
}

Further, it can be exposed as a service and used directly. New features like fetching the featured image from links can be added to this service.
Though these features are still under discussion, enthusiastic contributions are most welcome.

3) StatusService

This service outputs all data related to a Loklak Server deployment's configuration. To access this configuration, the API endpoint status.json is used. It outputs the following data:

a) The number of messages scraped in intervals of one second, one minute, one hour, one day, etc.

b) The configuration of the server, like RAM, assigned memory, used memory, number of CPU cores, CPU load, etc.

c) Other configurations related to the application, like the size and specifications of the Elasticsearch shards, the client request header, the number of running threads, etc.

Endpoint: /api/status.json

Implementation example:

```shell
curl "https://api.loklak.org/api/status.json"
```

Resources:

- How to code a URL shortener: https://stackoverflow.com/questions/742013/how-to-code-a-url-shortener
- URL shortening, hashes in practice: https://blog.codinghorror.com/url-shortening-hashes-in-practice/
- Elasticsearch: https://www.elastic.co/webinars/getting-started-elasticsearch?elektra=home&storm=sub1
- M3U8 format: https://www.lifewire.com/m3u8-file-2621956
- Fetch video details using PHP: https://stackoverflow.com/questions/10896233/how-can-i-retrieve-youtube-video-details-from-video-url-using-php
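The first two resources above discuss the idea behind URL shorteners: a database row id is encoded as a short base62 code, and unshortening services like the one above reverse the resulting redirect. As a purely illustrative aside (this is not Loklak code, just a sketch of the encoding the resources describe):

```java
public class Base62 {
    static final String ALPHABET =
        "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";

    // Encode a non-negative id as a base62 short code.
    static String encode(long id) {
        if (id == 0) return "0";
        StringBuilder sb = new StringBuilder();
        while (id > 0) {
            sb.append(ALPHABET.charAt((int) (id % 62)));
            id /= 62;
        }
        return sb.reverse().toString();
    }

    // Decode a base62 short code back to the original id.
    static long decode(String code) {
        long id = 0;
        for (char c : code.toCharArray()) {
            id = id * 62 + ALPHABET.indexOf(c);
        }
        return id;
    }
}
```

A shortener stores the long URL under the id and serves a redirect from the short code; the unshortening service simply follows that redirect chain back to the final URL.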


Using Elasticsearch Aggregations to Analyse Classifier Data in loklak Server

Loklak uses Elasticsearch to index Tweets and other social media entities. It also houses a classifier that classifies Tweets based on emotion, profanity and language. But earlier, this data was available only through the search API, and there was no way to get aggregated data out of it. So, as a part of my GSoC project, I proposed to introduce a new API endpoint which would allow users to access aggregated data from these classifiers. In this blog post, I will discuss how aggregations are performed on the Elasticsearch index of Tweets in the loklak server.

Structure of the index

The ES index for Twitter is called messages, and it has 3 fields related to classifiers:

- classifier_emotion
- classifier_language
- classifier_profanity

With each of these classifiers, we also have a probability attached which represents the confidence of the classifier in the class assigned to a Tweet. The names of these fields are formed by suffixing the classifier field with _probability (e.g. classifier_emotion_probability). Since I will also be discussing aggregation based on countries in this blog post, there is also a field named place_country_code which saves the ISO 3166-1 alpha-2 code for the Tweet's country of creation.

Requesting aggregations using the Elasticsearch Java API

Elasticsearch comes with a simple Java API which can be used to perform any desired task.
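To picture the classifier-related fields described above, here is a hypothetical document limited to just those fields (the class values "joy", "english" and "swear" and all probabilities are invented for illustration; a real document in the messages index carries many more fields):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ExampleMessage {
    // Hypothetical classifier fields of one indexed Tweet; each classifier
    // field is paired with a "<classifier>_probability" confidence field.
    static Map<String, Object> classifierFields() {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("classifier_emotion", "joy");
        doc.put("classifier_emotion_probability", 0.87);
        doc.put("classifier_language", "english");
        doc.put("classifier_language_probability", 0.99);
        doc.put("classifier_profanity", "swear");
        doc.put("classifier_profanity_probability", 0.12);
        doc.put("place_country_code", "IN"); // ISO 3166-1 alpha-2
        return doc;
    }
}
```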
To work with data, we need an ES client, which can be built from an ES Node (if creating a cluster) or directly as a transport client (if connecting remotely to a cluster):

```java
// Transport client
TransportClient tc = TransportClient.builder()
    .settings(mySettings)
    .build();

// From a node
Node elasticsearchNode = NodeBuilder.nodeBuilder()
    .local(false).settings(mySettings)
    .node();
Client nc = elasticsearchNode.client();
```

[SOURCE]

Once we have a client, we can use ES's AggregationBuilders to get aggregations from an index:

```java
SearchResponse response = elasticsearchClient.prepareSearch(indexName)
    .setSearchType(SearchType.QUERY_THEN_FETCH)
    .setQuery(QueryBuilders.matchAllQuery()) // Consider every row
    .setFrom(0).setSize(0)                   // 0 offset, 0 result size (do not return any rows)
    .addAggregation(aggr)                    // aggr is an AggregationBuilder object
    .execute().actionGet();                  // Execute and get results
```

[SOURCE]

AggregationBuilders are objects that define the properties of an aggregation task using ES's Java API. This code snippet is applicable to any type of aggregation that we wish to perform on an index, given that we do not want to fetch any rows in the response.

Performing a simple aggregation for a classifier

In this section, I will discuss the process to get results from a given classifier in loklak's ES index. Here, we will be targeting a class-wise count of rows and stats (average and sum) of probabilities.
Writing the AggregationBuilder

The AggregationBuilder for this task will be a Terms AggregationBuilder, which dynamically generates buckets for all the distinct values of a given field in the index:

```java
AggregationBuilder getClassifierAggregationBuilder(String classifierName) {
    String probabilityField = classifierName + "_probability";
    return AggregationBuilders.terms("by_class").field(classifierName)
        .subAggregation(
            AggregationBuilders.avg("avg_probability").field(probabilityField)
        )
        .subAggregation(
            AggregationBuilders.sum("sum_probability").field(probabilityField)
        );
}
```

[SOURCE]

Here, the name of the aggregation is passed as by_class. This name will be used while processing the results of this aggregation task. Also, sub-aggregations are used to get the average and sum probability by the…
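To make the semantics of the builder above concrete, here is a small, self-contained sketch (not the Elasticsearch execution path; the class labels and probabilities in the test data are invented) that computes in memory what the by_class terms aggregation with its avg_probability and sum_probability sub-aggregations reports: a per-class document count, plus the sum and average of the probability field in each bucket.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TermsAggregationSketch {
    // Accumulator for one bucket: document count and probability sum,
    // from which the average can be derived.
    static class Bucket {
        long docCount = 0;
        double sumProbability = 0.0;

        double avgProbability() {
            return docCount == 0 ? 0.0 : sumProbability / docCount;
        }
    }

    // Groups documents by their class value and accumulates count and
    // probability sum, mirroring a terms aggregation with avg and sum
    // sub-aggregations. Each document is a (class, probability) pair.
    static Map<String, Bucket> aggregate(List<Map.Entry<String, Double>> docs) {
        Map<String, Bucket> buckets = new LinkedHashMap<>();
        for (Map.Entry<String, Double> doc : docs) {
            Bucket b = buckets.computeIfAbsent(doc.getKey(), k -> new Bucket());
            b.docCount++;
            b.sumProbability += doc.getValue();
        }
        return buckets;
    }
}
```

Elasticsearch performs the same grouping across shards and merges the partial buckets, so the shape of the result (one bucket per class with count, sum and average) is what the response-processing code has to walk.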
