Backend Scraping in Loklak Server

Loklak Server is a peer-to-peer distributed scraping system. It scrapes data from websites and also maintains other sources, such as peers, storage and a backend server, from which data can be fetched. Maintaining these alternative sources has its benefits: no costly requests to the websites, no scraping and no cleaning of data. A Loklak Server can use a secondary Loklak Server (a backend server) tuned for storing large amounts of data; the primary server fetches data from the backend in return for pushing all of its scraped data to it. Recently there was a bug in backend search: a new feature for filtering tweets had been added to scraping and indexing, but not to backend search. To fix this issue, I traced through the backend search code and patched it. Let us discuss how backend search works.

1) A query is made to the search endpoint with:

a) source=all: TwitterScraper and messages from the local search server are preferred first. If the scraped messages are not enough, or no output has been returned within a specific amount of time, backend search is initiated.

b) source=backend: SearchServlet scrapes directly from the backend server.

2) Fetching data from the backend server

The input parameters fetched from the client are fed into the DAO.searchBackend method, and the list of backend servers is read from the config file. Using these input parameters and backend servers, the required data is scraped and returned to the client. In the DAO.searchOnOtherPeers method, the request is sent to multiple servers, which are arranged in order of better response rates. This method invokes SearchServlet.search to send the request to the listed servers.

```java
List<String> remote = getBackendPeers();

if (remote.size() > 0) {
    // condition deactivated because we need always at least one peer
    Timeline tt = searchOnOtherPeers(remote, q, filterList, order, count, timezoneOffset,
            where, SearchServlet.backend_hash, timeout);
    if (tt != null) tt.writeToIndex();
    return tt;
}
```

3) Creation of the request URL and sending requests

The request URL is created from the input parameters passed to the SearchServlet.search method, and the request is sent to the respective servers to fetch the required messages (a simplified, stand-alone version of this request cycle is sketched at the end of this post).

```java
// URL creation
urlstring = protocolhostportstub + "/api/search.json?q="
        + URLEncoder.encode(query.replace(' ', '+'), "UTF-8")
        + "&timezoneOffset=" + timezoneOffset
        + "&maximumRecords=" + count
        + "&source=" + (source == null ? "all" : source)
        + "&minified=true&shortlink=false&timeout=" + timeout;

if (!"".equals(filterString = String.join(", ", filterList))) {
    urlstring = urlstring + "&filter=" + filterString;
}

// Download data
byte[] jsonb = ClientConnection.downloadPeer(urlstring);
if (jsonb == null || jsonb.length == 0) throw new IOException("empty content from " + protocolhostportstub);
String jsons = UTF8.String(jsonb);
JSONObject json = new JSONObject(jsons);
if (json == null || json.length() == 0) return tl;

// Final data fetched to be returned
JSONArray statuses = json.getJSONArray("statuses");
```

References

- Social peer-to-peer processes: https://en.wikipedia.org/wiki/Social_peer-to-peer_processes
- Parallel Random-Access Machine: http://pages.cs.wisc.edu/~tvrdik/2/html/Section2.html
- Distributed Algorithm (Cole–Vishkin algorithm): http://homepage.divms.uiowa.edu/~ghosh/color.pdf
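To make the backend request/response cycle concrete, here is a minimal, self-contained sketch of querying a peer's /api/search.json and reading the statuses array. This is not the Loklak SearchServlet code: the class and helper names are invented for illustration, and it assumes Java 9+ together with the org.json library.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

import org.json.JSONArray;
import org.json.JSONObject;

// Hypothetical, simplified client showing the same request/response cycle
// against a backend peer as the servlet snippet above.
public class BackendSearchSketch {

    public static JSONArray searchBackend(String backendHost, String query) throws IOException {
        // Build the same kind of search URL that SearchServlet.search assembles.
        String url = backendHost + "/api/search.json?q=" + URLEncoder.encode(query, "UTF-8")
                + "&source=all&minified=true";

        // Download the raw JSON bytes from the peer and parse them.
        try (InputStream in = new URL(url).openStream()) {
            String body = new String(in.readAllBytes(), StandardCharsets.UTF_8);
            JSONObject json = new JSONObject(body);
            // The messages come back under the "statuses" key, as in the servlet code above.
            return json.optJSONArray("statuses");
        }
    }

    public static void main(String[] args) throws IOException {
        JSONArray statuses = searchBackend("https://api.loklak.org", "fossasia");
        int count = statuses == null ? 0 : statuses.length();
        System.out.println(count + " messages fetched");
    }
}
```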


Configuring Youtube Scraper with Search Endpoint in Loklak Server

YoutubeScraper is one of the more interesting web scrapers in Loklak Server, with a unique implementation of data scraping and data-key creation (using RDF). Previously it could not be accessed, since it had no URL endpoint. I configured it to be usable both through a separate endpoint (/api/youtubescraper) and through the search endpoint (/api/search.json).

Usage:

YoutubeScraper endpoint: /api/youtubescraper
Example: http://api.loklak.org/api/youtubescraper?query=https://www.youtube.com/watch?v=xZ-m55K3FhQ&scraper=youtube

SearchServlet endpoint: /api/search.json
Example: http://api.loklak.org/api/search.json?query=https://www.youtube.com/watch?v=xZ-m55K3FhQ&scraper=youtube

The configurations added in Loklak Server are:

1) Endpoint

YoutubeScraper can now be accessed through the /api/youtubescraper endpoint. Like the other scrapers, it uses the BaseScraper class as its superclass.

2) prepareSearchUrl

The prepareSearchUrl method creates the YouTube search URL that is used to scrape the YouTube webpage. YoutubeScraper takes a URL as input, but a YouTube link could also be a shortened link; that is why the video id is stored as the query. This approach optimizes the scraper and makes it easy to extend. Currently YoutubeScraper scrapes YouTube video pages, but scrapers for the search page and channel pages can also be added.

```java
URIBuilder url = null;
String midUrl = "search/";

try {
    switch (type) {
        case "search":
            midUrl = "search/";
            url = new URIBuilder(this.baseUrl + midUrl);
            url.addParameter("search_query", this.query);
            break;
        case "video":
            midUrl = "watch/";
            url = new URIBuilder(this.baseUrl + midUrl);
            url.addParameter("v", this.query);
            break;
        case "user":
            midUrl = "channel/";
            url = new URIBuilder(this.baseUrl + midUrl + this.query);
            break;
        default:
            url = new URIBuilder("");
            break;
    }
} catch (URISyntaxException e) {
    DAO.log("Invalid Url: baseUrl = " + this.baseUrl + ", mid-URL = " + midUrl
            + ", query = " + this.query + ", type = " + type);
    return "";
}
```

3) getDataFromConnection

The getDataFromConnection method fetches a BufferedReader object and passes it to the scrape method. In YoutubeScraper, this method has been overridden to avoid the default implementation, which uses type=all.

```java
@Override
public Post getDataFromConnection() throws IOException {
    String url = this.prepareSearchUrl(this.type);
    return getDataFromConnection(url, this.type);
}
```

4) Setting scraper parameters from the GET parameters

The Map of GET parameters fetched by the scraper provides the type and the query. For a URL, the video hash code is separated from the URL and then used as the query (an illustrative way of extracting the video id is sketched at the end of this post).

```java
this.query = this.getExtraValue("query");
this.query = this.query.substring(this.query.length() - 11);
```

5) Scrape method

The scrape method runs the different scraper methods (in YoutubeScraper there is only one), iterates over them using PostTimeline and wraps the output in a Post object. This simple structure improves the flexibility of the scraper to scrape different pages concurrently.

```java
Post out = new Post(true);
Timeline2 postList = new Timeline2(this.order);
postList.addPost(this.parseVideo(br, type, url));
out.put("videos", postList.toArray());
```

References

- What is an RDF triple, explained on Stack Overflow: https://stackoverflow.com/questions/273218/whats-a-rdf-triple
- Tutorial on scraping with regular expressions: http://stanford.edu/~mgorkove/cgi-bin/rpython_tutorials/Scraping_PDFsText_Files_in_Python_Using_Regular_Expressions.php
- YouTube video-id format: https://webapps.stackexchange.com/questions/54443/format-for-id-of-youtube-video
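As a side note on step 4, here is an illustrative (and purely hypothetical) way to extract the 11-character video id from common YouTube URL shapes before using it as the query. It is not the actual Loklak implementation; the regex and fallback are assumptions for the sketch.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only, not the actual Loklak code: one way to pull the 11-character
// video id out of common YouTube URL shapes before using it as the query.
public class VideoIdSketch {

    // Matches .../watch?v=<id>, youtu.be/<id> and .../embed/<id> style links.
    private static final Pattern VIDEO_ID =
            Pattern.compile("(?:v=|youtu\\.be/|embed/)([A-Za-z0-9_-]{11})");

    public static String extractVideoId(String url) {
        Matcher m = VIDEO_ID.matcher(url);
        // Fall back to the last 11 characters, mirroring the substring trick shown above.
        return m.find() ? m.group(1) : url.substring(url.length() - 11);
    }

    public static void main(String[] args) {
        System.out.println(extractVideoId("https://www.youtube.com/watch?v=xZ-m55K3FhQ")); // xZ-m55K3FhQ
        System.out.println(extractVideoId("https://youtu.be/xZ-m55K3FhQ"));                // xZ-m55K3FhQ
    }
}
```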


Advantage of Open Event Format over xCal, pentabarf/frab XML and iCal

Event apps like Giggity and Giraffe use event formats such as xCal, pentabarf/frab XML and iCal. In this blog post I present some of the advantages of using FOSSASIA's Open Event data format over these formats. I added support for the Open Event format in these two apps, so I describe the advantages and improvements with respect to them.

The main problem faced in the Giggity app is that data like social links, microlocations and the link to the logo file cannot all be fetched from a single file, so a separate raw file had to be added to provide this data. The Open Event format provides all of this information from a single URL received from the server, so no separate file is needed.

Here is the pentabarf-format data for the FOSSASIA 2016 conference, excluding sessions:

```xml
<conference>
  <title>FOSSASIA 2016</title>
  <subtitle/>
  <venue>Science Centre Road</venue>
  <city>Singapore</city>
  <start>2016-03-18</start>
  <end>2016-03-20</end>
  <days>3</days>
  <day_change>09:00:00</day_change>
  <timeslot_duration>00:00:00</timeslot_duration>
</conference>
```

Although it provides most of the necessary information, it leaves out the logo URL, the longitude and latitude of the microlocations (rooms), and links to social media and the website. The Open Event format provides all of the missing details, plus some extra information such as language and comments; see the FOSSASIA 2016 Open Event format sample.

Parsing the received file also gets complicated with iCal, xCal and similar formats, since tags need to be matched to extract the data. However, there are many libraries available for parsing JSON data, so we can simply build a list of the received data and send it to the adapter; a sketch of this appears at the end of this post. See this example for working code, and compare it with the parser for iCal to see the difference in complexity.

Another common problem is how the formats structure the sub-parts of a single element. For example, the Open Event format defines latitude and longitude as separate fields of a location, while in iCal they are just separated by a semicolon inside one GEO property:

iCal:

```
GEO:1.333194;103.736132
```

Open Event JSON:

```json
{
  "id": 1,
  "name": "Stage 1",
  "floor": 0,
  "latitude": 37.425420,
  "longitude": -122.080291,
  "room": null
}
```

The information provided is also more detailed. Finally, the Open Event format is well documented, which makes it easier for other developers to work with it; find the documentation here.

References:

- iCal documentation by F. Dawson and D. Stenerson: https://www.ietf.org/rfc/rfc2445.txt
- xCal documentation by C. Daboo, M. Douglass and S. Lees: https://tools.ietf.org/html/rfc6321
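To illustrate how little client code the JSON format needs, here is a rough sketch (not code from Giggity or Giraffe) that parses a microlocations array like the one above into plain objects ready for a list adapter. It assumes the org.json library and the field names shown in the sample.

```java
import java.util.ArrayList;
import java.util.List;

import org.json.JSONArray;
import org.json.JSONObject;

// Rough sketch: a handful of org.json calls turn the microlocations array
// into plain objects that can be handed to a list adapter.
public class MicrolocationParserSketch {

    static class Microlocation {
        final String name;
        final double latitude;
        final double longitude;

        Microlocation(String name, double latitude, double longitude) {
            this.name = name;
            this.latitude = latitude;
            this.longitude = longitude;
        }
    }

    static List<Microlocation> parse(String microlocationsJson) {
        JSONArray array = new JSONArray(microlocationsJson);
        List<Microlocation> rooms = new ArrayList<>();
        for (int i = 0; i < array.length(); i++) {
            JSONObject room = array.getJSONObject(i);
            // Latitude and longitude are separate, typed fields; no "GEO:lat;lon" splitting is needed.
            rooms.add(new Microlocation(
                    room.getString("name"),
                    room.getDouble("latitude"),
                    room.getDouble("longitude")));
        }
        return rooms;
    }

    public static void main(String[] args) {
        String sample = "[{\"id\": 1, \"name\": \"Stage 1\", \"floor\": 0,"
                + " \"latitude\": 37.425420, \"longitude\": -122.080291, \"room\": null}]";
        for (Microlocation room : parse(sample)) {
            System.out.println(room.name + " at " + room.latitude + ", " + room.longitude);
        }
    }
}
```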


Export an Event using APIs of Open Event Server

In FOSSASIA's Open Event Server project, we allow the organizer, co-organizer and the admins to export all the data related to an event as an archive of JSON files. This way the data can be reused elsewhere for various purposes. The basic workflow is as follows:

- Send a POST request to /events/{event_id}/export/json with a payload stating whether you require the various media files.
- The POST request starts a Celery task in the background that extracts the data related to the event and converts it to JSON.
- The Celery task URL is returned as the response. Sending a GET request to this URL gives the status of the task; if the status is FAILED or SUCCESS, there is a corresponding error message or result.
- Separate JSON files for the event, speakers, sessions, microlocations, tracks, session types and custom forms are created.
- All these files are then archived, and the zip is served on the endpoint /events/{event_id}/exports/{path}.
- Sending a GET request to that endpoint downloads a zip containing all the data related to the event.

Let's dive into each of these points one by one.

POST request (/events/{event_id}/export/json)

Making a POST request requires JWT authentication, like most of the other API endpoints. You need to send a payload containing the settings for whether you want the media files related to the event to be downloaded along with the JSON files. An example payload looks like this:

```json
{
  "image": true,
  "video": true,
  "document": true,
  "audio": true
}
```

```python
def export_event(event_id):
    from helpers.tasks import export_event_task

    settings = EXPORT_SETTING
    settings['image'] = request.json.get('image', False)
    settings['video'] = request.json.get('video', False)
    settings['document'] = request.json.get('document', False)
    settings['audio'] = request.json.get('audio', False)
    # queue task
    task = export_event_task.delay(
        current_identity.email, event_id, settings)
    # create Job
    create_export_job(task.id, event_id)

    # in case of testing
    if current_app.config.get('CELERY_ALWAYS_EAGER'):
        # send_export_mail(event_id, task.get())
        TASK_RESULTS[task.id] = {
            'result': task.get(),
            'state': task.state
        }

    return jsonify(
        task_url=url_for('tasks.celery_task', task_id=task.id)
    )
```

Taking the settings about the media files and the event id, we pass them as parameters to the export-event Celery task and queue it up. We then create an entry in the database with the task URL, the event id and the user who triggered the export, to keep a record of the activity. After that we return the URL for the Celery task as the response. If the Celery task is still underway, the response contains 'state': 'WAITING'. Once the task is completed, the value of 'state' is either 'FAILED' or 'SUCCESS'. If it is SUCCESS, the result of the task is returned, in this case the download URL for the zip. A hypothetical client-side walk-through of this flow is sketched at the end of this post.

Celery task to export the event

Exporting an event is a very time-consuming process, and we don't want it to get in the way of the user's interaction with other services. So we need a queueing system that queues the tasks and executes them in the background without disturbing the main worker from executing the other user…
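For completeness, here is a hypothetical client-side sketch of the workflow described above, written in Java (to match the other code samples on this blog) using the JDK 11 HttpClient. The host, event id, token and response parsing are all placeholders; none of this is part of the Open Event Server code base.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client-side walk-through of the export workflow.
// Host, event id and token are placeholders, and the JSON handling is left out
// deliberately to keep the flow visible.
public class EventExportClientSketch {

    private static final String HOST = "https://api.example.com/v1"; // placeholder host
    private static final String JWT = "<jwt-token>";                 // placeholder token

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // 1) Trigger the export, asking for the media files to be included.
        String payload = "{\"image\": true, \"video\": true, \"document\": true, \"audio\": true}";
        HttpRequest start = HttpRequest.newBuilder(URI.create(HOST + "/events/1/export/json"))
                .header("Authorization", "JWT " + JWT)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();
        HttpResponse<String> response = client.send(start, HttpResponse.BodyHandlers.ofString());

        // The body contains task_url; extract it with any JSON library, then
        // 2) poll that URL with GET until state is SUCCESS or FAILED, and
        // 3) GET the download URL from the result to fetch the archived zip.
        System.out.println(response.body());
    }
}
```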


Scraping Concurrently with Loklak Server

At present, SearchScraper in Loklak Server uses numerous threads to scrape the Twitter website; the fetched data is cleaned and more data is extracted from it. But scraping only Twitter does not use the system to its full potential. Concurrent scraping of other websites like Quora, YouTube, GitHub, etc. can be added to diversify the application, so that the single endpoint search.json can serve multiple services. As this feature is still being refined, we will only discuss the basic structure of the system with the new changes. I tried to implement a more abstract way of scraping by:

1) Fetching the input data in SearchServlet

Instead of selecting individual GET parameters and referencing them, the complete Map object is now passed around, which makes it possible to add more functionality based on the input GET parameters. The dataArray object (a JSONArray) is fetched from the DAO.scrapeLoklak method and embedded in the output under the key results.

```java
// start a scraper
inputMap.put("query", query);
DAO.log(request.getServletPath() + " scraping with query: " + query + " scraper: " + scraper);
dataArray = DAO.scrapeLoklak(inputMap, true, true);
```

2) Scraping the selected scrapers concurrently

In DAO.java, the useful GET parameters of inputMap are fetched and cleaned. They are used to choose the scrapers to run, using the getScraperObjects() method.

```java
Timeline2.Order order = getOrder(inputMap.get("order"));
Timeline2 dataSet = new Timeline2(order);
List<String> scraperList = Arrays.asList(inputMap.get("scraper").trim().split("\\s*,\\s*"));
```

Threads are created to fetch data from the different scrapers according to the size of the list of scraper objects. The input map is passed as an argument to the scrapers, which read their own GET parameters from it and produce output accordingly (a self-contained model of this pattern appears at the end of this post).

```java
List<BaseScraper> scraperObjList = getScraperObjects(scraperList, inputMap);
ExecutorService scraperRunner = Executors.newFixedThreadPool(scraperObjList.size());

try {
    for (BaseScraper scraper : scraperObjList) {
        scraperRunner.execute(() -> {
            dataSet.mergePost(scraper.getData());
        });
    }
} finally {
    scraperRunner.shutdown();
    try {
        scraperRunner.awaitTermination(24L, TimeUnit.HOURS);
    } catch (InterruptedException e) { }
}
```

3) Fetching the selected scraper objects in DAO.java

Here a variable of the abstract class BaseScraper (the superclass of all search scrapers) is used to build the list of scrapers to run. All the scraper constructors are fed the input map so that each can scrape accordingly.

```java
List<BaseScraper> scraperObjList = new ArrayList<BaseScraper>();
BaseScraper scraperObj = null;

if (scraperList.contains("github") || scraperList.contains("all")) {
    scraperObj = new GithubProfileScraper(inputMap);
    scraperObjList.add(scraperObj);
}
.
.
.
```

References:

- Best practices of multithreading in Java: https://stackoverflow.com/questions/17018507/java-multithreading-best-practice
- ExecutorService vs casual thread spawner: https://stackoverflow.com/questions/26938210/executorservice-vs-casual-thread-spawner
- Basic data structures used in Java: https://www.eduonix.com/blog/java-programming-2/learn-to-implement-data-structures-in-java/
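The pattern in step 2 can be modelled in isolation. The sketch below is not the Loklak code: the scrapers are stand-ins (plain Supplier objects) and the merge step is a synchronized list, but the ExecutorService life cycle mirrors the snippet above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Self-contained model of the concurrent-scraping pattern: each "scraper" is a
// Supplier<String>, a fixed thread pool runs them concurrently, and the results
// are merged into one synchronized list, loosely mirroring dataSet.mergePost.
public class ConcurrentScrapeSketch {

    public static void main(String[] args) throws InterruptedException {
        List<Supplier<String>> scrapers = Arrays.asList(
                () -> "github result",
                () -> "quora result",
                () -> "youtube result");

        List<String> dataSet = Collections.synchronizedList(new ArrayList<>());
        ExecutorService scraperRunner = Executors.newFixedThreadPool(scrapers.size());
        try {
            for (Supplier<String> scraper : scrapers) {
                // Each scraper runs on its own thread and contributes to the shared data set.
                scraperRunner.execute(() -> dataSet.add(scraper.get()));
            }
        } finally {
            scraperRunner.shutdown();
            scraperRunner.awaitTermination(1L, TimeUnit.MINUTES);
        }
        System.out.println(dataSet);
    }
}
```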


Data Indexing in Loklak Server

Loklak Server is a data-scraping system that indexes all the scraped data in order to optimize retrieval. The data fetched for different users is stored as a cache, which helps in retrieving data directly from the cache for recurring queries. When users search for the same queries, the load on Loklak Server is reduced by returning indexed data instead of scraping the same data again.

Application

Loklak Server depends on Elasticsearch to index the cached data (as JSON), so that repeated queries can be answered from the index rather than by scraping again.

When is data indexing done?

Data is indexed when:

1) Data is scraped. When data is scraped, it is indexed concurrently with the cleaning of the data in the TwitterTweet data object. For this task, the static addScheduler method of IncomingMessageBuffer is used, which acts as an abstraction between the scraping of data and the storing and indexing of data. The following is the implementation from TwitterScraper (from here); writeToIndex is the boolean flag that controls whether the data is indexed.

```java
if (this.writeToIndex) IncomingMessageBuffer.addScheduler(this, this.user, true);
```

2) Data is fetched from the backend. When data is fetched from the backend, it is indexed in the Timeline iterator, which calls the same method to index data concurrently. The following is the definition of the writeToIndex() method from Timeline.java (from here); when writeToIndex() is called, the fetched data is indexed.

```java
public void writeToIndex() {
    IncomingMessageBuffer.addScheduler(this, true);
}
```

How?

When the static addScheduler method of IncomingMessageBuffer is called, a thread is started that indexes all the data. As the message-queue data structure fills with messages, indexing continues (see here; a toy model of this buffering pattern is sketched at the end of this post). The DAO method writeMessageBulk is called there to write the data, which then goes to the following streams:

1) Dump: the fetched data is dumped into a file in the Import directory; it can also be fetched by other peers.

2) Index: the fetched data is checked against the index, and data that isn't already indexed is indexed.

```java
public static Set<String> writeMessageBulk(Collection<MessageWrapper> mws) {
    List<MessageWrapper> noDump = new ArrayList<>();
    List<MessageWrapper> dump = new ArrayList<>();
    for (MessageWrapper mw : mws) {
        if (mw.t == null) continue;
        if (mw.dump) dump.add(mw);
        else noDump.add(mw);
    }
    Set<String> createdIDs = new HashSet<>();
    createdIDs.addAll(writeMessageBulkNoDump(noDump));
    createdIDs.addAll(writeMessageBulkDump(dump)); // also does a writeMessageBulkNoDump internally
    return createdIDs;
}
```

The above code snippet is from DAO.java. The call writeMessageBulkNoDump(noDump) indexes the data into Elasticsearch; its definition can be seen here. For dumping data, writeMessageBulkDump(dump) is called; it is defined here.

Resources:

- Iterable: https://docs.oracle.com/javase/8/docs/api/java/lang/Iterable.html
- Use of Iterable: https://stackoverflow.com/questions/1059127/what-is-the-iterable-interface-used-for
- Elasticsearch webinar: https://www.elastic.co/webinars/getting-started-elasticsearch?elektra=home&storm=sub1
- Ways to iterate through a list in Java: https://crunchify.com/how-to-iterate-through-java-list-4-way-to-iterate-through-loop/
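The buffering idea behind IncomingMessageBuffer.addScheduler can be modelled with a toy producer/consumer queue. The sketch below is not the Loklak implementation; it only shows scraper threads enqueuing messages while a single background thread drains the queue and "indexes" them.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy model of the buffering idea: scraper threads enqueue messages without
// waiting on the index, and one background thread drains the queue.
public class MessageBufferSketch {

    private static final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1000);

    // Called by scraper threads; corresponds loosely to addScheduler(...).
    public static void addScheduler(String message) throws InterruptedException {
        queue.put(message);
    }

    public static void main(String[] args) throws InterruptedException {
        Thread indexer = new Thread(() -> {
            try {
                while (true) {
                    String message = queue.take();
                    // In Loklak this is roughly where the bulk write to dump and index happens.
                    System.out.println("indexed: " + message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        indexer.setDaemon(true);
        indexer.start();

        addScheduler("message #1");
        addScheduler("message #2");
        Thread.sleep(100); // give the indexer a moment to drain the queue before the demo exits
    }
}
```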


Some Other Services in Loklak Server

Loklak Server isn't just a scraper system; it provides numerous other services that perform interesting functions, like link unshortening (the reverse of link shortening) and video URL fetching, as well as administrative tasks like fetching the status of a Loklak deployment (used for analysis during Loklak development), and many more. Some of these are implemented internally, and the rest can be used through HTTP endpoints. There are also some services that aren't complete and are still in the development stage. Let's go through some of them to see what they do and how they can be used.

1) VideoUrlService

This service extracts videos from websites that host streaming video and outputs the video file links. It is still in the development stage but is functional. Presently, it can fetch Twitter video links and output them in different video qualities.

Endpoint: /api/videoUrlService.json

Implementation example: curl "api.loklak.org/api/videoUrlService.json?id=https://twitter.com/EXOGlobal/status/886182766970257409&id=https://twitter.com/KMbappe/status/885963850708865025"

2) Link Unshortening Service

This service unshortens links. Shortened URLs are often used by websites to track Internet users; to prevent this, the link-unshortening service unshortens the link and returns the final, untrackable link to the user. Currently this service is used in TwitterScraper to unshorten the fetched URLs. It also has methods to get the redirect link and to get the final URL from a link that has been shortened multiple times.

Implementation example from TwitterScraper.java [LINK]:

```java
Matcher m = timeline_link_pattern.matcher(text);

if (m.find()) {
    String expanded = RedirectUnshortener.unShorten(m.group(2));
    text = m.replaceFirst(" " + expanded);
    continue;
}
```

Further, it can be exposed as a stand-alone service and used directly. New features, like fetching the featured image from links, could be added to this service; these ideas are still under discussion and enthusiastic contributions are most welcome. A sketch of the basic redirect-following idea appears at the end of this post.

3) StatusService

This service outputs all data related to a Loklak Server deployment's configuration. To access this configuration, the status.json API endpoint is used. It outputs the following data:

a) the number of messages it scrapes in intervals of a second, a minute, an hour, a day, etc.;

b) the configuration of the server, like RAM, assigned memory, used memory, number of CPU cores, CPU load, etc.;

c) other configurations related to the application, like the size and specifications of the Elasticsearch shards, the client request header, the number of running threads, etc.

Endpoint: /api/status.json

Implementation example: curl api.loklak.org/api/status.json

Resources:

- How to code a URL shortener: https://stackoverflow.com/questions/742013/how-to-code-a-url-shortener
- URL shortening, hashes in practice: https://blog.codinghorror.com/url-shortening-hashes-in-practice/
- Elasticsearch: https://www.elastic.co/webinars/getting-started-elasticsearch?elektra=home&storm=sub1
- M3U8 format: https://www.lifewire.com/m3u8-file-2621956
- Fetch video details using PHP: https://stackoverflow.com/questions/10896233/how-can-i-retrieve-youtube-video-details-from-video-url-using-php
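The core idea behind the link-unshortening service can be sketched with plain HttpURLConnection: issue a request without following redirects and read the Location header. This is illustrative only and not the RedirectUnshortener code; the short link used in main is hypothetical.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

// Illustrative sketch: follow one redirect hop manually by reading the Location
// header, which is the core idea behind unshortening a t.co style link.
public class UnshortenSketch {

    public static String unshortenOnce(String shortUrl) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(shortUrl).openConnection();
        connection.setInstanceFollowRedirects(false); // we want to see the redirect, not silently follow it
        connection.setRequestMethod("HEAD");
        int status = connection.getResponseCode();
        String location = connection.getHeaderField("Location");
        connection.disconnect();
        // A 3xx status with a Location header means the link points somewhere else.
        return (status >= 300 && status < 400 && location != null) ? location : shortUrl;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(unshortenOnce("https://t.co/example")); // hypothetical short link
    }
}
```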


Creating Unit Tests for File Upload Functions in Open Event Server with Python Unittest Library

In FOSSASIA's Open Event Server, we use the Python unittest library for unit testing various modules of the API code. The unittest library provides assertion functions to compare the actual and the expected values returned by a function or module. For most modules we simply use these assertions to compare results, since the parameters are mostly normal data types. One area that is harder to unit test, however, is file uploading. We cannot simply pass a file or similar payload to the function under test, since it expects request.files-style data, which is only produced when a file is uploaded or sent as a request to an endpoint. For example, in this function:

```python
def uploaded_file(files, multiple=False):
    if multiple:
        files_uploaded = []
        for file in files:
            extension = file.filename.split('.')[1]
            filename = get_file_name() + '.' + extension
            filedir = current_app.config.get('BASE_DIR') + '/static/uploads/'
            if not os.path.isdir(filedir):
                os.makedirs(filedir)
            file_path = filedir + filename
            file.save(file_path)
            files_uploaded.append(UploadedFile(file_path, filename))
    else:
        extension = files.filename.split('.')[1]
        filename = get_file_name() + '.' + extension
        filedir = current_app.config.get('BASE_DIR') + '/static/uploads/'
        if not os.path.isdir(filedir):
            os.makedirs(filedir)
        file_path = filedir + filename
        files.save(file_path)
        files_uploaded = UploadedFile(file_path, filename)
    return files_uploaded
```

So we need to create a mock uploading system to replicate this check. Inside the unit-testing module we create an API route for this particular scope that accepts a file as a request:

```python
@app.route("/test_upload", methods=['POST'])
def upload():
    files = request.files['file']
    file_uploaded = uploaded_file(files=files)
    return jsonify(
        {'path': file_uploaded.file_path,
         'name': file_uploaded.filename})
```

The above code creates an app route with the endpoint test_upload. It accepts request.files, sends this object to the uploaded_file function (the function to be unit tested), gets the result of the function and returns the result in JSON format. With this we have the endpoint to mock a file upload ready.

Next we need to send a request with a file object. We cannot send normal data, which would be treated as request.form; we want it to arrive in request.files. So we create two classes inheriting from other classes:

```python
def test_upload_single_file(self):

    class FileObj(StringIO):

        def close(self):
            pass

    class MyRequest(Request):
        def _get_file_stream(*args, **kwargs):
            return FileObj()

    app.request_class = MyRequest
```

The MyRequest class inherits the Request class of the Flask framework. We define the file stream of the Request class to be a FileObj, and then set the request_class attribute of the Flask app to this new MyRequest class.

After we have all of this set up, we need to send the request and check whether the uploaded file is saved properly. For this purpose we take the help of the StringIO library, which creates a file-like object that can be used to replicate a file upload. So we send the data as {'file': (StringIO('1,2,3,4'), 'test_file.csv')} to the /test_upload endpoint that we created previously. As a result, the endpoint receives the file, saves it, and returns the filename and file_path for the stored file.

```python
with app.test_request_context():
    client = app.test_client()
    resp = client.post('/test_upload',…
```
