Scraping Concurrently with Loklak Server

At present, SearchScraper in Loklak Server uses numerous threads to scrape the Twitter website. The fetched data is cleaned and further data is extracted from it. But scraping only Twitter limits the application; concurrently scraping other websites like Quora, YouTube, GitHub, etc. diversifies it, so that the single endpoint search.json can serve multiple services. As this feature is still being refined, we will discuss only the basic structure of the system with the new changes. I tried to implement scraping in a more abstract way:

1) Fetching the input data in SearchServlet

Instead of selecting the input GET parameters and referencing them individually, the complete Map object is now referenced, which makes it possible to add more functionality based on the input GET parameters. The dataArray object (a JSONArray) is fetched from the DAO.scrapeLoklak method and embedded in the output with the key results.

    // start a scraper
    inputMap.put("query", query);
    DAO.log(request.getServletPath() + " scraping with query: " + query + " scraper: " + scraper);
    dataArray = DAO.scrapeLoklak(inputMap, true, true);

2) Scraping the selected scrapers concurrently

In DAO.java, the useful GET parameters of inputMap are fetched and cleaned. They are used to choose the scrapers to run, using the getScraperObjects() method.

    Timeline2.Order order = getOrder(inputMap.get("order"));
    Timeline2 dataSet = new Timeline2(order);
    List<String> scraperList = Arrays.asList(inputMap.get("scraper").trim().split("\\s*,\\s*"));

Threads are created to fetch data from the different scrapers according to the size of the list of scraper objects. The input map is passed as an argument to the scrapers so they can read further GET parameters related to them and shape their output accordingly.
    List<BaseScraper> scraperObjList = getScraperObjects(scraperList, inputMap);
    ExecutorService scraperRunner = Executors.newFixedThreadPool(scraperObjList.size());
    try {
        for (BaseScraper scraper : scraperObjList) {
            scraperRunner.execute(() -> {
                dataSet.mergePost(scraper.getData());
            });
        }
    } finally {
        scraperRunner.shutdown();
        try {
            scraperRunner.awaitTermination(24L, TimeUnit.HOURS);
        } catch (InterruptedException e) { }
    }

3) Fetching the selected scraper objects in DAO.java

Here a variable of the abstract class BaseScraper (the superclass of all search scrapers) is used to build the list of scrapers to run. Every scraper's constructor is fed the input map so it can scrape accordingly.

    List<BaseScraper> scraperObjList = new ArrayList<BaseScraper>();
    BaseScraper scraperObj = null;
    if (scraperList.contains("github") || scraperList.contains("all")) {
        scraperObj = new GithubProfileScraper(inputMap);
        scraperObjList.add(scraperObj);
    }
    .
    .
    .

References:
Best practices of multithreading in Java: https://stackoverflow.com/questions/17018507/java-multithreading-best-practice
ExecutorService vs casual thread spawner: https://stackoverflow.com/questions/26938210/executorservice-vs-casual-thread-spawner
Basic data structures used in Java: https://www.eduonix.com/blog/java-programming-2/learn-to-implement-data-structures-in-java/
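The concurrent-scraping flow above can be sketched in a self-contained way: each "scraper" produces its posts, a fixed thread pool runs them concurrently, and the results are merged into one thread-safe collection. All names here (Scraper, scrapeAll) are hypothetical stand-ins; the real code uses BaseScraper and merges into a Timeline2.

```java
import java.util.*;
import java.util.concurrent.*;

public class ConcurrentScrapeSketch {

    // Hypothetical stand-in for BaseScraper: anything that yields posts.
    interface Scraper {
        List<String> getData();
    }

    public static Set<String> scrapeAll(List<Scraper> scrapers) throws InterruptedException {
        Set<String> dataSet = ConcurrentHashMap.newKeySet(); // thread-safe merge target
        ExecutorService runner = Executors.newFixedThreadPool(scrapers.size());
        try {
            for (Scraper s : scrapers) {
                runner.execute(() -> dataSet.addAll(s.getData())); // one task per scraper
            }
        } finally {
            runner.shutdown();                            // stop accepting new tasks
            runner.awaitTermination(1, TimeUnit.MINUTES); // wait for all scrapers to finish
        }
        return dataSet;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Scraper> scrapers = Arrays.asList(
            () -> Arrays.asList("quora:post1"),
            () -> Arrays.asList("github:post2", "github:post3")
        );
        Set<String> merged = scrapeAll(scrapers);
        System.out.println(merged.size()); // 3
    }
}
```

Note the shutdown/awaitTermination pair in the finally block: without it, results could be read before all scraper tasks have completed.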


Caching Elasticsearch Aggregation Results in loklak Server

To provide aggregated data for various classifiers, loklak uses Elasticsearch aggregations. Aggregated data says a lot more than a few instances from it can. But performing aggregations on each request can be very resource-consuming, so we needed a way to reduce this load. In this post, I will discuss how I came up with a caching model for the aggregated data from the Elasticsearch index.

Fields to consider while caching

At the classifier endpoint, aggregations can be requested based on the following fields:
- Classifier name
- Classifier classes
- Countries
- Start date
- End date

But to cache results, we can ignore the cases where we need only a few classes or countries, and instead store aggregations for all of them. So the fields that define the cache entry to look for are:
- Classifier name
- Start date
- End date

Type of cache

The data structure used for caching is Java's HashMap. It maps a special string key to a wrapper object discussed in a later section.

Key

The key is built using the fields mentioned previously:

    private static String getKey(String index, String classifier, String sinceDate, String untilDate) {
        return index + "::::"
            + classifier + "::::"
            + (sinceDate == null ? "" : sinceDate) + "::::"
            + (untilDate == null ? "" : untilDate);
    }

[SOURCE]

In this way, we can handle requests where a user asks for every class there is, without running the expensive aggregation job every time: the key for such requests will be the same because country and class are not part of it.

Value

The object used as value in the HashMap is a wrapper containing the following:
- json - a JSONObject containing the actual data.
- expiry - the expiry time of the object in milliseconds.

    class JSONObjectWrapper {
        private JSONObject json;
        private long expiry;
        ...
    }

Timeout

The timeout associated with a cache entry is defined in the configuration file of the project as "classifierservlet.cache.timeout".
It defaults to 5 minutes (300000 ms) and is used to set the expiry of a cached JSONObject:

    class JSONObjectWrapper {
        ...
        private static long timeout = DAO.getConfig("classifierservlet.cache.timeout", 300000);

        JSONObjectWrapper(JSONObject json) {
            this.json = json;
            this.expiry = System.currentTimeMillis() + timeout;
        }
        ...
    }

Cache hit

To search the cache, the previously mentioned string key is composed from the parameters requested by the user. Checking for a cache hit can be done in the following manner:

    String key = getKey(index, classifier, sinceDate, untilDate);
    if (cacheMap.keySet().contains(key)) {
        JSONObjectWrapper jw = cacheMap.get(key);
        if (!jw.isExpired()) {
            // Do something with jw
        }
    }
    // Calculate the aggregations
    ...

But since jw here contains all the data, we still need to filter out the classes and countries that were not requested.

Filtering results

To filter out the parts which do not contain the information requested by the user, we can perform a simple pass and exclude the results that are not needed. Since…
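Putting the key scheme, the wrapper and the expiry check together, the whole cache can be sketched as follows. The payload is a plain String rather than a JSONObject so the sketch stays self-contained, and the clock is passed in as a parameter to keep it deterministic (the real code reads System.currentTimeMillis()); everything else mirrors the snippets above.

```java
import java.util.HashMap;
import java.util.Map;

public class AggregationCacheSketch {
    static final long TIMEOUT_MS = 300000; // 5 minutes, the config default

    static class Wrapper {
        final String json;  // stands in for the cached JSONObject
        final long expiry;
        Wrapper(String json, long now) {
            this.json = json;
            this.expiry = now + TIMEOUT_MS; // expiry fixed at insertion time
        }
        boolean isExpired(long now) { return now > this.expiry; }
    }

    // Same key layout as the post: index::::classifier::::since::::until
    static String getKey(String index, String classifier, String sinceDate, String untilDate) {
        return index + "::::" + classifier + "::::"
            + (sinceDate == null ? "" : sinceDate) + "::::"
            + (untilDate == null ? "" : untilDate);
    }

    private final Map<String, Wrapper> cacheMap = new HashMap<>();

    public void put(String key, String json, long now) {
        cacheMap.put(key, new Wrapper(json, now));
    }

    // Returns the cached value on a fresh hit, null on a miss or after expiry.
    public String get(String key, long now) {
        Wrapper jw = cacheMap.get(key);
        if (jw == null || jw.isExpired(now)) return null;
        return jw.json;
    }
}
```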


Data Indexing in Loklak Server

Loklak Server is a data-scraping system that indexes all the scraped data in order to optimize retrieval. The data fetched for different users is stored as a cache. This helps in retrieving data directly from the cache for recurring queries: when users search for the same queries, the load on Loklak Server is reduced by outputting the indexed data instead of scraping the same data again.

Application

Loklak depends on Elasticsearch for indexing the cached data (as JSON).

When is data indexing done?

The indexing of data is done when:

1) Data is scraped: When data is scraped, it is indexed concurrently while the TwitterTweet data object is being cleaned. For this task, the static addScheduler method of IncomingMessageBuffer is used, which acts as an abstraction between scraping the data and storing and indexing it. The following is the implementation from TwitterScraper (from here). Here writeToIndex is the boolean input that decides whether to index the data or not.

    if (this.writeToIndex)
        IncomingMessageBuffer.addScheduler(this, this.user, true);

2) Data is fetched from the backend: When data is fetched from the backend, it is indexed via the Timeline iterator, which calls the same method to index data concurrently. The following is the definition of the writeToIndex() method from Timeline.java (from here). When writeToIndex() is called, the fetched data is indexed.

    public void writeToIndex() {
        IncomingMessageBuffer.addScheduler(this, true);
    }

How?

When the static addScheduler method of IncomingMessageBuffer is called, a thread is started that indexes all data. Indexing continues as the messagequeue data structure fills with messages. See here. The DAO method writeMessageBulk is called there to write the data.
The data is then written to the following streams:

1) Dump: The fetched data is dumped into a file in the Import directory. It can also be fetched from other peers.

2) Index: The fetched data is checked against the index, and the data that isn't indexed yet is indexed.

    public static Set<String> writeMessageBulk(Collection<MessageWrapper> mws) {
        List<MessageWrapper> noDump = new ArrayList<>();
        List<MessageWrapper> dump = new ArrayList<>();
        for (MessageWrapper mw: mws) {
            if (mw.t == null) continue;
            if (mw.dump) dump.add(mw);
            else noDump.add(mw);
        }
        Set<String> createdIDs = new HashSet<>();
        createdIDs.addAll(writeMessageBulkNoDump(noDump));
        createdIDs.addAll(writeMessageBulkDump(dump)); // Also does a writeMessageBulkNoDump internally
        return createdIDs;
    }

The above code snippet is from DAO.java. The method call writeMessageBulkNoDump(noDump) indexes the data to Elasticsearch; its definition can be seen here. For dumping the data, writeMessageBulkDump(dump) is called; it is defined here.

Resources:
Iterable: https://docs.oracle.com/javase/8/docs/api/java/lang/Iterable.html
Use of Iterable: https://stackoverflow.com/questions/1059127/what-is-the-iterable-interface-used-for
Elasticsearch webinar: https://www.elastic.co/webinars/getting-started-elasticsearch?elektra=home&storm=sub1
Ways to iterate through a loop: https://crunchify.com/how-to-iterate-through-java-list-4-way-to-iterate-through-loop/
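The partitioning step in writeMessageBulk can be isolated into a small, testable sketch: messages carrying a null payload are skipped, and the rest are split into a "dump" list and a "noDump" list before being written to their respective streams. The Message type here is a hypothetical stand-in for loklak's MessageWrapper.

```java
import java.util.*;

public class WriteBulkSketch {

    static class Message {
        final String id;    // stands in for the wrapped tweet mw.t
        final boolean dump; // whether this message should also be dumped
        Message(String id, boolean dump) { this.id = id; this.dump = dump; }
    }

    // Split messages into the two write streams, skipping empty wrappers.
    public static Map<String, List<Message>> partition(Collection<Message> mws) {
        List<Message> noDump = new ArrayList<>();
        List<Message> dump = new ArrayList<>();
        for (Message mw : mws) {
            if (mw.id == null) continue; // skip empty wrappers, as the real code does
            if (mw.dump) dump.add(mw); else noDump.add(mw);
        }
        Map<String, List<Message>> out = new HashMap<>();
        out.put("dump", dump);
        out.put("noDump", noDump);
        return out;
    }
}
```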


Some Other Services in Loklak Server

Loklak Server isn't just scraper software; it provides numerous other services that perform other interesting functions like link unshortening (the reverse of link shortening) and video fetching, plus administrative tasks like fetching the status of a Loklak deployment (for analysis during Loklak development), and many more. Some of these are implemented internally, and the rest can be used through HTTP endpoints. There are also some services which aren't complete and are still in the development stage. Let's go through some of them to learn a bit about what they do and how they can be used.

1) VideoUrlService

This is the service to extract the video from a website that hosts a streaming video and output the video file link. This service is in the development stage but is functional. Presently, it can fetch Twitter video links and output them in different video qualities.

Endpoint: /api/videoUrlService.json

Implementation example:

    curl api/loklak.org/api/videoUrlService.json?id=https://twitter.com/EXOGlobal/status/886182766970257409&id=https://twitter.com/KMbappe/status/885963850708865025

2) Link Unshortening Service

This is the service used to unshorten links. Shortened URLs can be used by websites to track Internet users. To prevent this, the link unshortening service unshortens the link and returns the final, untrackable link to the user. Currently this service is applied in TwitterScraper to unshorten the fetched URLs. It has further methods to get a redirect link and to get the final URL from a multiply-shortened link.

Implementation example from TwitterScraper.java [LINK]:

    Matcher m = timeline_link_pattern.matcher(text);
    if (m.find()) {
        String expanded = RedirectUnshortener.unShorten(m.group(2));
        text = m.replaceFirst(" " + expanded);
        continue;
    }

Further, it can be exposed as a standalone service and used directly. New features like fetching the featured image from links can be added to this service.
These ideas are still under discussion, and enthusiastic contributions are most welcome.

3) StatusService

This is a service that outputs all data related to a Loklak Server deployment's configuration. To access this configuration, the API endpoint status.json is used. It outputs the following data:

a) The number of messages it scrapes in an interval of a second, a minute, an hour, a day, etc.
b) The configuration of the server, like RAM, assigned memory, used memory, number of CPU cores, CPU load, etc.
c) Other configurations related to the application, like the size of the Elasticsearch shards and their specifications, the client request header, the number of running threads, etc.

Endpoint: /api/status.json

Implementation example:

    curl api/loklak.org/api/status.json

Resources:
Code a URL shortener: https://stackoverflow.com/questions/742013/how-to-code-a-url-shortener
URL shortening - hashing in practice: https://blog.codinghorror.com/url-shortening-hashes-in-practice/
Elasticsearch: https://www.elastic.co/webinars/getting-started-elasticsearch?elektra=home&storm=sub1
M3U8 format: https://www.lifewire.com/m3u8-file-2621956
Fetch video using PHP: https://stackoverflow.com/questions/10896233/how-can-i-retrieve-youtube-video-details-from-video-url-using-php
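Conceptually, unshortening is just following redirects until a URL resolves to nothing further. A minimal sketch of that loop, with the redirect lookup abstracted into a function so it can be tested without a network (the real RedirectUnshortener issues HTTP requests and reads the Location header; all names here are hypothetical):

```java
import java.util.function.Function;

public class UnshortenSketch {
    static final int MAX_HOPS = 10; // guard against redirect loops

    // redirectOf returns the next URL for a given URL, or null if there
    // is no further redirect (i.e. the final URL has been reached).
    public static String unShorten(String url, Function<String, String> redirectOf) {
        String current = url;
        for (int i = 0; i < MAX_HOPS; i++) {
            String next = redirectOf.apply(current);
            if (next == null || next.equals(current)) break; // final URL reached
            current = next;
        }
        return current;
    }
}
```

The hop cap matters in practice: a malicious or misconfigured shortener can redirect in a cycle, and without a bound the loop would never terminate.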


Using Elasticsearch Aggregations to Analyse Classifier Data in loklak Server

Loklak uses Elasticsearch to index Tweets and other social media entities. It also houses a classifier that classifies Tweets based on emotion, profanity and language. But earlier, this data was available only through the search API and there was no way to get aggregated data out of it. So as a part of my GSoC project, I proposed to introduce a new API endpoint which would allow users to access aggregated data from these classifiers. In this blog post, I will be discussing how aggregations are performed on the Elasticsearch index of Tweets in the loklak server.

Structure of the index

The ES index for Twitter is called messages and it has 3 fields related to classifiers:
- classifier_emotion
- classifier_language
- classifier_profanity

Each of these classifiers also has a probability attached, which represents the confidence of the classifier in the class assigned to a Tweet. The names of these fields are given by suffixing the classifier field with _probability (e.g. classifier_emotion_probability). Since I will also be discussing aggregation based on countries in this blog post, there is also a field named place_country_code which stores the ISO 3166-1 alpha-2 code for the country where the Tweet was created.

Requesting aggregations using the Elasticsearch Java API

Elasticsearch comes with a simple Java API which can be used to perform any desired task.
To work with the data, we need an ES client, which can be built from an ES Node (if creating a cluster) or directly as a transport client (if connecting remotely to a cluster):

    // Transport client
    TransportClient tc = TransportClient.builder()
        .settings(mySettings)
        .build();

    // From a node
    Node elasticsearchNode = NodeBuilder.nodeBuilder()
        .local(false).settings(mySettings)
        .node();
    Client nc = elasticsearchNode.client();

[SOURCE]

Once we have a client, we can use ES's AggregationBuilder to get aggregations from an index:

    SearchResponse response = elasticsearchClient.prepareSearch(indexName)
        .setSearchType(SearchType.QUERY_THEN_FETCH)
        .setQuery(QueryBuilders.matchAllQuery())  // Consider every row
        .setFrom(0).setSize(0)  // 0 offset, 0 result size (do not return any rows)
        .addAggregation(aggr)  // aggr is an AggregationBuilder object
        .execute().actionGet();  // Execute and get results

[SOURCE]

AggregationBuilders are objects that define the properties of an aggregation task using ES's Java API. This code snippet is applicable for any type of aggregation that we wish to perform on an index, given that we do not want to fetch any rows in the response.

Performing a simple aggregation for a classifier

In this section, I discuss the process of getting results for a given classifier from loklak's ES index. Here, we target a class-wise count of rows and stats (average and sum) of the probabilities.
Writing the AggregationBuilder

The AggregationBuilder for this task is a Terms AggregationBuilder, which dynamically generates buckets for all the different values of a given field in the index:

    AggregationBuilder getClassifierAggregationBuilder(String classifierName) {
        String probabilityField = classifierName + "_probability";
        return AggregationBuilders.terms("by_class").field(classifierName)
            .subAggregation(
                AggregationBuilders.avg("avg_probability").field(probabilityField)
            )
            .subAggregation(
                AggregationBuilders.sum("sum_probability").field(probabilityField)
            );
    }

[SOURCE]

Here, the name of the aggregation is passed as by_class. This will be used while processing the results of this aggregation task. Also, sub-aggregations are used to get the average and sum probability by the…
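To make concrete what the terms aggregation with its avg/sum sub-aggregations computes, here is the same calculation sketched in plain Java over an in-memory list: group rows by class, then count them and aggregate their probabilities per bucket. The Row type and method names are hypothetical; in loklak, Elasticsearch performs this server-side over the messages index.

```java
import java.util.*;

public class ClassifierAggSketch {

    static class Row {
        final String emotion;      // e.g. the classifier_emotion field
        final double probability;  // e.g. classifier_emotion_probability
        Row(String emotion, double probability) { this.emotion = emotion; this.probability = probability; }
    }

    // Per class: [count, sum_probability, avg_probability]
    public static Map<String, double[]> byClass(List<Row> rows) {
        Map<String, double[]> buckets = new HashMap<>();
        for (Row r : rows) {
            double[] b = buckets.computeIfAbsent(r.emotion, k -> new double[3]);
            b[0] += 1;             // doc count of the bucket
            b[1] += r.probability; // sum_probability sub-aggregation
        }
        for (double[] b : buckets.values()) b[2] = b[1] / b[0]; // avg_probability
        return buckets;
    }
}
```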


Making loklak Server’s Kaizen Harvester Extendable

Harvesting strategies in loklak are something that end users can't see, but they play a vital role in deciding how messages are collected with loklak. One of the strategies in loklak is defined by the Kaizen harvester, which generates queries from collected messages. The original strategy used a simple hash queue which drops queries once it is full. This effect is not desirable, as we tend to lose important queries if they come up late while harvesting. To overcome this behaviour without losing important search queries, we needed to come up with new harvesting strategies that take a better approach. In this blog post, I discuss the changes made in the Kaizen harvester so it can be extended to create different flavors of harvesters.

What can be different in extended harvesters?

To make the Kaizen harvester extendable, we first needed to decide which parts of the original Kaizen harvester could be changed to make the strategy different (and probably better). Since one of the most crucial parts of the Kaizen harvester is the way it stores queries to be processed, it was the most obvious thing to change. Another thing that should be configurable across strategies is the decision of whether to harvest the queries from the query list.

Query storage with KaizenQueries

To allow different methods of storing queries, the KaizenQueries class was introduced in loklak. It provides the basic methods required for a query-storing technique to work. A query-storing technique can be any data structure that we can use to store search queries for the Kaizen harvester.
    public abstract class KaizenQueries {
        public abstract boolean addQuery(String query);
        public abstract String getQuery();
        public abstract int getSize();
        public abstract int getMaxSize();
        public boolean isEmpty() {
            return this.getSize() == 0;
        }
    }

[SOURCE]

Also, a default type of KaizenQueries was introduced for use in the original Kaizen harvester. It provides the same interface as the queue originally used in the harvester. Another constructor was introduced in the Kaizen harvester which allows setting the KaizenQueries for an instance of one of its derived classes. It solves the problem of providing an interface to KaizenQueries inside the Kaizen harvester which can be used by any inherited strategy:

    private KaizenQueries queries = null;

    public KaizenHarvester(KaizenQueries queries) {
        ...
        this.queries = queries;
        ...
    }

    public void someMethod() {
        ...
        this.queries.addQuery(someQuery);
        ...
    }

[SOURCE]

With this added, getting or adding new queries became simple: we just use the getQuery() and addQuery() methods without worrying about the internal implementation.

Configurable decision for harvesting

As mentioned earlier, the decision to harvest should also be configurable. For this, a protected method was implemented and used in the harvest() method:

    protected boolean shallHarvest() {
        float targetProb = random.nextFloat();
        float prob = 0.5F;
        if (this.queries.getMaxSize() > 0) {
            prob = queries.getSize() / (float)queries.getMaxSize();
        }
        return !this.queries.isEmpty() && targetProb…
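A concrete KaizenQueries-style store can be sketched as a bounded FIFO queue whose addQuery() reports rejection when capacity is reached instead of silently dropping old queries. This class is illustrative only, not loklak's actual default implementation, and the fill-ratio harvesting decision at the end assumes the comparison direction (harvest more eagerly as the queue fills); the random draw is passed in as a parameter so the logic is deterministic.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class BoundedQueriesSketch {
    private final Deque<String> queue = new ArrayDeque<>();
    private final int maxSize;

    public BoundedQueriesSketch(int maxSize) { this.maxSize = maxSize; }

    public boolean addQuery(String query) {
        if (queue.size() >= maxSize) return false; // full: report rejection
        queue.addLast(query);
        return true;
    }

    public String getQuery() { return queue.pollFirst(); } // null when empty
    public int getSize() { return queue.size(); }
    public int getMaxSize() { return maxSize; }
    public boolean isEmpty() { return getSize() == 0; }

    // Fill-ratio decision modeled on shallHarvest(): the fuller the
    // queue, the more likely we are to harvest now. Assumed direction.
    public boolean shallHarvest(float targetProb) {
        float prob = maxSize > 0 ? getSize() / (float) maxSize : 0.5F;
        return !isEmpty() && targetProb < prob;
    }
}
```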


Utilizing Readiness Probes for loklak Dependencies in Kubernetes

When we use an application and fail to connect to it, we do not give up; we retry connecting again and again. But in reality we often face obstacles like applications that break instantly, or, when connecting to an API or database that is not ready yet, an app that gets upset and refuses to continue working. Something similar was happening with api.loklak.org. In such cases we can't re-write the whole application every time the problem occurs, so we need to define dependencies of some kind that can handle the situation rather than disappointing the users of the loklak app.

Solution: We simply wait until a dependent API or backend of loklak is ready, and only then start the loklak app. For this, we used Kubernetes health checks.

Kubernetes health checks are divided into liveness and readiness probes. The purpose of a liveness probe is to indicate that your application is running. Readiness probes are meant to check whether your application is ready to serve traffic. The right combination of liveness and readiness probes used with Kubernetes deployments can:
- Enable zero-downtime deploys
- Prevent deployment of broken images
- Ensure that failed containers are automatically restarted

Pod ready to be used?

A Pod with a defined readiness probe won't receive any traffic until the defined request can be successfully fulfilled. These health checks are defined through Kubernetes, so we don't need to make any changes to our services (APIs). We just need to set up a readiness probe for the APIs that loklak server depends on.
Here you can see the relevant part of the container spec you need to add (in this example we want to know when loklak is ready):

    readinessProbe:
      httpGet:
        path: /api/status.json
        port: 80
      initialDelaySeconds: 30
      timeoutSeconds: 3

Readiness probes

Updating deployments of loklak when something is pushed into development without readiness probes can result in downtime, as old pods are replaced by new pods in a Kubernetes deployment. If the new pods are misconfigured or somehow broken, that downtime extends until you detect the problem and roll back. With readiness probes, Kubernetes will not send traffic to a pod until its readiness probe succeeds. When updating a loklak deployment, it will also leave the old pods running until the probes have succeeded on the new copies. That means that if the new loklak server pods are broken in some way, they'll never see traffic; instead, the old loklak server pods will continue to serve all traffic for the deployment.

Conclusion

Readiness probes are a simple solution to ensure that Pods with dependencies do not get started before their dependencies are ready (in this case, for loklak server). This also works with more than one dependency.

Resources
Code in loklak server: https://github.com/loklak/loklak_server
More about readiness and liveness probes: https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/
Kubernetes health checks: https://kubernetes-v1-4.github.io/docs/user-guide/production-pods/#liveness-and-readiness-probes-aka-health-checks
Blog posts related to Kubernetes: https://www.ianlewis.org/en/tag/kubernetes
Tutorial for creating and using Pods and probes: https://github.com/googlecodelabs/orchestrate-with-kubernetes/blob/master/labs/monitoring-and-health-checks.md
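What a readiness probe does can be sketched in plain Java: repeatedly run a health check and only declare the service ready once it succeeds, up to a retry budget. The BooleanSupplier stands in for the HTTP GET on /api/status.json that the kubelet performs; all names here are hypothetical.

```java
import java.util.function.BooleanSupplier;

public class ReadinessSketch {

    // Returns the attempt number on which the check first succeeded,
    // or -1 if the dependency never became ready within the budget.
    public static int waitUntilReady(BooleanSupplier healthCheck, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (healthCheck.getAsBoolean()) return attempt; // probe succeeded
            // a real probe would sleep periodSeconds between attempts here
        }
        return -1; // dependency never became ready
    }
}
```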


Simplifying Scrapers using BaseScraper

Loklak Server's main function is to scrape data from websites and other sources and output it in different formats like JSON, XML and RSS. There are many scrapers in the project that scrape data and output it, but they were implemented with different designs and libraries, which made them differ from one another and made fixes difficult to apply. Due to this variation in the scrapers' design, it was difficult to modify them and to fix the same issue (when one appeared) in each of them. This signals a fault in the design. To solve this problem, inheritance can be brought into play. Thus, I created the BaseScraper abstract class so that scrapers can concentrate on fetching data from HTML, while all supportive tasks, like creating a connection from the URL, are defined in BaseScraper. The concept is easy to implement, but a sound implementation requires going through the complete list of tasks a scraper does. These are the tasks, with descriptions of how they are implemented using BaseScraper:

Endpoint that triggers the scraper

Every search scraper inherits from the class AbstractAPIHandler. This is used to fetch the GET parameters from the endpoint, according to which data is scraped. The arguments of the serviceImpl method are used to generate the output, which is returned as a JSONObject. For this task, the method serviceImpl has been defined in BaseScraper, and the method getData is implemented to return the output. This method is the driver method of the scraper.

    public JSONObject serviceImpl(Query call, HttpServletResponse response, Authorization rights, JSONObjectWithDefault permissions) throws APIException {
        this.setExtra(call);
        return this.getData().toJSON(false, "metadata", "posts");
    }

Constructor

The constructor of a scraper defines the base URL of the website to be scraped, the name of the scraper, and the data structure that holds all GET parameters passed to the scraper.
For the GET parameters, the Map data structure is used to fetch them from the Query object. Since every scraper has its own base URL, scraper name and GET parameters, these are defined in the respective scrapers. QuoraProfileScraper is an example that has these variables defined.

Get all input variables

To get all input variables, there are setters and getters defined in BaseScraper for fetching them as a Map from the Query object. There is also an abstract method setParam(). It is defined in the respective scrapers to fetch the parameters useful to the scraper and set them on the scraper's class variables.

    // Setter for get parameters from call object
    protected void setExtra(Query call) {
        this.extra = call.getMap();
        this.query = call.get("query", "");
        this.setParam();
    }

    // Getter for get parameter wrt its key
    public String getExtraValue(String key) {
        String value = "";
        if(this.extra.get(key) != null) {
            value = this.extra.get(key).trim();
        }
        return value;
    }

    // Definition in QuoraProfileScraper
    protected void setParam() {
        if(!"".equals(this.getExtraValue("type"))) {
            this.typeList = Arrays.asList(this.getExtraValue("type").trim().split("\\s*,\\s*"));
        } else {
            this.typeList = new ArrayList<String>();
            this.typeList.add("all");
            this.setExtraValue("type", String.join(",", this.typeList));
        }
    }

URL creation for the web scraper

URL creation shall be implemented in a separate method, as in TwitterScraper. The following is the rough…
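The design described above is essentially the template-method pattern: the base class owns the driver flow (read parameters, fetch, output) and each scraper fills in only the site-specific parts. A minimal sketch with hypothetical names; the real BaseScraper additionally handles connections, threading and JSON output.

```java
import java.util.HashMap;
import java.util.Map;

public abstract class ScraperSketchBase {
    protected Map<String, String> extra = new HashMap<>();

    // Driver method: the same flow for every scraper (cf. serviceImpl/getData).
    public final String getData(Map<String, String> params) {
        this.extra = params;
        this.setParam();      // scraper-specific parameter handling
        return this.scrape(); // scraper-specific fetching
    }

    protected String getExtraValue(String key) {
        String value = extra.get(key);
        return value == null ? "" : value.trim();
    }

    protected abstract void setParam();  // each scraper reads its own params
    protected abstract String scrape();  // each scraper fetches its own data

    // Example concrete scraper (hypothetical), defaulting "type" to "all"
    // the way QuoraProfileScraper's setParam() does.
    public static class ExampleScraper extends ScraperSketchBase {
        private String type;
        protected void setParam() {
            String t = getExtraValue("type");
            this.type = t.isEmpty() ? "all" : t;
        }
        protected String scrape() { return "posts(type=" + type + ")"; }
    }
}
```

Because getData is final, every new scraper automatically follows the same lifecycle, which is exactly the uniformity the refactoring set out to achieve.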


Iterating the Loklak Server data

Loklak Server is amazing for what it does, but how it does its tasks is even more impressive. Most developers know what iterators are used for and how to use them, but this project has a customized iterator that iterates over Twitter data objects: Timeline.java. Timeline implements the Iterable interface (not Iterator itself). This interface allows Timeline to be used like an iterator while adding methods to modify, use or create the data objects. At present, it only iterates over Twitter data objects; I am working on modifying it to iterate over data objects from all web scrapers.

The following is a simple example of how an iterator is used:

    // Initializing a list
    List<String> stringsList = Arrays.asList("foo", "bar", "baz");

    // Using an iterator to display the contents of stringsList
    System.out.print("Contents of stringsList: ");
    Iterator<String> iter = stringsList.iterator();
    while(iter.hasNext()) {
        System.out.print(iter.next() + " ");
    }

This iterator can only walk over the data the way an array does. It does the task of iterating objects perfectly, but we can add more functionality to an iterator.

The Timeline iterator iterates over MessageEntry objects, i.e. the superclass of TwitterTweet objects. According to the Javadocs, "Timeline is a structure which holds tweet for the purpose of presentation, There is no tweet retrieval method here, just an iterator which returns the tweets in reverse appearing order." Following are some of the tasks it does:

As an iterator: The basic use of Timeline is to iterate over MessageEntry objects. It not only iterates over the data objects, but also fetches them (see here).

    // Declare a Timeline object according to the order in which the data objects were created
    Timeline tline = new Timeline(Timeline.parseOrder("created_at"));

    // Adding data objects to the timeline
    tline.add(me1);
    tline.add(me2);
    .
    .
    .
    // Outputting all data objects as an array of JSON objects
    JSONArray postArray = new JSONArray();
    for (MessageEntry post : tline) {
        postArray.put(post.toJSON());
    }

The order of iterating the data objects: Timeline can arrange and iterate the data objects according to the date of creation of the Twitter post, the number of retweets, or the number of favourites. For this, there is an enum declaration Order in the Timeline class which is initialized during the creation of a Timeline object. [link]

    Timeline tline = new Timeline(Timeline.parseOrder("created_at"));

Pagination of data objects: There is a cursor object and some methods, including getters and setters, to support pagination of the data objects. It is only used internally, but can also be used to return a section of the result.

writeToIndex method: This method can be used to write all data fetched by the Timeline iterator to Elasticsearch for indexing, and to a dump that can be used for testing. Thus, data can be indexed concurrently while it is iterated. It is implemented here.

Other methods: It also has methods to output all data as JSON, a customized method to add data to the Timeline while keeping the user object and the data separate, etc. There is a bit more in this iterable class which is worth exploring.…
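A Timeline-like Iterable can be sketched in a few lines, with hypothetical names: posts are kept in a sorted structure and handed out through iterator(), so callers can use the object directly in a for-each loop the way Timeline is used above. Ordering by creation time, newest first, stands in for Timeline's Order enum and its "reverse appearing order".

```java
import java.util.*;

public class TimelineSketch implements Iterable<TimelineSketch.Post> {

    public static class Post {
        final String text;
        final long createdAt;
        public Post(String text, long createdAt) { this.text = text; this.createdAt = createdAt; }
    }

    // Newest first; the text tiebreaker keeps distinct posts with equal
    // timestamps from being dropped by the TreeSet.
    private final TreeSet<Post> posts = new TreeSet<>(
        Comparator.comparingLong((Post p) -> p.createdAt).reversed()
            .thenComparing(p -> p.text));

    public void add(Post p) { posts.add(p); }
    public int size() { return posts.size(); }

    @Override
    public Iterator<Post> iterator() { return posts.iterator(); }
}
```

Implementing Iterable rather than exposing the underlying collection is the same design choice Timeline makes: callers get iteration, while the class keeps control over ordering and storage.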


Multithreading implementation in Loklak Server

Loklak Server is a near-realtime system. It performs a large number of tasks that are very costly in terms of resources. Its basic function is to scrape all data from websites and output it at an endpoint. In addition to scraping data, there is also a need to perform other tasks, like refining and cleaning the data; that is why multiple threads are instantiated. They perform tasks like:

Refining the data and extracting more data

The fetched data needs to be cleaned and refined before being output. Some examples:

a) Removal of HTML tags from tweet text: After extracting text from the HTML data and feeding it to the TwitterTweet object, threads are run concurrently to remove all HTML from the text.

b) Unshortening of URL links: The URL links embedded in the tweet text may track users with the help of shortened URLs. To prevent this, a thread is instantiated to unshorten the URL links concurrently while the tweet text is cleaned.

Indexing all JSON output data to Elasticsearch

While extracting JSON data as output, there is a method here in Timeline.java that indexes data to Elasticsearch.

Managing multithreading

To manage multithreading, Loklak Server uses the following:

1. ExecutorService

To deal with large numbers of threads, an ExecutorService object is used to handle them, as it helps the JVM prevent resource overflow. The threads' lifecycle can be controlled and their creation cost optimized. The best example of an ExecutorService application is here:

    .
    .
    public class TwitterScraper {
        // Create at most 40 threads. This caps the number of threads at 40 at a time
        public static final ExecutorService executor = Executors.newFixedThreadPool(40);
    .
    .
    .
    .
    // Feeding the TwitterTweet object with data
    TwitterTweet tweet = new TwitterTweet(
        user.getScreenName(),
        Long.parseLong(tweettimems.value),
        props.get("tweettimename").value,
        props.get("tweetstatusurl").value,
        props.get("tweettext").value,
        Long.parseLong(tweetretweetcount.value),
        Long.parseLong(tweetfavouritecount.value),
        imgs, vids, place_name, place_id,
        user, writeToIndex, writeToBackend
    );

    // Starting a thread to refine the TwitterTweet data
    if (tweet.willBeTimeConsuming()) {
        executor.execute(tweet);
    }
    .
    .
    .

2. The basic Thread class

The Thread class can also be used instead of ExecutorService in cases where there is no resource crunch, but it is always advisable to use ExecutorService because of its benefits. Thread can be used as an anonymous class, like here.

3. The Runnable interface

The Runnable interface can be used to create anonymous classes, or classes that do more than just run a single task concurrently. In Loklak Server, TwitterScraper concurrently indexes the data to Elasticsearch, unshortens links and cleans data. Have a look at the implementation here.

Resources:
Loklak Server: https://github.com/loklak/loklak_server
ExecutorService class: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html
Multithreading: https://en.wikipedia.org/wiki/Multithreading_(computer_architecture)
RedirectUnshortener: https://github.com/loklak/loklak_server/blob/0f055ea6d2d768ea13b29c6fee20ab95902d70ab/src/org/loklak/harvester/RedirectUnshortener.java
Threads vs ExecutorService: https://stackoverflow.com/questions/26938210/executorservice-vs-casual-thread-spawner
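The pattern above (a fixed thread pool plus Runnable data objects whose run() method does the post-processing) can be sketched self-contained. Here the refinement step is stripping HTML-like tags from the text; the names are hypothetical, and the real TwitterTweet's run() additionally unshortens links and triggers indexing.

```java
import java.util.*;
import java.util.concurrent.*;

public class RefineSketch {
    static final ExecutorService executor = Executors.newFixedThreadPool(4);

    // A Runnable data object, like TwitterTweet: run() refines its own fields.
    static class Tweet implements Runnable {
        volatile String text; // volatile: written by a pool thread, read by the caller
        Tweet(String text) { this.text = text; }

        @Override
        public void run() {
            // Refinement step: drop anything that looks like an HTML tag.
            this.text = this.text.replaceAll("<[^>]*>", "");
        }
    }

    public static List<String> refineAll(List<Tweet> tweets) throws InterruptedException {
        for (Tweet t : tweets) executor.execute(t);   // refine concurrently
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES); // wait for all refinements
        List<String> out = new ArrayList<>();
        for (Tweet t : tweets) out.add(t.text);
        return out;
    }
}
```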
