You are currently viewing Intro to concurrency and Refactoring Open Event Android using RxJava

Intro to concurrency and Refactoring Open Event Android using RxJava

Functional reactive programming seems to have taken the whole development world by storm. It’s one of the hottest thing even after 2 years of constant traction in the communities of several programming languages, where different implementations of the specifications defined by Rx or Reactive Extensions have changed the paradigm of programming for many professional and enthusiast developers.

RxJava is no exception, not only has it been widely adopted by Android and Java developers unanimously, but also received attention of well known and top developers of both communities. The reason of its success is the fluent API with heavy toolset it provides from the Functional Programming paradigm and its ease and natural ability to handle concurrency on different levels based on the type of operation being performed, i.e., computations, I/O, etc. It basically takes away the several constraints and concurrency related checklists developers had to maintain while working with thread management. So, now, developers can’t make an excuse for using database operations on the Main Thread because offloading it on another thread is hard.

So, in this blog post, I will be detailing the process of converting the existing synchronous code of your app into a performant reactive code without breaking the structure of your project, like we did in Open Event Android (Github Repo). Before starting, I have assumed that you know how to add RxJava dependency to your project as it is covered in many other blog posts and the documentation is also very clear. Secondly, you should also add RxAndroid dependency as it contains the Scheduler needed to work on Android’s Main Thread. So, Let’s start.

Current State

Currently, our code loads the queries from database synchronously on Main Thread using the SQLiteDatabase for an Android application. This is how it looks like –

As we can see, we are directly returning the loaded results to the caller. This is called synchronous call, meaning the caller will block till the function is returned, and can’t move further to do anything else. It basically waits for the function to return, which may take hundreds of milliseconds to seconds based on the function it performs.

New Android version crash the applications that perform Network interactions on the main thread but no such restriction for disk based operations is there, making it hard to enforce best performance practices. Before RxJava, there were interfaces made for different kinds of objects, passed in as parameters of the db request function, which created a new thread and performed operations and when completed, returned back the results to the main thread using the postOnUiThread method, so that the views could update themselves. The interface implementations passed are called callbacks because they call a particular function that you provide back when the asynchronous operation is completed. Even the calling of callback function is delegated on the implementor and may result in undesired effects. The query done in this fashion is called an asynchronous query because the execution of this takes place in parallel with main thread and is not synchronised with the main thread. It may take up forever to complete, complete even before the main thread moved on to next operation or even return when the main thread was completed and done waiting for it and destroyed. This will result in a weird crash even when the application was closed, because the returned function will try to update the views which are not even there.

Problems like these made Android Devs lazy and compromise with the performance of their application. Not anymore! RxJava is here to solve half of our problems. You see, RxJava does provide a solution to achieve effortless concurrency but does not ensure thread safety, memory contention, race conditions, deadlocks and other concurrency related issues for you. These you must code up for yourself.

So, after the introduction of Rx and its dire need in Android projects, we will move on to a basic procedure to convert any synchronous code to asynchronous call using RxJava Observable.

Let’s subscribe

The Observable class in RxJava is the most used and standard stream class you will use. Observable handles a stream of object and passes them as they arrive to the Subscriber attached to it. As you may guess, for a stream of data that arrives in a non deterministic fashion ( we don’t know when it will arrive ), we require an asynchronous query, and this is where RxJava excels at. You can configure an Observable to wait for result in one thread so that main thread doesn’t block and deliver result on another thread. You can either create a new thread or use certain pre configured schedulers for basic type of operations :

  1. Schedulers.newThread() : Creates a new thread for each request
  2. Schedulers.io() : For I/O bound work like a network call, database access
  3. Schedulers.computation() : For heavy computations
  4. AndroidSchedulers.mainThread() : For returning to UI thread of Android ( Present in RxAndroid )

There are other types of Schedulers like Schedulers.trampoline(), etc that are used for other purposes like testing, but the above ones are most commonly used ones and we’ll be using Schedulers.computation() for loading the SQLite query on the thread from Computation Thread Pool and AndroidSchedulers.mainThread() for delivering the result on UI thread.

Using Computation instead of I/O because I/O uses unbounded executor, meaning it continues adding threads to the thread pool, which isn’t good. So, we use computation instead. You can create your own bounded executor and pass it as a scheduler

The basic operation of passing an object to a subscriber is :

Observable.just(getEventDetails())
        .subscribe(new Consumer<Event>() {
          @Override
          public void accept(@NonNull Event event) throws Exception {
              Log.d("EVENT", event.toString());
          }
      });

 

Using lambda notation, we get a terse form of the same :

Observable.just(getEventDetails())
 	.subscribe(event -> Log.d("EVENT", event.toString()));

We’ll be using lambda notations from now on.

In the above example, we are just loading and passing the Event object to the subscriber below who logs it. But this is not asynchronous, everything gets executed on main thread. The above code is equivalent to :

Event event = getEventDetails();
Log.d("EVENT", event.toString());

 

So why use it, you say? Well, we can still get a lot of goodies from functional programming this way. For example,

String track = "Android";

Observable.fromIterable(getSessionList())
    .filter(session -> session.getTrack().getName().equals(track))
    .map(Session::getTitle)
    .toList()
    .subscribe(titles -> Log.d("Titles", titles.toString()));

 

What this code does is, take a list of sessions and emit each session at a time, filter out the ones which don’t have Android as their track name, take out their titles and puts them in a list and gives it to subscriber.

Now imagine doing it in plain Java. Create a list of string, loop through each session, check track, push title to that list and this much when this example is the most basic of use cases of RxJava.

But how to achieve concurrency. If the there are 10000 sessions, this code will take huge time even if sessions are in memory and not loaded from database. So we will listen to these list events on computation thread.

Observable.fromIterable(getSessionList())
    .filter(session -> session.getTrack().getName().equals(track))
    .map(Session::getTitle)
    .subscribeOn(Schedulers.computation())
    .subscribe(titles -> adapter.setItems(titles));

 

That’s it. Now each filtering and mapping and converging to a list is done on another thread.

If you want to listen to each session one at a time, and not all at once when it is completed, you can remove toList() operator

But now, our app will crash! Because when we deliver the result to subscriber, we are still on computation thread, so we need to come back to Main Thread because Android Views are not thread safe, meaning they cannot be accessed from any thread other than UI thread. So in order to do that, we just use observeOn() operator :

Observable.fromIterable(getSessionList())
    .filter(session -> session.getTrack().getName().equals(track))
    .map(Session::getTitle)
    .toList()
    .subscribeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(titles -> adapter.setItems(titles));

 

Still, our code has a critical problem, the mapping and filtering takes place on background thread, but the loading of session list still takes place on UI thread because it is loaded first and then passed to Observable

Observable methods like just, from, fromIterable, etc all take object from the current thread, meaning passing the object to these functions will not occur on the Scheduler you have supplied. This is very basic programming concept that language parses rightmost parameter first but usually is misunderstood in terms of Rx programming.

So, what do we do? We use fromCallable which waits till the containing function returns and then operates on it

Observable.fromCallable(this::getSessionList)
    .flatMapIterable(sessions -> sessions)
    .filter(session -> session.getTrack().getName().equals(track))
    .map(Session::getTitle)
    .toList()
    .subscribeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(titles -> adapter.setItems(titles));

 

We’re done! We have changed our synchronous database call to an asynchronous call.

Another use case is when you just have to do an operation asynchronously and not return anything, then fromCallable won’t work as it expects some return value to operate on, instead use Completable

Completable.fromAction(this::clearDatabase)
    .subscribeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(() -> {
        // Completed
        showToast("Success");
    });

 

Note that here we use method reference to call a function, you can just pass in a lambda or Action implementation to do some in place work like this

Completable.fromAction(() -> {
    doSomeStuff();
    // ...
    doOtherStuff(); 
}).subscribeOn(Schedulers.computation())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(() -> {
      // Completed
      showToast("Success");
  });

 

Now, you can wrap all your slow methods into observable or completable without changing any code structure and your code will look like this :

On parting note, a trick to avoid repeated subscribeOn observeOn :

private <T> ObservableTransformer<T, T> applySchedulers() {
    return upstream -> upstream.subscribeOn(Schedulers.computation())
                               .observeOn(AndroidSchedulers.mainThread());
}

 

Create this function and just call compose on each Observable and call the function inside that, passing the transformer., like it is shown in the picture above

That’s it for now. Have a happy and lag free day!

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.