Backend Scraping in Loklak Server

Loklak Server is a peer-to-peer distributed scraping system. It scrapes data from websites and also maintains other sources, such as peers, storage and a backend server, from which data can be fetched. Maintaining these additional sources has its benefits: it avoids costly requests to the websites and avoids scraping and cleaning the same data again.

Loklak Server can maintain a secondary Loklak Server (a backend server) tuned for storing large amounts of data. The primary Loklak Server pushes all scraped data to the backend and, in return, can fetch data from it.

Recently there was a bug in backend search: a new tweet-filtering feature had been added to scraping and indexing, but not to backend search. To fix this issue, I traced through the backend search codebase and fixed it.

Let us discuss how backend search works:

1) When a query is made from the search endpoint with:

a) source=all

When source is set to all, results from TwitterScraper and messages from the local search server are preferred first. If not enough messages are scraped, or no output has been returned within a specific amount of time, backend search is initiated.

b) source=backend

SearchServlet fetches data directly from the backend server.
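
The source handling described above can be sketched in Python as follows. This is only an illustration, not the Loklak implementation: `local_search`, `backend_search` and `min_results` are hypothetical names, and the timeout condition is left out for brevity.

```python
def search(query, source, local_search, backend_search, min_results=10):
    """Pick where results come from, based on the source parameter.

    local_search and backend_search are hypothetical callables that take
    a query string and return a list of messages.
    """
    if source == "backend":
        # source=backend: go straight to the backend server
        return backend_search(query)

    # source=all: prefer local results first
    messages = local_search(query)
    if len(messages) < min_results:
        # too few local results: top up from the backend, skipping duplicates
        seen = set(messages)
        messages = messages + [m for m in backend_search(query) if m not in seen]
    return messages
```

With `min_results=3`, two local messages would trigger the backend call, while `source="backend"` skips the local search entirely.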

2) Fetching data from Backend Server

The input parameters received from the client are fed into the DAO.searchBackend method. The list of backend servers is fetched from the config file. Using these input parameters and backend servers, the required data is fetched and output to the client.

In the DAO.searchOnOtherPeers method, the request is sent to multiple servers, which are arranged in order of better response rates. This method invokes the SearchServlet.search method to send the request to those servers.

List<String> remote = getBackendPeers();
if (remote.size() > 0) {
    // condition deactivated because we need always at least one peer
    Timeline tt = searchOnOtherPeers(remote, q, filterList, order, count, timezoneOffset, where, SearchServlet.backend_hash, timeout);
    if (tt != null) tt.writeToIndex();
    return tt;
}

 

3) Creation of request url and sending requests

The request URL is created from the input parameters passed to the SearchServlet.search method, and the request is sent to the respective servers to fetch the required messages.

   // URL creation
    urlstring = protocolhostportstub + "/api/search.json?q="
           + URLEncoder.encode(query.replace(' ', '+'), "UTF-8") + "&timezoneOffset="
           + timezoneOffset + "&maximumRecords=" + count + "&source="
           + (source == null ? "all" : source) + "&minified=true&shortlink=false&timeout="
           + timeout;
    if(!"".equals(filterString = String.join(", ", filterList))) {
       urlstring = urlstring + "&filter=" + filterString;
    }
    // Download data
    byte[] jsonb = ClientConnection.downloadPeer(urlstring);
    if (jsonb == null || jsonb.length == 0) throw new IOException("empty content from " + protocolhostportstub);
    String jsons = UTF8.String(jsonb);
    JSONObject json = new JSONObject(jsons);
    if (json == null || json.length() == 0) return tl;
    // Final data fetched to be returned
    JSONArray statuses = json.getJSONArray("statuses");
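
For comparison, the same URL construction can be sketched in Python with `urllib.parse.urlencode`. The function name, the base URL in the usage note and the filter values are assumptions made for illustration. Unlike the Java snippet, this sketch percent-encodes every parameter value, including the filter list.

```python
from urllib.parse import urlencode


def build_search_url(base, query, count, timezone_offset, timeout,
                     source=None, filters=()):
    """Build a search.json request URL for a peer (illustrative sketch)."""
    params = [
        ("q", query),
        ("timezoneOffset", timezone_offset),
        ("maximumRecords", count),
        ("source", source or "all"),   # default to all, as in the Java code
        ("minified", "true"),
        ("shortlink", "false"),
        ("timeout", timeout),
    ]
    if filters:
        # only add the filter parameter when there is something to filter on
        params.append(("filter", ",".join(filters)))
    return base + "/api/search.json?" + urlencode(params)
```

Calling `build_search_url("http://localhost:9000", "fossasia loklak", 100, -120, 3000, filters=["image", "video"])` yields a URL whose query string starts with `q=fossasia+loklak` and ends with `filter=image%2Cvideo`.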


Configuring Youtube Scraper with Search Endpoint in Loklak Server

Youtube Scraper is one of the more interesting web scrapers of Loklak Server, with a unique implementation of its data scraping and data key creation (using RDF). It couldn't be accessed because it didn't have any URL endpoint. I configured it to be usable both through a separate endpoint (/api/youtubescraper) and through the search endpoint (/api/search.json).

Usage:

  1. YoutubeScraper Endpoint: /api/youtubescraper

Example: http://api.loklak.org/api/youtubescraper?query=https://www.youtube.com/watch?v=xZ-m55K3FhQ&scraper=youtube

  2. SearchServlet Endpoint: /api/search.json

Example: http://api.loklak.org/api/search.json?query=https://www.youtube.com/watch?v=xZ-m55K3FhQ&scraper=youtube

The configurations added in Loklak Server are:

1) Endpoint

We can access YoutubeScraper using the /api/youtubescraper endpoint. As with the other scrapers, I have used the BaseScraper class as the superclass for this functionality.

2) PrepareSearchUrl

The prepareSearchUrl method creates the Youtube URL that is used to scrape the Youtube webpage. YoutubeScraper takes a URL as input, but a Youtube link could also be a shortened link. That is why the video id is stored as the query. This approach simplifies the scraper and makes it possible to add more scrapers to it.

Currently YoutubeScraper scrapes only the video webpages of Youtube, but scrapers for the search webpage and channel webpages can also be added.

URIBuilder url = null;
String midUrl = "search/";
    try {
       switch(type) {
           case "search":
               midUrl = "search/";
               url = new URIBuilder(this.baseUrl + midUrl);
               url.addParameter("search_query", this.query);
               break;
           case "video":
               midUrl = "watch/";
               url = new URIBuilder(this.baseUrl + midUrl);
               url.addParameter("v", this.query);
               break;
           case "user":
               midUrl = "channel/";
               url = new URIBuilder(this.baseUrl + midUrl + this.query);
               break;
           default:
               url = new URIBuilder("");
               break;
       }
    } catch (URISyntaxException e) {
       DAO.log("Invalid Url: baseUrl = " + this.baseUrl + ", mid-URL = " + midUrl + ", query = " + this.query + ", type = " + type);
       return "";
    }
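
The type-based URL construction above can also be sketched in Python. This is a rough equivalent rather than a port: the value of `BASE_URL` is an assumption (the snippet does not show what baseUrl contains), and the function name is chosen for illustration.

```python
from urllib.parse import quote, urlencode

BASE_URL = "https://www.youtube.com/"  # assumed value of baseUrl


def prepare_search_url(kind, query, base_url=BASE_URL):
    """Return the page URL to scrape for the given scraper type."""
    if kind == "search":
        return base_url + "search/?" + urlencode({"search_query": query})
    if kind == "video":
        return base_url + "watch/?" + urlencode({"v": query})
    if kind == "user":
        # the channel id is part of the path, so escape it as a path segment
        return base_url + "channel/" + quote(query, safe="")
    # unknown type: mirror the empty default of the Java switch
    return ""
```

Adding a new page type then only needs one more branch, which is the flexibility the text above refers to.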

 

3) Get-Data-From-Connection

The getDataFromConnection method is used to obtain a BufferedReader object and pass it to the scrape method. In YoutubeScraper, this method has been overridden to avoid the default implementation, which uses type=all.

@Override
public Post getDataFromConnection() throws IOException {
    String url = this.prepareSearchUrl(this.type);
    return getDataFromConnection(url, this.type);
}

 

4) Set scraper parameters input as get-parameters

The Map of get-parameters passed to the scraper provides type and query. For a URL, the video id is separated from the URL and then used as the query.

this.query = this.getExtraValue("query");
this.query = this.query.substring(this.query.length() - 11);
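
Taking the last 11 characters works as long as the video id is the final part of the link. A slightly more defensive variant, sketched here in Python under the assumption that links look like `watch?v=<id>` or `youtu.be/<id>`, reads the `v` parameter first and only then falls back to the tail of the link. The function name is hypothetical.

```python
from urllib.parse import parse_qs, urlparse


def extract_video_id(link):
    """Extract the video id from a full or shortened Youtube link."""
    parsed = urlparse(link)
    ids = parse_qs(parsed.query).get("v")
    if ids:
        # regular watch URL: the id is the v parameter
        return ids[0]
    # shortened link or bare id: use the last path segment
    segment = parsed.path.rstrip("/").rsplit("/", 1)[-1]
    # final fallback mirrors the snippet above: the last 11 characters
    return segment or link[-11:]
```

This also handles links with extra parameters after the id, such as a start time, where the substring approach would return the wrong characters.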

 

5) Scrape Method

The scrape method runs the different scraper methods (in YoutubeScraper, there is only one), collects their results in a post timeline, and wraps it in a Post object for output. This simple function can make the scraper more flexible, allowing it to scrape different pages concurrently.

Post out = new Post(true);
Timeline2 postList = new Timeline2(this.order);
postList.addPost(this.parseVideo(br, type, url));
out.put("videos", postList.toArray());

 


Advantage of Open Event Format over xCal, pentabarf/frab XML and iCal

Event apps like Giggity and Giraffe use event formats such as xCal, pentabarf/frab XML and iCal. In this blog post, I present some of the advantages of using FOSSASIA's Open Event data format over these formats. I added support for the Open Event format in these two apps, so I describe the advantages and improvements with respect to them.

  • The main problem in the Giggity app is that data such as social links, microlocations and the link to the logo file cannot be fetched from a single file, so a separate raw file has to be added to provide this data. The Open Event format provides all this information from a single URL served by the server, so there is no need for a separate file.
  • Here is the pentabarf format data for the FOSSASIA 2016 conference, excluding sessions. Although it provides the basic information, it leaves out the logo URL, the longitude and latitude of microlocations (rooms), and links to social media and the website. The Open Event format provides all the missing details, along with some extra information like language and comments. See the FOSSASIA 2016 Open Event format sample.
<conference>
<title>FOSSASIA 2016</title>
<subtitle/>
<venue>Science Centre Road</venue>
<city>Singapore</city>
<start>2016-03-18</start>
<end>2016-03-20</end>
<days>3</days>
<day_change>09:00:00</day_change>
<timeslot_duration>00:00:00</timeslot_duration>
</conference>
  • Parsing the received file gets very complicated in the case of iCal, xCal etc., as tags need to be matched to get the data. For JSON, however, there are various libraries available for parsing. So we can simply create an array list of the received data and send it to the adapter. See this example for working code. You can also see the parser for iCal to compare the complexity of the code.
  • Another common problem is that the structure of these formats sometimes makes it complicated to define the sub-parts of a single element. For example, for a location we define latitude and longitude as separate fields, while in the iCal format they are packed into a single value separated by a semicolon. For example:

iCal

GEO:1.333194;103.736132

JSON

{
           "id": 1,
           "name": "Stage 1",
           "floor": 0,
           "latitude": 37.425420,
           "longitude": -122.080291,
           "room": null
}

The information provided is also more detailed.

  • The Open Event format is well documented, which makes it easier for other developers to work with it. Find the documentation here.
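
To make the parsing difference from the GEO comparison above concrete, here is a small Python sketch that reads the location from both representations. The function names are hypothetical and the iCal handling covers only a single GEO line, not a full iCal parser.

```python
import json


def parse_ical_geo(line):
    """Split an iCal GEO property line into (latitude, longitude)."""
    name, _, value = line.partition(":")
    if name.strip().upper() != "GEO":
        raise ValueError("not a GEO property: " + line)
    # both coordinates share one value and must be split by hand
    latitude, longitude = (float(part) for part in value.split(";"))
    return latitude, longitude


def parse_microlocation(text):
    """Read latitude and longitude from an Open Event style JSON object."""
    room = json.loads(text)
    # each coordinate is its own named field
    return room["latitude"], room["longitude"]
```

In the JSON case a standard library call does the parsing and the fields are addressed by name. In the iCal case the format of the value has to be known in advance.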


Export an Event using APIs of Open Event Server

In FOSSASIA's Open Event Server project, we allow the organizer, co-organizer and the admins to export all the data related to an event as an archive of JSON files. This way the data can be reused elsewhere for various purposes. The basic workflow is something like this:

  • Send a POST request to /events/{event_id}/export/json with a payload stating which media files you require.
  • The POST request starts a celery task in the background that extracts the data related to the event and jsonifies it.
  • The celery task URL is returned as the response. Sending a GET request to this URL gives the status of the task. If the status is either FAILED or SUCCESS, the response contains the corresponding error message or the result.
  • Separate JSON files for events, speakers, sessions, micro-locations, tracks, session types and custom forms are created.
  • All these files are then archived, and the zip is served on the endpoint /events/{event_id}/exports/{path}.
  • Sending a GET request to the above-mentioned endpoint downloads a zip containing all the data related to the event.

Let's dive into each of these points one by one.

POST request (/events/{event_id}/export/json)

To make a POST request you first need JWT authentication, as with most of the other API endpoints. You need to send a payload containing the settings for whether you want the media files related to the event to be downloaded along with the JSON files. An example payload looks like this:

{
   "image": true,
   "video": true,
   "document": true,
   "audio": true
 }

def export_event(event_id):
    from helpers.tasks import export_event_task

    settings = EXPORT_SETTING
    settings['image'] = request.json.get('image', False)
    settings['video'] = request.json.get('video', False)
    settings['document'] = request.json.get('document', False)
    settings['audio'] = request.json.get('audio', False)
    # queue task
    task = export_event_task.delay(
        current_identity.email, event_id, settings)
    # create Job
    create_export_job(task.id, event_id)

    # in case of testing
    if current_app.config.get('CELERY_ALWAYS_EAGER'):
        # send_export_mail(event_id, task.get())
        TASK_RESULTS[task.id] = {
            'result': task.get(),
            'state': task.state
        }
    return jsonify(
        task_url=url_for('tasks.celery_task', task_id=task.id)
    )


Taking the settings about the media files and the event id, we pass them as parameters to the export event celery task and queue up the task. We then create an entry in the database with the task URL, the event id and the user who triggered the export, to keep a record of the activity. After that we return the URL of the celery task to the user as the response.
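
One detail worth illustrating is how the settings are derived from the payload. The sketch below is a standalone Python illustration, not the server code: `DEFAULT_EXPORT_SETTINGS` is a hypothetical stand-in for EXPORT_SETTING, and the function copies the defaults before filling them in, so that one request's choices cannot leak into the next.

```python
# hypothetical defaults, standing in for EXPORT_SETTING
DEFAULT_EXPORT_SETTINGS = {
    "image": False,
    "video": False,
    "document": False,
    "audio": False,
}


def build_export_settings(payload):
    """Return per-request export settings from a JSON payload dict."""
    settings = dict(DEFAULT_EXPORT_SETTINGS)  # copy, so defaults stay untouched
    for key in settings:
        # missing keys default to False; unknown payload keys are ignored
        settings[key] = bool((payload or {}).get(key, False))
    return settings
```

A payload of `{"image": True, "audio": True}` then gives settings with only those two media types enabled.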

If the celery task is still underway, the task URL shows a response with 'state': 'WAITING'. Once the task is completed, the value of 'state' is either 'FAILED' or 'SUCCESS'. If it is SUCCESS, it returns the result of the task, in this case the download URL for the zip.

Celery Task to Export Event

Exporting an event is a very time-consuming process, and we don't want it to get in the way of user interaction with other services. So we needed a queueing system that would queue the tasks and execute them in the background without disturbing the main worker from serving other user requests. We use celery for this.

We have created a celery task named "export.event", which calls event_export_task_base(), which in turn calls export_event_json(), where all the jsonification is carried out. To start the celery task, all we do is call export_event_task.delay(email, event_id, settings); it returns a celery task object with a task id that can be used to check the status of the task.

@celery.task(base=RequestContextTask, name='export.event', bind=True)
def export_event_task(self, email, event_id, settings):
    event = safe_query(db, Event, 'id', event_id, 'event_id')
    try:
        logging.info('Exporting started')
        path = event_export_task_base(event_id, settings)
        # task_id = self.request.id.__str__()  # str(async result)
        download_url = path

        result = {
            'download_url': download_url
        }
        logging.info('Exporting done.. sending email')
        send_export_mail(email=email, event_name=event.name, download_url=download_url)
    except Exception as e:
        print(traceback.format_exc())
        result = {'__error': True, 'result': str(e)}
        logging.info('Error in exporting.. sending email')
        send_export_mail(email=email, event_name=event.name, error_text=str(e))

    return result


After exporting, a path to the export zip is returned. We then take it as the download URL and return it as the result of the celery task. In case there is an error in the celery task, we print the entire traceback in the celery worker and return the error as the result.

Make the Exported Zip Ready

We have a separate export_helpers.py file in the helpers module of the API for performing various tasks related to exporting all the data of the event. The most important function in this file is export_event_json(). This function accepts the event_id and the settings dictionary. In the export helpers we have global constant dictionaries which contain the order in which the fields are to appear in the JSON files created while exporting.

Firstly, we create the directory for storing the exported JSON and, eventually, the archive of all the JSON files. Then we have a global collection named EXPORTS, which contains all the tables and their corresponding Models that we want to extract from the database and store as JSON. From EXPORTS we get the Models. We use these Models to make queries with the given event_id and retrieve the data from the database. After retrieving the data, we use another helper function named _order_json, which jsonifies the sqlalchemy data in the order given in the dictionary. After this we download the media data, i.e. the slides, images, videos etc. related to that particular Model, depending on the settings.

def export_event_json(event_id, settings):
    """
    Exports the event as a zip on the server and return its path
    """
    # make directory
    exports_dir = app.config['BASE_DIR'] + '/static/uploads/exports/'
    if not os.path.isdir(exports_dir):
        os.mkdir(exports_dir)
    dir_path = exports_dir + 'event%d' % int(event_id)
    if os.path.isdir(dir_path):
        shutil.rmtree(dir_path, ignore_errors=True)
    os.mkdir(dir_path)
    # save to directory
    for e in EXPORTS:
        if e[0] == 'event':
            query_obj = db.session.query(e[1]).filter(
                e[1].id == event_id).first()
            data = _order_json(dict(query_obj.__dict__), e)
            _download_media(data, 'event', dir_path, settings)
        else:
            query_objs = db.session.query(e[1]).filter(
                e[1].event_id == event_id).all()
            data = [_order_json(dict(query_obj.__dict__), e) for query_obj in query_objs]
            for count in range(len(data)):
                data[count] = _order_json(data[count], e)
                _download_media(data[count], e[0], dir_path, settings)
        data_str = json.dumps(data, indent=4, ensure_ascii=False).encode('utf-8')
        # write the encoded bytes in binary mode; the with-block closes the file
        with open(dir_path + '/' + e[0], 'wb') as fp:
            fp.write(data_str)
    # add meta
    data_str = json.dumps(
        _generate_meta(), sort_keys=True,
        indent=4, ensure_ascii=False
    ).encode('utf-8')
    with open(dir_path + '/meta', 'wb') as fp:
        fp.write(data_str)
    # make zip
    shutil.make_archive(dir_path, 'zip', dir_path)
    dir_path = dir_path + ".zip"

    storage_path = UPLOAD_PATHS['exports']['zip'].format(
        event_id=event_id
    )
    uploaded_file = UploadedFile(dir_path, dir_path.rsplit('/', 1)[1])
    storage_url = upload(uploaded_file, storage_path)

    return storage_url


After we receive the JSON data from the _order_json() function, we create a dump of it using json.dumps with an indentation of 4 spaces and utf-8 encoding. Then we save this dump in a file named after the model from which the data was retrieved. This process is repeated for all the models listed in EXPORTS. After all the JSON files are created and all the media is downloaded, we make a zip of the folder.

To do this we use shutil.make_archive, which creates the zip. The zip is then uploaded to the storage service used by the server, such as S3 or Google Storage, and the URL through which it can be accessed is returned.
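
The dump-and-archive step can be tried in isolation with the standalone Python 3 sketch below. It leaves out the database, media download and upload parts. The function name and the sample data are made up for illustration.

```python
import json
import os
import shutil
import tempfile


def export_to_zip(tables, out_dir, name):
    """Write each table to its own JSON file, then zip the folder."""
    dir_path = os.path.join(out_dir, name)
    if os.path.isdir(dir_path):
        # start from a clean folder, as the export function does
        shutil.rmtree(dir_path, ignore_errors=True)
    os.makedirs(dir_path)
    for table, rows in tables.items():
        with open(os.path.join(dir_path, table), "w", encoding="utf-8") as fp:
            json.dump(rows, fp, indent=4, ensure_ascii=False)
    # make_archive appends ".zip" to the base name and returns the archive path
    return shutil.make_archive(dir_path, "zip", dir_path)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(export_to_zip({"event": {"id": 1}, "sessions": []}, tmp, "event1"))
```

Passing the folder as the root directory keeps the file names inside the zip relative, so the archive does not contain the server's directory structure.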

Apart from this function, the other major function in this file creates an export job entry in the database, so that we can keep track of which user started a task related to which event. This helps with debugging and security.

Downloading the Zip File

After the exporting is completed, if you send a GET request to the task url, you get a response similar to this:

{
   "result": {
     "download_url": "http://localhost:5000/static/media/exports/1/zip/OGpMM0w2RH/event1.zip"
   },
   "state": "SUCCESS"
 }

So, on opening the download URL in the browser or with any other tool, you can download the zip file.

One big question remains: after sending the POST request, how do you know that the task is completed and the zip is ready to be downloaded? One way of solving this problem is a technique known as polling, in which we send a GET request repeatedly at a fixed interval. From the POST request we get the URL of the export task. You keep polling this task URL until the state is either "FAILED" or "SUCCESS". If it is SUCCESS, you can place the download URL somewhere on your website, where it can be clicked to download the archived export of the event.
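
A minimal polling loop might look like the following Python sketch. It is not tied to any HTTP library: `fetch_status` is a hypothetical callable that performs the GET request on the task URL and returns the decoded JSON as a dict. The interval and the attempt limit are arbitrary example values.

```python
import time


def poll_task(fetch_status, interval=2.0, max_attempts=30, sleep=time.sleep):
    """Poll until the task reports FAILED or SUCCESS, then return the response."""
    for _ in range(max_attempts):
        response = fetch_status()
        if response.get("state") in ("FAILED", "SUCCESS"):
            return response
        # task still running: wait before asking again
        sleep(interval)
    raise TimeoutError("task did not finish after %d attempts" % max_attempts)
```

Capping the number of attempts keeps a client from polling forever if the task never reaches a final state.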

 


Scraping Concurrently with Loklak Server

At present, SearchScraper in Loklak Server uses numerous threads to scrape the Twitter website. The data fetched is cleaned and more data is extracted from it. But scraping only Twitter leaves the system underused.

Concurrent scraping of other websites like Quora, Youtube, Github etc. can be added to diversify the application. In this way, the single endpoint search.json can serve multiple services.

As this feature is still being refined, we will discuss only the basic structure of the system with the new changes. I tried to implement a more abstract way of scraping by:

1) Fetching the input data in SearchServlet

Instead of selecting individual get-parameters and passing each of them on, the complete Map object is now passed, which makes it possible to add more functionality based on the input get-parameters. The dataArray object (a JSONArray) is fetched from the DAO.scrapeLoklak method and is embedded in the output under the key results.

    // start a scraper
    inputMap.put("query", query);
    DAO.log(request.getServletPath() + " scraping with query: "
           + query + " scraper: " + scraper);
    dataArray = DAO.scrapeLoklak(inputMap, true, true);

 

2) Scraping the selected Scrapers concurrently

In DAO.java, the useful get-parameters of inputMap are fetched and cleaned. They are used to choose the scrapers that shall be run, using the getScraperObjects() method.

Timeline2.Order order= getOrder(inputMap.get("order"));
Timeline2 dataSet = new Timeline2(order);
List<String> scraperList = Arrays.asList(inputMap.get("scraper").trim().split("\\s*,\\s*"));

 

Threads are created to fetch data from the different scrapers, according to the size of the list of scraper objects. The input map is passed as an argument to the scrapers, so that each can read the further get-parameters related to it and produce its output accordingly.

List<BaseScraper> scraperObjList = getScraperObjects(scraperList, inputMap);
ExecutorService scraperRunner = Executors.newFixedThreadPool(scraperObjList.size());

try{
    for (BaseScraper scraper : scraperObjList)
    {
        scraperRunner.execute(() -> {
            dataSet.mergePost(scraper.getData());
        });

    }

} finally {
    scraperRunner.shutdown();

    try {
        scraperRunner.awaitTermination(24L, TimeUnit.HOURS);
    } catch (InterruptedException e) {
        // restore the interrupt flag so that callers can react to it
        Thread.currentThread().interrupt();
    }
}
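
The same fan-out pattern can be sketched in Python with `concurrent.futures`. This is an illustration of the idea, not a translation of DAO.java: the scrapers are hypothetical callables that take the input map and return a list of posts. The results are merged on the calling thread, so the merge step does not need to be thread-safe.

```python
from concurrent.futures import ThreadPoolExecutor


def scrape_all(scrapers, input_map):
    """Run every scraper in its own thread and merge the results.

    scrapers maps a name to a hypothetical callable that takes the
    input map and returns a list of posts.
    """
    merged = []
    if not scrapers:
        return merged
    with ThreadPoolExecutor(max_workers=len(scrapers)) as pool:
        futures = [pool.submit(fn, input_map) for fn in scrapers.values()]
        # collect in submission order; result() waits for each scraper
        for future in futures:
            merged.extend(future.result())
    return merged
```

Leaving the `with` block waits for all workers to finish, which plays the role of shutdown() followed by awaitTermination() in the Java code.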

 

3) Fetching the selected Scraper Objects in DAO.java

Here a variable of the abstract class BaseScraper (the superclass of all search scrapers) is used to create the list of scrapers to be run. All the scrapers' constructors are given the input map, so that they scrape accordingly.

List<BaseScraper> scraperObjList = new ArrayList<BaseScraper>();
BaseScraper scraperObj = null;

if (scraperList.contains("github") || scraperList.contains("all")) {
    scraperObj = new GithubProfileScraper(inputMap);
    scraperObjList.add(scraperObj);
}
.
.
.
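
As the number of scrapers grows, the chain of if-blocks can be replaced by a lookup table. The Python sketch below shows this idea under stated assumptions: the registry contents are placeholders (tuples instead of real scraper objects), and the handling of the scraper parameter is a guess based on the comma-separated list shown earlier.

```python
# hypothetical registry: scraper name -> factory that takes the input map
SCRAPERS = {
    "github": lambda input_map: ("GithubProfileScraper", input_map),
    "youtube": lambda input_map: ("YoutubeScraper", input_map),
}


def get_scraper_objects(scraper_param, input_map, registry=SCRAPERS):
    """Create one scraper per requested name; "all" selects every scraper."""
    names = [n.strip() for n in scraper_param.split(",") if n.strip()]
    if "all" in names:
        names = list(registry)
    # unknown names are skipped rather than treated as errors
    return [registry[n](input_map) for n in names if n in registry]
```

With this layout, adding a scraper means adding one registry entry instead of another if-block.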

 


Data Indexing in Loklak Server

Loklak Server is a data-scraping system that indexes all the scraped data in order to optimize its operations. The data fetched by different users is stored as cache, which allows data for recurring queries to be retrieved directly from the cache. When users search for the same queries, the load on Loklak Server is reduced by outputting indexed data.

Application

It depends on ElasticSearch for indexing the cached data (as JSON), so that repeated queries are answered from the index instead of scraping the same data again.

When is data indexing done?

The indexing of data is done when:

1) Data is scraped:

When data is scraped, it is indexed concurrently while the data is being cleaned in the TwitterTweet data object. For this task, the static addScheduler method of IncomingMessageBuffer is used, which acts as an abstraction between the scraping of data and the storing and indexing of data.

The following is the implementation from TwitterScraper. Here writeToIndex is the boolean input that decides whether to index the data or not.

if (this.writeToIndex) IncomingMessageBuffer.addScheduler(this, this.user, true);

2) Data is fetched from backend:

When data is fetched from the backend, it is indexed through the Timeline object, which calls the above method to index data concurrently.

The following is the definition of the writeToIndex() method from Timeline.java. When writeToIndex() is called, the fetched data is indexed.

public void writeToIndex() {
    IncomingMessageBuffer.addScheduler(this, true);
}

How?

When the static addScheduler method of IncomingMessageBuffer is called, a thread is started that indexes all the data. As long as the messagequeue data structure contains messages, indexing continues.
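
The buffer-plus-worker idea can be illustrated with a short Python sketch. It shows the general producer/consumer pattern only. The class and method names are hypothetical and do not correspond to IncomingMessageBuffer's real interface.

```python
import queue
import threading


class MessageBuffer:
    """Queue messages from scrapers and index them on a background thread."""

    def __init__(self, index):
        self._queue = queue.Queue()
        self._index = index  # callable that indexes one message
        worker = threading.Thread(target=self._run, daemon=True)
        worker.start()

    def add(self, message):
        # called by scrapers; returns immediately
        self._queue.put(message)

    def _run(self):
        while True:
            message = self._queue.get()  # blocks until a message arrives
            try:
                self._index(message)
            finally:
                self._queue.task_done()

    def flush(self):
        # wait until every queued message has been handed to the indexer
        self._queue.join()
```

The scraper only pays the cost of a queue insert, while the slower indexing work happens on the worker thread.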

The DAO method writeMessageBulk is called to write the data. The data is then written to the following streams:

1) Dump: The data fetched is dumped into a file in the Import directory. It can also be fetched from other peers.

2) Index: The data fetched is checked against the index, and data that isn't indexed yet is indexed.

public static Set<String> writeMessageBulk(Collection<MessageWrapper> mws) {
    List<MessageWrapper> noDump = new ArrayList<>();
    List<MessageWrapper> dump = new ArrayList<>();
    for (MessageWrapper mw: mws) {
        if (mw.t == null) continue;
        if (mw.dump) dump.add(mw);
        else noDump.add(mw);
    }

    Set<String> createdIDs = new HashSet<>();
    createdIDs.addAll(writeMessageBulkNoDump(noDump));
    // writeMessageBulkDump also calls writeMessageBulkNoDump internally
    createdIDs.addAll(writeMessageBulkDump(dump));

    return createdIDs;
}
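
The partitioning step at the start of writeMessageBulk can be mirrored in a few lines of Python. The `MessageWrapper` tuple here is a simplified stand-in with only the two fields the Java loop looks at.

```python
from collections import namedtuple

# hypothetical stand-in for the Java MessageWrapper: t is the message, dump a flag
MessageWrapper = namedtuple("MessageWrapper", ["t", "dump"])


def split_for_writing(wrappers):
    """Split wrappers into (dump, no_dump) lists, skipping empty messages."""
    dump, no_dump = [], []
    for mw in wrappers:
        if mw.t is None:
            continue  # nothing to write for this wrapper
        (dump if mw.dump else no_dump).append(mw)
    return dump, no_dump
```

Each list is then handed to its own writer, which is what the two writeMessageBulk... calls do in the Java method.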

 

The above code snippet is from DAO.java. The call to writeMessageBulkNoDump(noDump) indexes the data in ElasticSearch.

For dumping the data, writeMessageBulkDump(dump) is called.


Some Other Services in Loklak Server

Loklak Server isn't just scraper software. It provides numerous other services that perform interesting functions like link unshortening (the reverse of link shortening) and video fetching, as well as administrative tasks like fetching the status of a Loklak deployment (for analysis in Loklak development). Some of these are used internally, and the rest can be accessed through HTTP endpoints. There are also some services which aren't complete and are still in development.

Let's go through some of them to learn a bit about them and how they can be used.

1) VideoUrlService

This service extracts the video from a webpage that has a streaming video and outputs the video file link. The service is still in development but is functional. Presently, it can fetch Twitter video links and output them in different video qualities.

Endpoint: /api/videoUrlService.json

Implementation Example:

curl "http://api.loklak.org/api/videoUrlService.json?id=https://twitter.com/EXOGlobal/status/886182766970257409&id=https://twitter.com/KMbappe/status/885963850708865025"

2) Link Unshortening Service

This service is used to unshorten links. Some websites use shortened URLs to track Internet users. To prevent this, the link unshortening service unshortens the link and returns the final, untrackable link to the user.

Currently this service is used in TwitterScraper to unshorten the fetched URLs. It also has methods to get the redirect link and to get the final URL from a link that has been shortened multiple times.

Implementation example from TwitterScraper.java:

Matcher m = timeline_link_pattern.matcher(text);

if (m.find()) {
    String expanded = RedirectUnshortener.unShorten(m.group(2));
    text = m.replaceFirst(" " + expanded);
    continue;
}
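
The replace-in-text step can be sketched in Python as follows. The regular expression is a deliberately simplified, hypothetical pattern, and `unshorten` is an injected callable standing in for the actual redirect lookup, so the sketch makes no network requests.

```python
import re

# simplified, hypothetical link pattern; the real scraper uses its own
LINK_PATTERN = re.compile(r"https?://\S+")


def expand_links(text, unshorten):
    """Replace every link in text with the result of unshorten(link)."""
    return LINK_PATTERN.sub(lambda match: unshorten(match.group(0)), text)
```

For example, with a lookup table that maps `http://t.co/abc` to `https://loklak.org/`, the text `see http://t.co/abc now` becomes `see https://loklak.org/ now`.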

 

It can also be exposed as a service and used directly. New features, like fetching the featured image from links, can be added to this service. These ideas are under discussion, and enthusiastic contributions are most welcome.

3) StatusService

This service outputs all data related to the configuration of a Loklak Server deployment. To access it, the API endpoint status.json is used.

It outputs the following data:

a) The number of messages it scrapes per second, minute, hour, day, etc.

b) The configuration of the server, like RAM, assigned memory, used memory, number of CPU cores, CPU load, etc.

c) Other configuration related to the application, like the size and specifications of the ElasticSearch shards, the client request header, the number of running threads, etc.

Endpoint: /api/status.json

Implementation Example:

curl http://api.loklak.org/api/status.json


Creating Unit Tests for File Upload Functions in Open Event Server with Python Unittest Library

In FOSSASIA's Open Event Server, we use the Python unittest library for unit testing various modules of the API code. The unittest library provides various assertion functions to compare the actual and the expected values returned by a function or a module. For normal modules, we simply use these assertions to compare the result, since the parameters are mostly normal data types. However, one very important area for unit testing is file uploading. We cannot simply pass a particular file or any such payload to the function to test it properly, since it expects request.files-style data, which is obtained only when a file is uploaded or sent as a request to an endpoint. For example, in this function:

def uploaded_file(files, multiple=False):
    if multiple:
        files_uploaded = []
        for file in files:
            extension = file.filename.split('.')[1]
            filename = get_file_name() + '.' + extension
            filedir = current_app.config.get('BASE_DIR') + '/static/uploads/'
            if not os.path.isdir(filedir):
                os.makedirs(filedir)
            file_path = filedir + filename
            file.save(file_path)
            files_uploaded.append(UploadedFile(file_path, filename))

    else:
        extension = files.filename.split('.')[1]
        filename = get_file_name() + '.' + extension
        filedir = current_app.config.get('BASE_DIR') + '/static/uploads/'
        if not os.path.isdir(filedir):
            os.makedirs(filedir)
        file_path = filedir + filename
        files.save(file_path)
        files_uploaded = UploadedFile(file_path, filename)

    return files_uploaded
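
One side note on the function above: `filename.split('.')[1]` picks the second dot-separated part, which is the extension only when the name contains exactly one dot. A more robust way to get the extension, sketched here with a hypothetical helper name, is `os.path.splitext`.

```python
import os


def get_extension(filename):
    """Return the extension after the last dot, without the dot itself."""
    return os.path.splitext(filename)[1].lstrip(".")
```

For `test_file.csv` both approaches give `csv`. For a name like `report.final.pdf` the split-based version gives `final`, while `get_extension` gives `pdf`.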


So, we need to create a mock uploading system to replicate this. Inside the unit testing function we create an API route, for this particular scope, that accepts a file as a request. Following is the code:

        @app.route("/test_upload", methods=['POST'])
        def upload():
            files = request.files['file']
            file_uploaded = uploaded_file(files=files)
            return jsonify(
                {'path': file_uploaded.file_path,
                 'name': file_uploaded.filename})


The above code creates an app route with the endpoint test_upload, which accepts request.files. It passes this object to the uploaded_file function (the function to be unit tested), gets the result of the function, and returns the result in JSON format.

With this we have the endpoint to mock a file upload ready. Next we need to send a request with a file object. We cannot send normal data, which would be treated as a normal request.form; we want to receive it in request.files. So we create 2 classes, each inheriting from an existing class.

def test_upload_single_file(self):

        class FileObj(StringIO):

            def close(self):
                pass

        class MyRequest(Request):
            def _get_file_stream(*args, **kwargs):
                return FileObj()

        app.request_class = MyRequest


The MyRequest class inherits from the Request class of the Flask framework. We define the file stream of the Request class to be a FileObj. Then we set the request_class attribute of the Flask app to this new MyRequest class.

After we have it all set up, we need to send the request and see whether the uploaded file is saved properly. For this purpose we take the help of the StringIO library. StringIO creates a file-like object, which can then be used to replicate a file upload. So we send the data as {'file': (StringIO('1,2,3,4'), 'test_file.csv')}. We send this as data to the /test_upload endpoint that we created previously. As a result, the endpoint receives the file, saves it, and returns the filename and file_path of the stored file.

        with app.test_request_context():
            client = app.test_client()
            resp = client.post('/test_upload', data = {'file': (StringIO('1,2,3,4'), 'test_file.csv')})
            data = json.loads(resp.data)
            file_path = data['path']
            filename = data['name']
            actual_file_path = app.config.get('BASE_DIR') + '/static/uploads/' + filename
            self.assertEqual(file_path, actual_file_path)
            self.assertTrue(os.path.exists(file_path))


After this is done, we need to check that the file_path we receive is the file path we expect. Secondly, we also check whether the file was really created or whether this is just some dummy data. We get the expected path like this:

actual_file_path = app.config.get('BASE_DIR') + '/static/uploads/' + filename

Then, using assertEqual, we assert that actual_file_path is the same as the resulting path we received. Finally, we use assertTrue to ensure that there is a file at that path. That is,

self.assertTrue(os.path.exists(file_path))

os.path.exists returns True if the file exists and False if not.

So that basically sums up the unit test. It checks that:
1) the file is saved in the correct path, and
2) the file actually exists.
The unit test passes only if both are true. Otherwise we get either an error or a failure.

Following is the entire code snippet for this unit testing function:

def test_upload_single_file(self):

        class FileObj(StringIO):

            def close(self):
                pass

        class MyRequest(Request):
            def _get_file_stream(*args, **kwargs):
                return FileObj()

        app.request_class = MyRequest

        @app.route("/test_upload", methods=['POST'])
        def upload():
            files = request.files['file']
            file_uploaded = uploaded_file(files=files)
            return jsonify(
                {'path': file_uploaded.file_path,
                 'name': file_uploaded.filename})

        with app.test_request_context():
            client = app.test_client()
            resp = client.post('/test_upload', data = {'file': (StringIO('1,2,3,4'), 'test_file.csv')})
            data = json.loads(resp.data)
            file_path = data['path']
            filename = data['name']
            actual_file_path = app.config.get('BASE_DIR') + '/static/uploads/' + filename
            self.assertEqual(file_path, actual_file_path)
            self.assertTrue(os.path.exists(file_path))
