Posting Scraped Tweets to Loklak server from Loklak Wok Android

Loklak Wok Android is a peer harvester that posts collected messages to the Loklak server. The suggestions to search tweets are fetched using the suggest API endpoint. Using the suggestion queries, tweets are scraped. The scraped tweets are shown in a RecyclerView and are simultaneously posted to the Loklak server using the push API endpoint. Let's see how this is implemented.

Adding Dependencies to the project

This feature heavily uses Retrofit2, Reactive extensions(RxJava2, RxAndroid and Retrofit RxJava adapter) and RetroLambda (for Java lambda support in Android).

In app/build.gradle:

apply plugin: 'com.android.application'
apply plugin: 'me.tatarka.retrolambda'

android {
   ...
   packagingOptions {
       exclude 'META-INF/rxjava.properties'
   }
}

dependencies {
   ...
   compile 'com.google.code.gson:gson:2.8.1'

   compile 'com.squareup.retrofit2:retrofit:2.3.0'
   compile 'com.squareup.retrofit2:converter-gson:2.3.0'
   compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'

   compile 'io.reactivex.rxjava2:rxjava:2.0.5'
   compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
}

 

In build.gradle project level:

dependencies {
   classpath 'com.android.tools.build:gradle:2.3.3'
   classpath 'me.tatarka:gradle-retrolambda:3.2.0'
}

 

Implementation

The suggest and push API endpoints are defined in the LoklakApi interface:

public interface LoklakApi {

   @GET("/api/suggest.json")
   Observable<SuggestData> getSuggestions(@Query("q") String query, @Query("count") int count);

   @POST("/api/push.json")
   @FormUrlEncoded
   Observable<Push> pushTweetsToLoklak(@Field("data") String data);
}

 

The POJOs (Plain Old Java Objects) for suggestions and for posting tweets are generated using jsonschema2pojo; Gson uses these POJOs to convert JSON to Java objects.
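
For illustration, a trimmed-down version of such a generated model could look roughly like this (the field names are assumed from the suggest response; the actual generated classes contain more fields, but the queries list is what the later code relies on):

import com.google.gson.annotations.SerializedName;

import java.util.List;

public class SuggestData {

    // list of suggestion queries returned by the suggest endpoint,
    // used later in fetchSuggestions()
    @SerializedName("queries")
    private List<Query> queries;

    public List<Query> getQueries() {
        return queries;
    }
}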

The REST client is created with Retrofit2 and is implemented in the RestClient class. The Gson converter and the RxJava adapter for Retrofit are added to the Retrofit builder. The create method is called to generate the API methods (Retrofit implements the LoklakApi interface).

public class RestClient {

   private RestClient() {
   }

   private static void createRestClient() {
       sRetrofit = new Retrofit.Builder()
               .baseUrl(BASE_URL)
               // gson converter
               .addConverterFactory(GsonConverterFactory.create(gson))
               // retrofit adapter for rxjava
               .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
               .build();
   }

   private static Retrofit getRetrofitInstance() {
       if (sRetrofit == null) {
           createRestClient();
       }
       return sRetrofit;
   }

   public static <T> T createApi(Class<T> apiInterface) {
       // create method to generate API methods
       return getRetrofitInstance().create(apiInterface);
   }

}

 

The suggestions are fetched by calling getSuggestions once the LoklakApi interface has been implemented. getSuggestions returns an Observable of type SuggestData, which contains the suggestions in a List. Only a single query needs to be passed to LiquidCore for scraping tweets, so flatMap is used to transform the observable, and the fromIterable operator then emits the single queries as strings to LiquidCore, which scrapes the tweets. This is implemented in fetchSuggestions:

private Observable<String> fetchSuggestions() {
   LoklakApi loklakApi = RestClient.createApi(LoklakApi.class);
   Observable<SuggestData> observable = loklakApi.getSuggestions("", 2);
   return observable.flatMap(suggestData -> {
       List<Query> queryList = suggestData.getQueries();
       List<String> queries = new ArrayList<>();
       for (Query query : queryList) {
           queries.add(query.getQuery());
       }
       return Observable.fromIterable(queries);
   });
}

 

As LiquidCore uses callbacks to create a connection between the NodeJS instance and Android, a custom observable is created using the create operator to maintain a flow of observables; it encapsulates the callbacks inside it. For a detailed understanding of how LiquidCore event handling works, please go through the example. This is the way it is implemented in getScrapedTweets:

private Observable<ScrapedData> getScrapedTweets(final String query) {
   final String LC_TWITTER_URI = "android.resource://org.loklak.android.wok/raw/twitter";
   URI uri = URI.create(LC_TWITTER_URI);

   return Observable.create(emitter -> { // custom observable creation
        EventListener startEventListener = (service, event, payload) -> {
            service.emit(LC_QUERY_EVENT, query);
            service.emit(LC_FETCH_TWEETS_EVENT);
        };

       EventListener getTweetsEventListener = (service, event, payload) -> {
           ScrapedData scrapedData = mGson.fromJson(payload.toString(), ScrapedData.class);
           emitter.onNext(scrapedData); // data emitted using ObservableEmitter
       };

       MicroService.ServiceStartListener serviceStartListener = (service -> {
           service.addEventListener(LC_START_EVENT, startEventListener);
           service.addEventListener(LC_GET_TWEETS_EVENT, getTweetsEventListener);
       });

       MicroService microService = new MicroService(getActivity(), uri, serviceStartListener);
       microService.start();
   });
}

 

Now that we are getting suggestions and using them to get scraped tweets, this needs to be done periodically so that tweets are pushed continuously to the Loklak server. For this, the interval operator is used. A list is maintained which contains the suggestion queries for which tweets are to be scraped. Once the scraping is done and the tweets are displayed in the RecyclerView, the suggestion query is removed from the list. A new set of suggestions is fetched only when the list is empty.

Observable.interval(4, TimeUnit.SECONDS)
       .flatMap(this::getSuggestionsPeriodically)
       .flatMap(query -> {
           mSuggestionQuerries.add(query); // query added to list
           return getScrapedTweets(query);
       });

 

A method reference is used to maintain modularity, so the logic of periodically fetching suggestions is implemented in getSuggestionsPeriodically:

private Observable<String> getSuggestionsPeriodically(Long time) {
   if (mSuggestionQuerries.isEmpty()) { // checks if list is empty
       mInnerCounter = 0;
       return fetchSuggestions(); // new suggestions
   } else { // wait for a previous request to complete
       mInnerCounter++;
       if (mInnerCounter > 3) { // if some strange error occurs
           mSuggestionQuerries.clear();
       }
        return Observable.never(); // nothing is passed to the subscriber, so the subscriber waits
   }
}

 

Now, it's time to display the fetched tweets and then push them to the Loklak server. When periodic fetching of suggestions was implemented, we used the interval operator and then flatMap to transform the observables, i.e. chaining network requests.

Till this point the observables we were creating were cold observables. Cold observables only emit values when a subscription is made. As we need to display the scraped tweets and then push them, we have one source of observables and two (multiple) subscribers. By intuition the observable should be subscribed twice, for example:

Observable observable = Observable.interval(4, TimeUnit.SECONDS)
       .flatMap(this::getSuggestionsPeriodically)
       .flatMap(query -> {
           mSuggestionQuerries.add(query);
           return getScrapedTweets(query);
       });

// first time subscription
observable
       .subscribeOn(Schedulers.io())
       .observeOn(AndroidSchedulers.mainThread())
       .subscribe(
        // display in RecyclerView
       );


// second time subscription
observable
        .flatMap(...) // transformations to push data to server
       .subscribeOn(Schedulers.io())
       .observeOn(AndroidSchedulers.mainThread())
       .subscribe(
        // display the number of tweets pushed
       );

 

But the source observable is a cold observable, i.e. it emits objects only when it is subscribed to, due to which there will be two different network calls, one for the first subscription and one for the second subscription. So both subscriptions will have different data, which is not what is desired. The expected result is that there should be a single network call, and the data obtained from that call should be displayed and pushed to the Loklak server.

For this, hot observables are used. Hot observables start emitting objects the moment they are created, irrespective of whether they are subscribed to or not.

A cold observable can be converted to a hot observable by using the publish operator, and it starts emitting objects when the connect operator is called. This is implemented in displayAndPostScrapedData:

private void displayAndPostScrapedData() {
    ConnectableObservable<ScrapedData> observable = Observable.interval(4, TimeUnit.SECONDS)
            .flatMap(this::getSuggestionsPeriodically)
            .flatMap(query -> {
                mSuggestionQuerries.add(query);
                return getScrapedTweets(query);
            })
            .retry(2)
            .publish();

    // first time subscription to display scraped data
    Disposable viewDisposable = observable
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    this::displayScrapedData,
                    this::setNetworkErrorView
            );
    mCompositeDisposable.add(viewDisposable);

    // second time subscription for pushing data to loklak
    Disposable pushDisposable = observable
            .flatMap(this::pushScrapedData) // scraped data transformed for pushing
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    push -> {
                        mHarvestedTweets += push.getRecords();
                        harvestedTweetsCountTextView.setText(String.valueOf(mHarvestedTweets));
                    },
                    throwable -> {}
            );
    mCompositeDisposable.add(pushDisposable);

    Disposable publishDisposable = observable.connect(); // hot observable starts emitting
    mCompositeDisposable.add(publishDisposable);
}

 

The two subscriptions are made before the connect operator is invoked because the hot observable emits objects as a result of network calls, and network calls can't be made on the MainThread (UI thread). Subscribing beforehand channels the network calls to a background thread.

The scraped data is converted from objects to JSON using Gson, the JSON is converted to a string and it is then posted to the Loklak server using the push API endpoint. This is implemented in the pushScrapedData method, which is used in the second subscription through a method reference.

private Observable<Push> pushScrapedData(ScrapedData scrapedData) throws Exception {
    LoklakApi loklakApi = RestClient.createApi(LoklakApi.class);
    List<Status> statuses = scrapedData.getStatuses();
    String data = mGson.toJson(statuses);
    JSONArray jsonArray = new JSONArray(data);
    JSONObject jsonObject = new JSONObject();
    jsonObject.put("statuses", jsonArray);
    return loklakApi.pushTweetsToLoklak(jsonObject.toString());
}

 

Method references to the displayScrapedData and setNetworkErrorView methods are used to display the scraped data and to handle unsuccessful network requests.

Only 80 tweets are kept in the RecyclerView. If the number of tweets exceeds 80, the old tweets are removed.

private void displayScrapedData(ScrapedData scrapedData) {
   String query = scrapedData.getQuery();
   List<Status> statuses = scrapedData.getStatuses();
   mSuggestionQuerries.remove(query);
   if (mHarvestedTweetAdapter.getItemCount() > 80) {
       mHarvestedTweetAdapter.clearAdapter(); // old tweets removed
   }
   mHarvestedTweetAdapter.addHarvestedTweets(statuses);
   int count = mHarvestedTweetAdapter.getItemCount() - 1;
   recyclerView.scrollToPosition(count);
}

 

In case of a network error, the visibility of the RecyclerView and the TextView (which shows the number of tweets pushed) is changed to GONE and a message is displayed saying that there is a network error.

private void setNetworkErrorView(Throwable throwable) {
   Log.e(LOG_TAG, throwable.toString());
   // recyclerView and TextView showing count of harvested tweets are hidden
   ButterKnife.apply(networkViews, GONE);
   // network error message displayed
   networkErrorTextView.setVisibility(View.VISIBLE);
}


Implementing Tweet Search Suggestions in Loklak Wok Android

Loklak Wok Android is not only a peer harvester for the Loklak Server, it also lets users search tweets using Loklak's API endpoints. To provide a better tweet search experience to the users, the app provides search suggestions using the suggest API endpoint. This blog describes how "Search Suggestions" is implemented.

Third Party Libraries used to Implement Suggestion Feature

  • Retrofit2: Used for sending network requests
  • Gson: Used for serialization, JSON to POJOs (Plain old java objects).
  • RxJava and RxAndroid: Used to implement a clean asynchronous workflow.
  • Retrolambda: Provides support for lambdas in Android.

These libraries can be installed by adding the following dependencies in app/build.gradle

android {
   .
   // excludes duplicate rxjava.properties files
   packagingOptions {
      exclude 'META-INF/rxjava.properties'
   }
}

dependencies {
   // gson and retrofit2
    compile 'com.google.code.gson:gson:2.8.1'
    compile 'com.squareup.retrofit2:retrofit:2.3.0'
    compile 'com.squareup.retrofit2:converter-gson:2.3.0'
    compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'

   // rxjava and rxandroid
    compile 'io.reactivex.rxjava2:rxjava:2.0.5'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    compile 'com.jakewharton.rxbinding2:rxbinding:2.0.0'
}

 

To add retrolambda

// in project's build.gradle
dependencies {
    
    classpath 'me.tatarka:gradle-retrolambda:3.2.0'
}

// in app level build.gradle at the top
apply plugin: 'me.tatarka.retrolambda'

 

Fetching Suggestions

Retrofit2 sends a GET request to the suggest API endpoint, and the JSON response returned is deserialized to Java objects using the models defined in the models.suggest package. The models can be easily generated using jsonschema2pojo. The benefit of using Gson is that the hard work of parsing JSON is handled by it. The static method createRestClient creates the Retrofit instance to be used for network calls:

private static void createRestClient() {
   sRetrofit = new Retrofit.Builder()
           .baseUrl(BASE_URL) // base url : https://api.loklak.org/api/
           .addConverterFactory(GsonConverterFactory.create(gson))
           .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
           .build();
}

 

The suggest endpoint is defined in LoklakApi interface

public interface LoklakApi {

   @GET("/api/suggest.json")
   Observable<SuggestData> getSuggestions(@Query("q") String query);

   @GET("/api/suggest.json")
   Observable<SuggestData> getSuggestions(@Query("q") String query, @Query("count") int count);

   .
}

 

Now, the suggestions are obtained using the fetchSuggestion method. First, it creates the REST client to send network requests using the createApi method (which internally calls createRestClient, implemented above). The suggestion query is obtained from the EditText. Then the RxJava Observable is subscribed on a separate thread which is specially meant for doing IO operations, and finally the obtained data is observed, i.e. the views are inflated, on the main UI thread.

private void fetchSuggestion() {
   LoklakApi loklakApi = RestClient.createApi(LoklakApi.class); // rest client created
   String query = tweetSearchEditText.getText().toString(); // suggestion query from EditText
   Observable<SuggestData> suggestionObservable = loklakApi.getSuggestions(query); // observable created
   Disposable disposable = suggestionObservable
           .subscribeOn(Schedulers.io()) // subscribed on IO thread
           .observeOn(AndroidSchedulers.mainThread()) // observed on MainUI thread
           .subscribe(this::onSuccessfulRequest, this::onFailedRequest); // views are manipulated accordingly
   mCompositeDisposable.add(disposable);
}

 

If the network request is successful, the onSuccessfulRequest method is called, which updates the data in the RecyclerView.

private void onSuccessfulRequest(SuggestData suggestData) {
   if (suggestData != null) {
       mSuggestAdapter.setQueries(suggestData.getQueries()); // data updated.
   }
   setAfterRefreshingState();
}

 

If the network request fails, onFailedRequest is called, which displays a toast saying "Cannot fetch suggestions, Try Again!". If requests are sent simultaneously and they fail, the previous message, i.e. the previous toast, is removed.

private void onFailedRequest(Throwable throwable) {
   Log.e(LOG_TAG, throwable.toString());
   if (mToast != null) { // checks if a previous toast is present
       mToast.cancel(); // removes the previous toast.
   }
   setAfterRefreshingState();
   // value of networkRequestError: "Cannot fetch suggestions, Try Again!"
    mToast = Toast.makeText(getActivity(), networkRequestError, Toast.LENGTH_SHORT); // toast is created
   mToast.show(); // toast is displayed
}

 

Lively Updating suggestions

One way to update suggestions as the user types is to send a GET request with a query parameter to the suggest API endpoint and, if a previous request is still incomplete, cancel it. This involves a lot of IO work and seems unnecessary, because we would be sending requests even if the user hasn't finished typing what he/she wants to search. One probable solution is to use a Timer.

private TextWatcher searchTextWatcher = new TextWatcher() {  
    @Override
    public void afterTextChanged(Editable arg0) {
        // user typed: start the timer
        timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                String query = editText.getText().toString();
                // send network request to get suggestions
            }
        }, 600); // 600ms delay before the timer executes the "run" method from TimerTask
    }

    @Override
    public void beforeTextChanged(CharSequence s, int start, int count, int after) {}

    @Override
    public void onTextChanged(CharSequence s, int start, int before, int count) {
        // user is typing: reset already started timer (if existing)
        if (timer != null) {
            timer.cancel();
        }
    }
};

 

This looks good and eventually gets the job done. But don't you think that for simple stuff like this we are writing too much code, which is scary at first sight?

Let’s try a second approach by using RxBinding for Android views and RxJava operators. RxBinding simply converts every event on the view to an Observable which can be subscribed and worked upon.

In place of a TextWatcher, here we use the RxTextView.textChanges method, which takes an EditText as a parameter. textChanges creates an Observable of character sequences for text changes on the view, here the EditText. Then the debounce operator of RxJava is used, which drops emissions until the timeout expires, a clean alternative to the Timer. The second approach is implemented in updateSuggestions:

private void updateSuggestions() {
   Disposable disposable = RxTextView.textChanges(tweetSearchEditText) // generating observables for text changes
            .debounce(400, TimeUnit.MILLISECONDS) // dropping emissions i.e. text changes within 400 ms
           .subscribe(charSequence -> {
               if (charSequence.length() > 0) {
                   fetchSuggestion(); // suggestions obtained
               }
           });
   mCompositeDisposable.add(disposable);
}

 

CompositeDisposable is a bucket that holds Disposable objects, which are returned each time an Observable is subscribed to. So all the Disposable objects are collected in the CompositeDisposable and unsubscribed when onStop of the fragment is called, to avoid memory leaks.
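
A minimal sketch of that cleanup in the fragment could look like this (it assumes mCompositeDisposable is a field that is safe to re-create, which is an assumption for illustration):

@Override
public void onStop() {
    super.onStop();
    // all collected Disposables are unsubscribed in one go
    mCompositeDisposable.dispose();
    // a disposed CompositeDisposable cannot be reused, so a fresh bucket is
    // created in case the fragment is started again
    mCompositeDisposable = new CompositeDisposable();
}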

A SwipeRefreshLayout is used so that the user can retry if the request fails or refresh to get new suggestions. While refreshing is not complete, a circular ProgressDialog is shown and the RecyclerView showing the old suggestions is made invisible, executed by the setBeforeRefreshingState method:

private void setBeforeRefreshingState() {
   refreshSuggestions.setRefreshing(true);
   tweetSearchSuggestions.setVisibility(View.INVISIBLE);
}

 

Similarly, once refreshing is done, the ProgressDialog is stopped and the visibility of the RecyclerView, which now contains the updated suggestions, is changed to VISIBLE. This is executed in the setAfterRefreshingState method:

private void setAfterRefreshingState() {
   refreshSuggestions.setRefreshing(false);
   tweetSearchSuggestions.setVisibility(View.VISIBLE);
}

 


Invalidating user login using JWT in Open Event Orga App

User authentication is an essential part of the Open Event Orga App (Github Repo), as it allows an organizer to log in and perform actions on the events he/she organizes. The backend for the application, Open Event Orga Server, sends an authentication token on successful login, and all subsequent privileged API requests must include this token. The token is a JWT (JSON Web Token) which includes certain information about the user, such as an identifier, information about when the token becomes valid and when it expires, and a signature to verify whether it was tampered with.

Parsing the Token

Our job was to parse the token to find two fields:

  • Identifier of user
  • Expiry time of the token

We stored the token in our shared preferences file and loaded it from there for any subsequent requests. But the token expires after 24 hours, and we needed our login model to clear it once it has expired and show the login activity instead.
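
As a rough sketch (the preference key and the surrounding class are assumptions for illustration, not the app's actual code), saving and loading the token could look like this:

private static final String PREF_TOKEN = "token";

public void saveToken(String token) {
    sharedPreferences.edit()
        .putString(PREF_TOKEN, token)
        .apply();
}

public String getToken() {
    // null is returned when no token has been stored yet
    return sharedPreferences.getString(PREF_TOKEN, null);
}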

To do this, we needed to parse the JWT and compare the timestamp stored in the exp field with the current timestamp to determine if the token has expired. The first step in the process was to parse the token, which is essentially a Base64 encoded JSON string with sections separated by periods. The sections are as follows:

  • Header ( Contains information about algorithm used to encode JWT, etc )
  • Payload ( The data in the JWT – exp, iat, nbf, identity, etc )
  • Signature ( Verification signature of JWT )

We were interested in payload and for getting the JSON string from the token, we could have used Android’s Base64 class to decode the token, but we wanted to unit test all the util functions and that is why we opted for a custom Base64 class for only decoding our token.

So, first we split the token by the period and decoded each part and stored it in a SparseArrayCompat

public static SparseArrayCompat<String> decode(String token) {
   SparseArrayCompat<String> decoded = new SparseArrayCompat<>(2);

   String[] split = token.split("\\.");
   decoded.append(0, getJson(split[0]));
   decoded.append(1, getJson(split[1]));

   return decoded;
}

 

The getJson function is primarily decoding the Base64 string

private static String getJson(String strEncoded) {
   byte[] decodedBytes = Base64Utils.decode(strEncoded);
   return new String(decodedBytes);
}

The decoded information was stored in this way

0={"alg":"HS256","typ":"JWT"},  1={"nbf":1495745400,"iat":1495745400,"exp":1495745800,"identity":344}

Extracting Information

Next, we create a function to get the expiry timestamp from the token. We could use GSON or Jackson for the task, but we did not want to map fields into any object. So we simply used JSONObject class which Android provides. It took 5 ms on average to parse the JSON instead of 150 ms by GSON

public static long getExpiry(String token) throws JSONException {
   SparseArrayCompat<String> decoded = decode(token);

   // We are using JSONObject instead of GSON as it takes about 5 ms instead of 150 ms taken by GSON
   return Long.parseLong(new JSONObject(decoded.get(1)).get("exp").toString());
}

 

Next, we wanted to get the ID of user from token to determine if a new user is logging in or an old one, so that we can clear the database for new user.

public static int getIdentity(String token) throws JSONException {
   SparseArrayCompat<String> decoded = decode(token);

   return Integer.parseInt(new JSONObject(decoded.get(1)).get("identity").toString());
}

Validating the token

After this, we needed to create a function that tells if a stored token is expired or not. With all the right functions in place, it was just a matter of comparing current time with the stored timestamp

public static boolean isExpired(String token) {
   long expiry;

   try {
       expiry = getExpiry(token);
   } catch (JSONException jse) {
       return true;
   }

   return System.currentTimeMillis()/1000 >= expiry;
}

 

Since the token provides timestamp from epoch in terms of seconds, we needed to divide the current time in milliseconds by 1000 and the function returned true if current timestamp was greater than the expiry time of token.

After writing a few unit tests for both functions, we just needed to plug them in our login model at the time of authentication.
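
For instance, a unit test for isExpired could look roughly like this; createTestToken is a hypothetical helper (not part of the actual code) that builds a Base64 encoded header.payload.signature string with the given exp value:

@Test
public void shouldReportTokenExpiryCorrectly() {
    // createTestToken is a hypothetical helper, see note above
    String expiredToken = createTestToken(System.currentTimeMillis() / 1000 - 3600);
    String validToken = createTestToken(System.currentTimeMillis() / 1000 + 3600);

    assertTrue(JWTUtils.isExpired(expiredToken));
    assertFalse(JWTUtils.isExpired(validToken));
}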

At the time of starting of the application, we use this function to check if a user is logged in or not:

public boolean isLoggedIn() {
   String token = utilModel.getToken();

   return token != null && !JWTUtils.isExpired(token);
}

 

So, if there is no token or the token is expired, we do not automatically login the user and show the login screen.

Implementing login

The next tasks were:

  • Request the server to log in
  • Store the acquired token
  • Delete the database if it is a new user

Before implementing the above logic, we needed to implement a function to determine whether the person logging in is a previous user or a new one. For doing so, we first load the saved user from our database; if the query is empty, it is surely a new user logging in, so we return false. And if there is a user in the database, we match its ID with the logged in user's ID:

public Single<Boolean> isPreviousUser(String token) {
   return databaseRepository.getAllItems(User.class)
       .first(EMPTY)
       .map(user -> !user.equals(EMPTY) && user.getId() == JWTUtils.getIdentity(token));
}

 

We have added a default user EMPTY in the first operator so that RxJava returns it if there are no users in the database and then we simply map the user to a boolean denoting if they are same or different using the EMPTY user and getIdentity method from JWTUtils
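
For reference, EMPTY is just a sentinel instance; a sketch of how it could be declared:

// sentinel user returned by first() when the database has no stored user yet
private static final User EMPTY = new User();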

Finally, we use all this information to implement our self contained login request:

eventService
   .login(new Login(username, password))
   .flatMapSingle(loginResponse -> {
       String token = loginResponse.getAccessToken();
       utilModel.saveToken(token);

       return isPreviousUser(token);
   })
   .flatMapCompletable(isPrevious -> {
       if (!isPrevious)
           return utilModel.deleteDatabase();

       return Completable.complete();
   });

 

Let’s see what is happening here. A request using username and password is made to the server which returns a login response containing a JWT, which we store for future use. Next, we flatMapSingle to the Single returned by the isPreviousUser method. And we finally clear the database if it is not a previous user.

Creating these self-contained models helps reduce complexity in the presenter or view layer, and all data is handled in one layer, making the presenter layer model agnostic.

To learn more about JWT and some of the Rx operators I mentioned here, please visit these links:


Smart Data Loading in Open Event Android Orga App

In any API centric native application like the Open Event organizer app (Github Repo), there is a need to access data through the network, cache it for later use in a database, and retrieve data selectively from both sources, network and disk. Most Android applications use SQLite (including countless wrapper libraries on top of it) or Realm to manage their database, and Retrofit has become a de facto standard for consuming a REST API. But there is no standard way to manage the bridge between these two for smart data loading. Most applications directly make calls to the DB or the API service to selectively load their data and display it on the UI, but this breaks fluidity and cohesion between the two data sources. After all, both of these sources manage the same kind of data.

Suppose you wanted to load your data from a single source without having to worry about where it is coming from; it'd require a smart model or repository which checks which kind of data you want to load, checks if it is available in the DB, and loads it from there if it is. If it is not, it calls the API service to load the data and also saves it in the DB right there. These smart models are self-contained, meaning they handle the loading logic, handle edge cases and errors, and take actions for themselves about storing and retrieving the data. This keeps the presentation or UI layer free of the data aspects of the application and also removes unnecessary handling code unrelated to the UI.

From the starting of Open Event Android Orga Application planning, we proposed to create an efficient MVP based design with clear separation of concerns. With use of RxJava, we have created a single source repository pattern, which automatically handles connection, reload and database and network management. This blog post will discuss the implementation of the AbstractObservableBuilder class which manages from which source to load the data intelligently

Feature Set

So, first, let's discuss what features our AbstractObservableBuilder class should have:

  • Should take two inputs – disk source and network source
  • Should handle force reload from server
  • Should handle network connection logic
  • Should load from disk observable if data is present
  • Should load from the network observable if data is not present in the disk observable

This constitutes the most basic data operations done on any API based Android application. Now, let’s talk about the implementation

Implementation

Since our class will be of a generic type, we will use the variable T to denote it. Firstly, we have 4 declarations globally:

private IUtilModel utilModel;
private boolean reload;
private Observable<T> diskObservable;
private Observable<T> networkObservable;

 

  • UtilModel tells us if the device is connected to internet or not
  • reload tells if the request should bypass database and fetch from network source only
  • diskObservable is the database source of the item to be fetched
  • networkObservable is the network source of the same item

Next is a very simple implementation of the builder pattern methods which will be used to set these variable fields

@Inject
public AbstractObservableBuilder(IUtilModel utilModel) {
    this.utilModel = utilModel;
}

AbstractObservableBuilder<T> reload(boolean reload) {
    this.reload = reload;

    return this;
}

AbstractObservableBuilder<T> withDiskObservable(Observable<T> diskObservable) {
    this.diskObservable = diskObservable;

    return this;
}

AbstractObservableBuilder<T> withNetworkObservable(Observable<T> networkObservable) {
    this.networkObservable = networkObservable;

    return this;
}

 

UtilModel is the required dependency, and so is added as a constructor parameter.

All right, all variables are set up, now we need to create the build function to actually create the observable:

@NonNull
public Observable<T> build() {
    if (diskObservable == null || networkObservable == null)
        throw new IllegalStateException("Network or Disk observable not provided");

    return Observable
            .defer(getReloadCallable())
            .switchIfEmpty(getConnectionObservable())
            .compose(applySchedulers());
}

Reloading Logic

First of all, we check if the caller forgot to add a disk or network source, and throw an exception if that is the case. Next, we use the defer operator to defer the call to getReloadCallable() so that this function is not executed until the observable is subscribed. Some articles over the internet directly use combining operators from Rx to make things easy, but this lazy calling is the most efficient way to do things, because no actual call is made to the observables until a subscription happens.

Secondly, you can easily test the behaviour in unit tests, by verifying that

  • no call to the network observable was made if the data inside disk observable was present; or
  • call to network observable was made even if there was data in disk observable if the reload request was made

These tests would not have been possible if we did not employ the lazy call technique, because the calls to the observables and the utilModel would have been made before the subscription to this model happened, in order to create this observable eagerly.
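
As a sketch, the first of those tests could look like this (it assumes utilModel is a Mockito mock and that the trampoline scheduler overrides described in the testing post further below are applied in setUp; it is not the project's actual test code):

@Test
public void shouldNotHitNetworkWhenDiskHasData() {
    // flag flipped only if the network observable actually gets subscribed
    AtomicBoolean networkSubscribed = new AtomicBoolean(false);

    Observable<String> diskObservable = Observable.just("cached item");
    Observable<String> networkObservable = Observable.just("fresh item")
        .doOnSubscribe(disposable -> networkSubscribed.set(true));

    new AbstractObservableBuilder<String>(utilModel)
        .reload(false)
        .withDiskObservable(diskObservable)
        .withNetworkObservable(networkObservable)
        .build()
        .test()
        .assertValue("cached item");

    assertFalse(networkSubscribed.get());
}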

Now, let’s see what getReloadCallable does

@NonNull
private Callable<Observable<T>> getReloadCallable() {
    return () -> {
        if (reload)
            return Observable.empty();
        else
            return diskObservable
                .doOnNext(item -> 
                    Timber.d("Loaded %s From Disk on Thread %s",
                    item.getClass(), Thread.currentThread().getName()));
    };
}

 

This function’s role is to return the disk observable if the request is not a reload call, or else return an empty observable, so that force network request happens to reload the data

So it returns a Callable which encapsulates this logic, and besides that, it also adds a log if loading from disk about the type of item loaded and the thread it was loaded on

Connection and Database Switch Logic

In the next chain of operations, we make a switchIfEmpty call to getConnectionObservable(). Because of the above reloading logic, switchIfEmpty serves 2 purposes here; it switches to the API call:

  • if db does not contain data
  • If it is a reload call

The observable we switch to is returned by getConnectionObservable() and its purpose is to check if the device is connected to the internet, and if it is, to forward the network request and if it is not, then return an Error Observable.

@NonNull
private Observable<T> getConnectionObservable() {
    if (utilModel.isConnected())
        return networkObservable
            .doOnNext(item -> Timber.d("Loaded %s From Network on Thread %s",
                item.getClass(), Thread.currentThread().getName()));
    else
        return Observable.error(new Throwable(Constants.NO_NETWORK));
}

We use util model to determine if we are connected to internet and take action accordingly. As you can see, here too, we log about the data being loaded and the thread information.

Threading

Lastly, we want to ensure that all processing happens on correct threads, and for that, we call compose with an Observable Transformer to make all requests happen on I/O scheduler and the data is received on Android’s Main Thread

@NonNull
private <V> ObservableTransformer<V, V> applySchedulers() {
    return observable -> observable
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
}

And that’s all it takes to create a reactive, generic and reusable data handler for disk and network based operations. In the repository pattern we have employed in the Open Event Android Orga Application, all our data switching and handling code is delegated to it, with unit tests and integration tests testing the individual and cross component working in all cases.
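
To put this in context, a repository method built on top of the builder could look roughly like this (databaseRepository, eventService and their methods are placeholders for illustration, not the app's actual API):

public Observable<Event> getEvent(long eventId, boolean reload) {
    // both sources emit the same type of item; the builder decides which one is used
    Observable<Event> diskObservable = databaseRepository.getEvent(eventId);

    Observable<Event> networkObservable = eventService.getEvent(eventId)
        // freshly fetched data is saved so that later calls can be served from disk
        .doOnNext(databaseRepository::saveEvent);

    return new AbstractObservableBuilder<Event>(utilModel)
        .reload(reload)
        .withDiskObservable(diskObservable)
        .withNetworkObservable(networkObservable)
        .build();
}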

If you want to learn more about other implementations, you can read these articles


Addition of Bookmark Icon in Schedule ViewHolder in Open Event Android App

In the Open Event Android app we only had a list of sessions in the schedule page without  the ability to bookmark the session unless we went into the SessionDetailPage or visited the session list via the tracks page or the locations page. This was obviously very inconvenient. There were several iterations of UI design for the same. Taking cues from the Google I/O 17 App I thought that the addition of the Bookmark Icon in the Schedule ViewHolder would be helpful to the user. In this blog post I will be talking about how this feature was implemented.

Layout Implementation for the Bookmark Icon

<ImageButton
    android:id="@+id/slot_bookmark"
    android:layout_width="wrap_content"
    android:layout_height="wrap_content"
    android:layout_gravity="end"
    android:layout_alignParentEnd="true"
    android:layout_alignParentRight="true"
    android:layout_alignParentBottom="true"
    android:tint="@color/black"
    android:background="?attr/selectableItemBackgroundBorderless"
    android:contentDescription="@string/session_bookmark_status"
    android:padding="@dimen/padding_small"
    app:srcCompat="@drawable/ic_bookmark_border_white_24dp" />

The bookmark Icon was modelled as an ImageButton inside the item_schedule.xml file which serves as the layout file for the DayScheduleViewHolder.

Bookmark Icon Functionality in DayScheduleAdapter

The Bookmark Icon has 4 main roles:

  1. Add the session to the list of bookmarks (obviously)
  2. Generate the notification giving out the bookmarked session details.
  3. Generate a Snackbar if the icon was un-clicked which allowed the user to restore the bookmarked status of the session.
  4. Update the bookmarks widget.

Now we will see how all this was done. Some of this was already done previously in the SessionListAdapter; we just had to modify some of the code to get our desired result.

Screenshots: session not bookmarked (left) and session bookmarked (right)

First we just set a different icon to highlight the Bookmarked and the un-bookmarked status. This code snippet highlights how this is done.

if (session.isBookmarked()) {
    slot_bookmark.setImageResource(R.drawable.ic_bookmark_white_24dp);
} else {
    slot_bookmark.setImageResource(R.drawable.ic_bookmark_border_white_24dp);
}
slot_bookmark.setColorFilter(storedColor, PorterDuff.Mode.SRC_ATOP);

We check if the session is bookmarked by calling a function isBookmarked() and choose one of the 2 bookmark icons depending upon the bookmark status.

If a session is found to be bookmarked and the Bookmark Icon is clicked, we use the WidgetUpdater.updateWidget() function to remove that particular session from the Bookmark Widget of the app. During this, a Snackbar saying "Bookmark Removed" is also generated, with a functional UNDO option.

realmRepo.setBookmark(sessionId, false).subscribe();
slot_bookmark.setImageResource(R.drawable.ic_bookmark_border_white_24dp);

if ("MainActivity".equals(context.getClass().getSimpleName())) {
    Snackbar.make(slot_content, R.string.removed_bookmark, Snackbar.LENGTH_LONG)
            .setAction(R.string.undo, view -> {
                realmRepo.setBookmark(sessionId, true).subscribe();
                slot_bookmark.setImageResource(R.drawable.ic_bookmark_white_24dp);

                WidgetUpdater.updateWidget(context);
            }).show();
} else {
    Snackbar.make(slot_content, R.string.removed_bookmark, Snackbar.LENGTH_SHORT).show();
}

If a session wasn’t bookmarked earlier but the Bookmark Icon was clicked we would firstly need to update the bookmark status within our local Realm Database.

realmRepo.setBookmark(sessionId, true).subscribe();

We would also create a notification to notify the user.

NotificationUtil.createNotification(session, context).subscribe(
        () -> Snackbar.make(slot_content,
                R.string.added_bookmark,
                Snackbar.LENGTH_SHORT)
                .show(),
        throwable -> Snackbar.make(slot_content,
                R.string.error_create_notification,
                Snackbar.LENGTH_LONG).show());

The static class NotificationUtil is responsible for the generation of notifications. The internal workings of that class are not necessary right now. What this snippet of code does is create a Snackbar with the text "Bookmark Added" upon successful notification, and if any error occurs, a Snackbar with the text "Error Creating Notification" is generated.

slot_bookmark.setImageResource(R.drawable.ic_bookmark_white_24dp);
 slot_bookmark.setColorFilter(storedColor,PorterDuff.Mode.SRC_ATOP);

This snippet of code is responsible for the colors that are assigned to the Bookmark Icons for different tracks and this color is obtained in the following manner.

int storedColor = currentSession.getTrack().getColor();

So now we have successfully added the Bookmark Icon to the ScheduleViewHolder inside the schedule of the app.


Implementing Barcode Scanning in Open Event Android Orga App using RxJava

One of the principal goals of the Open Event Orga App (Github Repo) is to let the event organizer scan the barcode identifier of an attendee at the time of entry to the event, in order to quickly check in that attendee. Although there are several scanning APIs available throughout the web for Android projects, we chose the Google Vision API as it handles a multitude of formats and orientations automatically with little to no configuration, integrates seamlessly with Android, and is provided by Google itself, ensuring great support in the future as well. Currently, the use case of our application is:

  • Scan barcode from the camera feed
  • Detect and show barcode information on screen
  • Iterate through the attendee list to match the detected code with unique attendee identifier
  • Successfully return to the caller with scanned attendee’s information so that a particular action can be taken

There are several layers of optimisations done in the Orga Application to make the user interaction fluid and concise. I’ll be sharing a few in this blog post regarding the configuration and handling of bursty data from camera source efficiently. To see the full implementation, you can visit the Github Repository of the project using the link provided above.

Configuration

The configuration of our project was done through Dagger 2 following the dependency injection pattern, which is not the focus of this post, but it is always recommended that you follow separation of concerns in your project and create a separate module for handling independent works like barcode scanner configuration. Normally, people would create factories with scanner, camera source and processors encapsulated. This enables the caller to have control over when things are initialized.

Our configuration provides us two initialised objects, namely, CameraSource and BarcodeDetector

@Provides
BarcodeDetector providesBarCodeDetector(Context context, Detector.Processor<Barcode> processor) {
    BarcodeDetector barcodeDetector = new BarcodeDetector.Builder(context)
        .setBarcodeFormats(Barcode.QR_CODE)
        .build();

    barcodeDetector.setProcessor(processor);

    return barcodeDetector;
}

@Provides
CameraSource providesCameraSource(Context context, BarcodeDetector barcodeDetector) {
    return new CameraSource
        .Builder(context, barcodeDetector)
        .setRequestedPreviewSize(640, 480)
        .setRequestedFps(15.0f)
        .setAutoFocusEnabled(true)
        .build();
}

The fields needed to create the objects are provided as arguments to the provider functions, as are all the dependencies required to construct these objects. Now, focusing on the Detector.Processor requirement of the BarcodeDetector, this is the classic example of non-injectable code. This is because the processor is to be supplied by the activity or any other object which wants to receive the callback with the detected barcode data, which means we could only inject it at the time of creation of the Activity or Presenter itself. We could easily overcome this by adding a constructor to this Dagger module containing the Barcode.Processor at the time of injection, but that would violate our existing zero-configuration model where we just get the required component from the Application class and inject it. So, we wrapped the processor into a PublishSubject:

@Provides
@Singleton
@Named("barcodeEmitter")
PublishSubject<Notification<Barcode>> providesBarcodeEmitter() {
    return PublishSubject.create();
}

@Provides
@Singleton
Detector.Processor<Barcode> providesProcessor(@Named("barcodeEmitter") PublishSubject<Notification<Barcode>> emitter) {
    return new Detector.Processor<Barcode>() {
        @Override
        public void release() {
            // No action to be taken
        }

        @Override
        public void receiveDetections(Detector.Detections<Barcode> detections) {
            SparseArray<Barcode> barcodeSparseArray = detections.getDetectedItems();
            if (barcodeSparseArray.size() == 0)
                emitter.onNext(Notification.createOnError(new Throwable()));
            else
                emitter.onNext(Notification.createOnNext(barcodeSparseArray.valueAt(0)));
        }
    };
}

This solves 2 of our problems, not only now all these dependencies are injectable with 0 configurations, but also our stream of barcodes is now reactive.

Note that not everyone is in favour of using Singleton, but you can decrease the scope using your own annotation. We prefer not creating life cycle bound objects, those are hard to manage and can cause potential memory leaks, and the creation of an anonymous inner class object every time listener activates is not good for memory too.

Also, note that Singleton classes will cause memory leaks too if you don’t release their reference at the time of destruction of life cycle bound object
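
For reference, a custom scope annotation narrower than @Singleton, as mentioned above, is only a few lines; this is a sketch and the name is arbitrary:

import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

import javax.inject.Scope;

// objects bound to this scope live as long as the component that declares it,
// instead of the whole application
@Scope
@Retention(RetentionPolicy.RUNTIME)
public @interface ActivityScope {
}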

Notice how the type of the PublishSubject is not just Barcode, but Notification, which wraps the barcode. That's because we want to send both the value and error streams down uninterrupted to the caller. Otherwise, the data stream would have stopped on the emission of the first onError call. Here, we check the barcodeSparseArray size and accordingly send an error or the first value to the PublishSubject, which will accordingly be subscribed by the activity or presenter.

Handling Bursty Data

barcodeEmitter.subscribe(barcodeNotification -> {
    if (barcodeNotification.isOnError()) {
        presenter.onBarcodeDetected(null);
    } else {
        presenter.onBarcodeDetected(barcodeNotification.getValue());
    }
});

Here is how we are subscribing the notification emitter and passing the appropriate value to the presenter to handle, null if it is an error and the value if it is the next emission.

Note that you must dispose the disposable returned by the subscribe method on the subject when the Activity is to be destroyed or else it will keep the reference to the anonymous inner class created with the lambda for barcodeNotification and cause a memory leak
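 
A sketch of that cleanup (the field name and lifecycle methods here are assumptions for illustration):

private Disposable barcodeDisposable;

@Override
protected void onStart() {
    super.onStart();
    // keep a reference to the returned Disposable instead of discarding it
    barcodeDisposable = barcodeEmitter.subscribe(barcodeNotification -> {
        if (barcodeNotification.isOnError())
            presenter.onBarcodeDetected(null);
        else
            presenter.onBarcodeDetected(barcodeNotification.getValue());
    });
}

@Override
protected void onDestroy() {
    super.onDestroy();
    // releasing the subscription lets the lambda (and the Activity it captures) be garbage collected
    if (barcodeDisposable != null && !barcodeDisposable.isDisposed())
        barcodeDisposable.dispose();
}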

Now, let’s see how the presenter handles this data for:

  • Hiding and showing the barcode panel when barcode is on the screen accordingly
  • Showing the data extracted from the barcode scanner

These things can be implemented in a very standard way with a few conditionals, but most developers forget the fact that the data emission rate is enormous when concerning with live feed of data. In the Open Event Orga app, we have reduced it to 15 FPS as it is more than enough to scan barcodes for our use case, but it is still huge. The continuous stream of nulls and barcode data is useless to us unless it changes.

A little explanation about nulls and values here: You must have noticed above the conditions when we pass null and value, but I’ll explain again. A value will be passed if there is a detected barcode on screen, and null will be passed if there is no barcode detected. The Google Vision API will keep sending the same value for barcode at 15 FPS and so we’ll get this redundant stream of nulls and values which we should not concern with processing as this will load the CPU unnecessarily.

There are only 2 cases where we need to process it:

  • Null changes to Value -> Show barcode panel
    Value changes to null -> Hide barcode panel
  • Value changes irrespective of nulls -> Show barcode data on UI and search through the attendee identifiers

So, here too we’ll create 2 PublishSubject objects

private PublishSubject<Boolean> detect = PublishSubject.create();
private PublishSubject<String> data = PublishSubject.create();

And we’ll configure them both in this way to receive data on each barcode emission in the presenter:

public void onBarcodeDetected(Barcode barcode) {
   detect.onNext(barcode == null);
   if (barcode != null)
       data.onNext(barcode.displayValue);
}

This will make data only receive non-null changes and detect receive a boolean notifying if the current detected barcode was null or not.

Now, we see how each of these subjects is configured to pass the emissions downstream:

data.distinctUntilChanged()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(this::processBarcode);

This one is pretty straightforward. It’ll only send data downstream if its value has changed from the previous emission, disregarding nulls. So, for a stream like this

A A A A A null null null null A A A A null B B B B B B null null null B B A A null A

It will only emit:

A → B → A

Which is actually what we want, as we only need to process and show the distinct barcode data and match it with our attendee list. Now, with thousands of attendees, the first method would have triggered unnecessary and time-consuming computations over and over again on same identifiers with little gaps of time, which would have created mediocre results even in a multi-threaded environment. The second one saves us from repetitive calculations and also gives us enormous gaps between emissions, which is optimal for our use case.

The second case is not so obvious because we can’t ignore nulls here as we have to show and hide UI based on them. This means that unlike our previous stream if we just use distinctUntilChanged, it will look like this:

A A A A A null null null null A A A A null B B B B B B null null null B B A A null A

f → t → f → t → f → t → f → t → f

This is because, if you remember, we were sending down emissions of barcode == null on each emission for this Subject. So, in this case, as you may see, some of the values are so close to each other that they will not be discernible in the UI and will also annoy users, who'll see the panel pop up for milliseconds before vanishing, or vice-versa. The perfect operator for this case is debounce.

detect.distinctUntilChanged()
    .debounce(150, TimeUnit.MILLISECONDS)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(receiving -> scanQRView.showBarcodePanel(!receiving));

This operator will drop any emission that falls within a 150 ms window of the previous one and only pick up those emissions which are at least 150 ms apart from each other. Now, 150 ms is not a magic number; it is picked through trial and error and whatever works best for your case. Lower the value and you will pick up more changes downstream; increase the value and you might miss the required events.

This makes our stream somewhat like this, cleaning out the cluttered events

f → t → f → t → f

This is the screenshot of the implementation:  

 

And an animated gif of the scanning process:

This is all for this blog, you may use many other operators from the arsenal of RxJava, whatever fits your use case. As I have presumed the knowledge about Subjects, MVP and a little bit of Dagger in this post, I’ll link some of the resources where you can find more information about these:

http://reactivex.io/documentation/subject.html

https://github.com/ReactiveX/RxJava/wiki/Subject

http://reactivex.io/documentation/operators/distinct.html

http://www.vogella.com/tutorials/Dagger/article.html

https://antonioleiva.com/mvp-android/


Testing Asynchronous Code in Open Event Orga App using RxJava

In the last blog post, we saw how to test complex interactions through our apps using stubbed behaviors by Mockito. In this post, I’ll be talking about how to test RxJava components such as Observables. This one will focus on testing complex situations using RxJava as the library itself provides methods to unit test your reactive streams, so that you don’t have to go out of your way to set contraptions like callback captors, and implement your own interfaces as stubs of the original one. The test suite (kind of) provided by RxJava also allows you to test the fate of your stream, like confirming that they got subscribed or an error was thrown; or test an individual emitted item, like its value or with a predicate logic of your own, etc. We have used this heavily in Open Event Orga App (Github Repo) to detect if the app is correctly loading and refreshing resources from the correct source. We also capture certain triggers happening to events like saving of data on reloading so that the database remains in a consistent state. Here, we’ll look at some basic examples and move to some complex ones later. So, let’s start.

public class AttendeeRepositoryTest {

    private AttendeeRepository attendeeRepository;

    @Before
    public void setUp() {
        attendeeRepository = new AttendeeRepository(); // initialize the class under test
    }

    @Test
    public void shouldReturnAttendeeByName() {
        // TODO: Implement test
    }

}

 

This is our basic test class setup with general JUnit settings. We’ll start by writing our tests, the first of which will be to test that we can get an attendee by name. The attendee class is a model class with firstName and lastName. And we will be checking if we get a valid attendee by passing a full name. Note that although we will be talking about the implementation of the code which we are writing tests for, but only in an abstract manner, meaning we won’t be dealing with production code, just the test.

So, as we know that Observables provide a stream of data. But here, we are only concerned with one attendee. Technically, we should be using Single, but for generality, we’ll stick with Observables.

So, a person from the background of JUnit would be tempted to write this code below.

Attendee attendee = attendeeRepository.getByAttendeeName("John Wick")
    .blockingFirst();

assertEquals("John Wick", attendee.getFirstName() + attendee.getLastName());

 

So, what this code is doing is blocking the thread till the first attendee is provided in the stream and then checking that the attendee is actually John Wick.

While this code works, this is not reactive. With the reactive way of testing, not only you can test more complex logic than this with less verbosity, but it naturally provides ways to test other behaviors of Reactive streams such as subscriptions, errors, completions, etc. We’ll only be covering a few. So, let’s see the reactive version of the above test.

attendeeRepository.getByAttendeeName("John Wick")
    .firstElement()
    .test()
    .assertNoErrors()
    .assertValue(attendee -> "John Wick".equals(
        attendee.getFirstName() + attendee.getLastName()
    ));

 

So clean and complete. Just by calling test() on the returned observable, we got this whole suite of testing methods with which, not only did we test the name but also that there are no errors while getting the attendee.

Testing for Network Error on loading of Attendees

OK, so let's move towards a more realistic test. Suppose that you call this method on AttendeeRepository, and that you can fetch attendees from the network. So first, you want to handle the simplest case: there should be an error if there is no connection. So, if you have (hopefully) set up your project using abstractions for the model with MVP, then it'll be a piece of cake to test this. Let's suppose we have a networkUtils object with a method isConnected.

The NetworkUtil class is a dependency of AttendeeRepository and we have set it up as a mock in our test using Mockito. If this is sounding somewhat unfamiliar, please read my previous article “The Joy of Testing with MVP”.

So, our test will look like this

@Test
public void shouldStreamErrorOnNetworkDown() {
    when(networkUtils.isConnected()).thenReturn(false);
    
    attendeeRepository.getAttendees()
        .test()
        .assertErrorMessage("No Network");
}

 

Note that if you don’t create and inject the networkUtils mock, attendeeRepository will throw an NPE when it calls isConnected() on a null reference. Also remember that an unstubbed Mockito mock returns default values (false for booleans), so define the behavior explicitly, as I have here, to keep the test’s intent clear.

With RxJava, you get a whole lot of methods for each use case. Even for checking errors, you can assert a particular Throwable, a predicate over the Throwable, or the error message, as I have shown in this case.
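For example, the same failure can be asserted in any of these ways (the concrete exception type below is an assumption about how the repository signals the error):

TestObserver<Attendee> observer = attendeeRepository.getAttendees().test();

// Assert on the error message, as in the test above
observer.assertErrorMessage("No Network");

// Assert on the Throwable type (assuming the repository emits a RuntimeException)
observer.assertError(RuntimeException.class);

// Assert with a predicate over the Throwable
observer.assertError(throwable -> "No Network".equals(throwable.getMessage()));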

Now, if you run this test, it’ll probably fail. That’s because if you offload the networking work to a different thread using the subscribeOn and observeOn methods, the test body may finish on its own thread before the request completes on the other one. Furthermore, in an Android app you would have to use AndroidSchedulers.mainThread(), but as it is an Android dependency, the unit test will fail; well actually, crash. There used to be workarounds involving abstractions over the RxJava schedulers, but RxJava2 provides a very convenient way to override the default schedulers in the form of RxJavaPlugins. Similarly, RxAndroidPlugins is present in the rx-android package. Let’s suppose you plan to use Schedulers.io() for asynchronous work and want to receive the stream on Android’s Main Thread, meaning you use AndroidSchedulers.mainThread() in the observeOn method. To override these schedulers with Schedulers.trampoline(), which queues your tasks and performs them one by one like the main thread, your setUp will include this:

RxJavaPlugins.setIoSchedulerHandler(scheduler -> Schedulers.trampoline());
RxAndroidPlugins.setInitMainThreadSchedulerHandler(scheduler -> Schedulers.trampoline());

 

And if your tests are not isolated and you need to restore the default scheduler behavior after each test, you’ll need to add this to your tearDown method:

RxJavaPlugins.reset();
RxAndroidPlugins.reset();

Testing for Correct loading of Attendees

Now that we have tested that our Repository is correctly throwing an error when the network is down, let’s test that it correctly loads attendees when the network is connected. For this, we’ll need to mock our EventService to return attendees when queried, since we don’t want our unit tests to actually hit the servers.

So, we’ll need to keep these things in mind:

  • Mock the network utility so that it reports that the device is connected to the Internet
  • Mock the EventService to return attendees when queried
  • Call the getter on the attendeeRepository and test that it indeed returned a list of attendees

For these conditions, our test will look like this:

@Test
public void shouldLoadAttendeesSuccessfully() {
    List<Attendee> attendees = Arrays.asList(
        new Attendee(),
        new Attendee(),
        new Attendee()
    );

    when(networkUtils.isConnected()).thenReturn(true);
    when(eventService.getAttendees()).thenReturn(Observable.just(attendees));

    attendeeRepository.getAttendees()
        .test()
        .assertValues(attendees.toArray(new Attendee[attendees.size()]));
}

 

The assertValues function asserts that these values were emitted by the observable. If you want to be more thorough, you can also verify that EventService’s getAttendees function was in fact called by

verify(eventService).getAttendees();

 

But the problem with this approach is that getAttendees returns an observable, and merely calling it does not necessarily mean that the observable was subscribed and emitted its results; we need to ensure that it was indeed subscribed. If we call the normal test() function on the observable, it is already subscribed by the test itself, so an assertSubscribed() check would always pass. To test this correctly, let’s look at our final use case.

Testing for saving of Attendees

In the Open Event Orga App, we have strived to create self-sufficient and intelligent classes, thus, our repository is also built this way. It detects that new attendees are loaded from the server and saves them in the database. Now we’d want to test this functionality.

In this test, there is an added dependency of DatabaseRepository for saving the attendees, which we will mock. The conditions for this test will be:

  • Network is connected
  • EventService returns attendees
  • DatabaseRepository mocks the saving of attendees

For DatabaseRepository’s save method, we’ll be returning a Completable, which will notify when the saving of data is completed. The primary purpose of this test will be to assert that this completable is indeed subscribed when the attendee loading is triggered. This will not only ensure that the correct function to save the attendees is called, but also that it is indeed triggered and not just left hanging after the call. So, our test will look like this.

@Test
public void shouldSaveAttendeesInDatabase() {
    List<Attendee> attendees = Arrays.asList(
        new Attendee(),
        new Attendee(),
        new Attendee()
    );

    TestObserver testObserver = TestObserver.create();
    Completable completable = Completable.complete()
        .doOnSubscribe(testObserver::onSubscribe);

    when(networkUtils.isConnected()).thenReturn(true);
    when(databaseRepository.save(attendees)).thenReturn(completable);
    when(eventService.getAttendees()).thenReturn(Observable.just(attendees));

    attendeeRepository.getAttendees()
        .test()
        .assertNoErrors();

    testObserver.assertSubscribed();
}

 

Here, we have created a separate test observer, hooked it up so that it gets subscribed when the Completable is subscribed, and returned that Completable when the save method is called. Finally, we asserted that the test observer is indeed subscribed.

You can create more complex use cases and assert subscriptions, errors, the emptiness of a stream and much more by using the built-in test functionality of RxJava2. So, that’s all for this blog; you can visit these links for more details on unit testing RxJava:

http://fedepaol.github.io/blog/2015/09/13/testing-rxjava-observables-subscriptions/

https://www.infoq.com/articles/Testing-RxJava


Making Open Event Organizer Android App Reactive

FOSSASIA’s Open Event Organizer is an Android application for event management. Its core feature is two-way attendee check in, either by searching for a name in the list of attendees or by scanning the QR code on a ticket. As an event can easily have 1000+ attendees, check in must not degrade the performance of the app. Just imagine a big event where the check in feature is slow: a queue builds up at the entrance while each attendee waits for their ticket to be scanned and verified. Take, for example, a check in via QR code scan. The complete process is roughly this:

  1. The QR scanner scans the ticket code and parses it into the ticket identifier string.
  2. The identifier string is parsed to get an attendee id from it.
  3. Using the attendee id, the attendee is searched for in the event’s complete attendee list.
  4. On a match, the attendee’s check in status is toggled by making the required call to the server.
  5. On successful toggling, the attendee is updated in the database accordingly.
  6. And a check in success message is shown on the screen.

Of the above tasks, only the 1st and 6th steps are UI related. The rest are background tasks which can run on a non-UI thread instead of occupying the UI thread, which is mainly responsible for user interaction with the app. ReactiveX, an API for asynchronous programming, enables us to do this. Just for clarification, asynchronous programming is not the same as multithreading; it just means the tasks are independent and hence don’t have to wait for one another. That is a big topic on its own. Here we have used ReactiveX simply to run these tasks in the background while the UI thread keeps running. Here is our barcode processing code:

private void processBarcode(String barcode) {
  Observable.fromIterable(attendees)
      .filter(attendee -> attendee.getOrder() != null)
      .filter(attendee -> (attendee.getOrder().getIdentifier() + "-" + attendee.getId()).equals(barcode))
      .subscribeOn(Schedulers.computation())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(attendee -> {
          // here we get the attendee and
          // further processing can be called here
          scanQRView.onScannedAttendee(attendee);
      });
}

In the above code you can see the actual creation of an Observable. The Observable class has a method fromIterable which takes a list of items and creates an Observable that emits those items; we need it because we search for the attendee in the attendee list we have already stored in the database. The filter operator filters the emitted items using the provided predicate. The last two lines are important, as they set up how our subscriber is going to work, and you will need to apply this thread management to your observables when working on Android. subscribeOn sets the thread on which the background work runs, while observeOn sets the thread on which emitted items are handled, here the main thread. The subscribe operator provides the function that we actually want to run on the main thread once an emitted item is received. Once we find the attendee from the barcode, a network call is made to toggle the attendee’s check in status. Here is the code of the check in method:

public void toggleCheckIn() {
  eventRepository.toggleAttendeeCheckStatus(attendee.getEventId(), attendeeId)
      .subscribe(completed -> {
          ...
          String status = attendee.isCheckedIn() ? "Checked In" : "Checked Out";
          attendeeCheckInView.onSuccess(status);
      }, throwable -> {
          throwable.printStackTrace();
          ...
      });
}

In the above code, toggleAttendeeCheckStatus returns an Observable. As the name suggests, an Observable is something to be observed: it emits items which are caught by a Subscriber, so the observer here is the Subscriber. Let’s look into the toggleAttendeeCheckStatus code:

public Observable<Attendee> toggleAttendeeCheckStatus(long eventId, long attendeeId) {
  return eventService.toggleAttendeeCheckStatus(eventId, attendeeId, getAuthorization())
      .map(attendee -> {
          ...
          // updating database logic
          ...
          return attendee;
      })
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread());
}

We have used Retrofit+OkHttp for network calls. In the above code, eventService.toggleAttendeeCheckStatus returns an Observable which emits the updated Attendee object on the server’s response. Here we have used the map operator provided by ReactiveX, which applies the function defined inside it to each item emitted by the observable and returns a new observable of the results; we use it to make the related updates in the database. Now, with ReactiveX support, the complete check in process is:

(Steps 2 to 4 below run in the background.)

  1. The QR scanner scans the ticket code and parses it into the ticket identifier string.
  2. Using the identifier, the attendee is searched for in the event’s complete attendee list.
  3. On a match, the attendee’s check in status is toggled by making the required call to the server.
  4. On successful toggling, the attendee is updated in the database accordingly.
  5. And a check in success message is shown on the screen.

So now the main thread runs only UI related tasks, while the time consuming work runs in the background and the UI is updated when it completes. Hence the check in process stays smooth irrespective of the number of attendees, and the app no longer freezes or crashes under the load.


Intro to concurrency and Refactoring Open Event Android using RxJava

Functional reactive programming seems to have taken the whole development world by storm. It’s one of the hottest topics even after two years of constant traction in the communities of several programming languages, where implementations of the specification defined by Rx, or Reactive Extensions, have changed the programming paradigm for many professional and enthusiast developers.

RxJava is no exception: not only has it been widely adopted by Android and Java developers, it has also received the attention of well known, top developers in both communities. The reason for its success is its fluent API, the heavy toolset it brings from the functional programming paradigm, and its natural ability to handle concurrency at different levels depending on the type of operation being performed, i.e., computation, I/O, etc. It takes away many of the constraints and concurrency checklists developers had to maintain when managing threads themselves. So now developers can no longer excuse running database operations on the Main Thread by claiming that offloading them to another thread is hard.

So, in this blog post, I will be detailing the process of converting the existing synchronous code of your app into performant reactive code without breaking the structure of your project, like we did in Open Event Android (Github Repo). Before starting, I assume you know how to add the RxJava dependency to your project, as it is covered in many other blog posts and the documentation is very clear. You should also add the RxAndroid dependency, as it contains the Scheduler needed to work with Android’s Main Thread. So, let’s start.

Current State

Currently, our code loads queries from the database synchronously on the Main Thread using SQLiteDatabase. This is what it looks like:
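A simplified sketch of such a synchronous query might look like this (assuming a SQLiteDatabase field named database; the table and helper names are illustrative, not the app’s actual code):

public List<Session> getSessionList() {
    List<Session> sessions = new ArrayList<>();
    // Blocking query; runs on whatever thread calls this method
    Cursor cursor = database.rawQuery("SELECT * FROM sessions", null);
    while (cursor.moveToNext()) {
        sessions.add(Session.fromCursor(cursor)); // hypothetical row-to-object mapper
    }
    cursor.close();
    return sessions;
}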

As we can see, we are directly returning the loaded results to the caller. This is called a synchronous call, meaning the caller blocks until the function returns and can’t move on to do anything else in the meantime. It simply waits for the function to return, which may take anywhere from hundreds of milliseconds to seconds depending on the work being done.

Newer Android versions crash applications that perform network interactions on the main thread, but there is no such restriction for disk based operations, which makes it harder to enforce best performance practices. Before RxJava, you would define interfaces for the different kinds of results and pass their implementations as parameters of the db request function, which created a new thread, performed the operation and, when finished, delivered the results back to the main thread using something like a post-on-UI-thread method so that the views could update themselves. These interface implementations are called callbacks, because they call a function that you provide back when the asynchronous operation is completed. Even invoking the callback is left to the implementor and may have undesired effects. A query done in this fashion is called an asynchronous query, because it executes in parallel with the main thread and is not synchronised with it: it may never complete, complete before the main thread has moved on to its next operation, or return after the main thread has finished and the views have been destroyed, resulting in a weird crash even though the user has already left the screen.
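A rough sketch of that callback pattern (the interface and method names here are hypothetical, not the project’s actual API):

public interface LoadCallback<T> {
    void onLoaded(T result);
    void onError(Throwable error);
}

public void getSessionListAsync(LoadCallback<List<Session>> callback) {
    new Thread(() -> {
        try {
            // Blocking query, now off the main thread
            List<Session> sessions = getSessionList();
            // The implementor must remember to hop back to the UI thread
            new Handler(Looper.getMainLooper()).post(() -> callback.onLoaded(sessions));
        } catch (Exception e) {
            new Handler(Looper.getMainLooper()).post(() -> callback.onError(e));
        }
    }).start();
}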

Problems like these made Android devs lazy and willing to compromise on the performance of their applications. Not anymore! RxJava is here to solve half of our problems. Mind you, RxJava provides a way to achieve effortless concurrency, but it does not guarantee thread safety; memory contention, race conditions, deadlocks and other concurrency related issues are still yours to handle.

So, after this introduction to Rx and why Android projects direly need it, let’s move on to a basic procedure for converting any synchronous code into an asynchronous call using an RxJava Observable.

Let’s subscribe

The Observable class in RxJava is the standard stream class you will use most. An Observable handles a stream of objects and passes them, as they arrive, to the Subscriber attached to it. As you may guess, a stream of data that arrives in a non-deterministic fashion (we don’t know when it will arrive) calls for an asynchronous query, and this is where RxJava excels. You can configure an Observable to do its work on one thread, so that the main thread doesn’t block, and deliver the results on another. You can either create a new thread yourself or use one of the preconfigured schedulers for the basic types of operations:

  1. Schedulers.newThread() : Creates a new thread for each request
  2. Schedulers.io() : For I/O bound work like a network call, database access
  3. Schedulers.computation() : For heavy computations
  4. AndroidSchedulers.mainThread() : For returning to UI thread of Android ( Present in RxAndroid )

There are other Schedulers, like Schedulers.trampoline(), that are used for purposes such as testing, but the above are the most commonly used. We’ll be using Schedulers.computation() to run the SQLite query on a thread from the computation thread pool and AndroidSchedulers.mainThread() to deliver the result on the UI thread.

We use computation instead of I/O here because the I/O scheduler is backed by an unbounded executor, meaning it keeps adding threads to the pool, which isn’t ideal for this case. You can also create your own bounded executor and pass it in as a scheduler.
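For instance, a bounded pool can be wrapped into a Scheduler using Schedulers.from() (the pool size here is arbitrary):

// A fixed-size pool wrapped as an RxJava Scheduler
Scheduler boundedScheduler = Schedulers.from(Executors.newFixedThreadPool(4));

Observable.fromCallable(this::getSessionList)
    .subscribeOn(boundedScheduler)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(sessions -> Log.d("SESSIONS", "Loaded " + sessions.size() + " sessions"));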

The basic operation of passing an object to a subscriber is:

Observable.just(getEventDetails())
        .subscribe(new Consumer<Event>() {
          @Override
          public void accept(@NonNull Event event) throws Exception {
              Log.d("EVENT", event.toString());
          }
      });

 

Using lambda notation, we get a terser form of the same:

Observable.just(getEventDetails())
    .subscribe(event -> Log.d("EVENT", event.toString()));

We’ll be using lambda notations from now on.

In the above example, we are just loading the Event object and passing it to the subscriber, which logs it. But this is not asynchronous; everything still executes on the main thread. The above code is equivalent to:

Event event = getEventDetails();
Log.d("EVENT", event.toString());

 

So why use it, you say? Well, we can still get a lot of goodies from functional programming this way. For example,

String track = "Android";

Observable.fromIterable(getSessionList())
    .filter(session -> session.getTrack().getName().equals(track))
    .map(Session::getTitle)
    .toList()
    .subscribe(titles -> Log.d("Titles", titles.toString()));

 

What this code does is take a list of sessions, emit one session at a time, filter out the ones whose track name isn’t Android, map the remaining ones to their titles, collect the titles into a list and hand that list to the subscriber.

Now imagine doing this in plain Java: create a list of strings, loop through each session, check its track and push the title into that list, and all of that for what is the most basic of RxJava use cases.
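For comparison, the imperative version described above is roughly:

List<String> titles = new ArrayList<>();
for (Session session : getSessionList()) {
    if (session.getTrack().getName().equals(track)) {
        titles.add(session.getTitle());
    }
}
Log.d("Titles", titles.toString());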

But how do we achieve concurrency? If there are 10,000 sessions, this code will take a long time even if the sessions are already in memory and not loaded from the database. So we will run this chain on the computation thread.

Observable.fromIterable(getSessionList())
    .filter(session -> session.getTrack().getName().equals(track))
    .map(Session::getTitle)
    .toList()
    .subscribeOn(Schedulers.computation())
    .subscribe(titles -> adapter.setItems(titles));

 

That’s it. Now the filtering, the mapping and the collecting into a list are all done on another thread.

If you want to receive each item one at a time, rather than all at once when the stream completes, you can remove the toList() operator.

But now, our app will crash! That’s because when we deliver the result to the subscriber, we are still on the computation thread, and Android views are not thread safe, meaning they cannot be accessed from any thread other than the UI thread. So we need to come back to the Main Thread, and to do that we just use the observeOn() operator:

Observable.fromIterable(getSessionList())
    .filter(session -> session.getTrack().getName().equals(track))
    .map(Session::getTitle)
    .toList()
    .subscribeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(titles -> adapter.setItems(titles));

 

Still, our code has a critical problem: the mapping and filtering take place on a background thread, but the loading of the session list still happens on the UI thread, because it is loaded first and only then passed to the Observable.

Observable factory methods like just, from, fromIterable, etc. all take an already constructed object from the current thread, meaning the call that produces that object does not run on the Scheduler you supplied. This is basic language behavior, arguments are evaluated before the method they are passed to is invoked, but it is often misunderstood in Rx programming.

So, what do we do? We use fromCallable, which defers calling the supplied function until subscription time, so it runs on the scheduler set by subscribeOn, and then operates on its result:

Observable.fromCallable(this::getSessionList)
    .flatMapIterable(sessions -> sessions)
    .filter(session -> session.getTrack().getName().equals(track))
    .map(Session::getTitle)
    .toList()
    .subscribeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(titles -> adapter.setItems(titles));

 

We’re done! We have changed our synchronous database call to an asynchronous call.

Another use case: when you just have to perform an operation asynchronously and not return anything, fromCallable won’t work, as it expects a return value to operate on; use Completable instead.

Completable.fromAction(this::clearDatabase)
    .subscribeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(() -> {
        // Completed
        showToast("Success");
    });

 

Note that here we use a method reference to call a function; you can also pass in a lambda or an Action implementation to do some work in place, like this:

Completable.fromAction(() -> {
    doSomeStuff();
    // ...
    doOtherStuff(); 
}).subscribeOn(Schedulers.computation())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(() -> {
      // Completed
      showToast("Success");
  });

 

Now, you can wrap all your slow methods into an Observable or a Completable without changing the structure of your code.
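For instance, a wrapped database call could end up looking roughly like this (a sketch reusing the blocking getSessionList() from earlier):

public Observable<List<Session>> getSessions() {
    // The blocking query now runs on the computation scheduler at subscription time
    return Observable.fromCallable(this::getSessionList)
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread());
}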

On a parting note, here is a trick to avoid repeating the subscribeOn and observeOn calls everywhere:

private <T> ObservableTransformer<T, T> applySchedulers() {
    return upstream -> upstream.subscribeOn(Schedulers.computation())
                               .observeOn(AndroidSchedulers.mainThread());
}

 

Create this function and call compose() on each Observable, passing in the transformer returned by applySchedulers().
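A usage sketch with the earlier session query (toList() returns a Single, so we convert back to an Observable before composing):

Observable.fromCallable(this::getSessionList)
    .flatMapIterable(sessions -> sessions)
    .filter(session -> session.getTrack().getName().equals(track))
    .map(Session::getTitle)
    .toList()
    .toObservable()
    .compose(applySchedulers())
    .subscribe(titles -> adapter.setItems(titles));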

That’s it for now. Have a happy and lag free day!
