Posting Scraped Tweets to Loklak server from Loklak Wok Android
Loklak Wok Android is a peer harvester that posts collected messages to the Loklak Server. Suggestions for search queries are fetched using the suggest API endpoint, and tweets are scraped using those queries. The scraped tweets are shown in a RecyclerView and simultaneously posted to the loklak server using the push API endpoint. Let’s see how this is implemented.
Adding Dependencies to the project
This feature relies heavily on Retrofit2, Reactive Extensions (RxJava2, RxAndroid and the Retrofit RxJava adapter) and RetroLambda (for Java lambda support in Android).
In app/build.gradle:
    apply plugin: 'com.android.application'
    apply plugin: 'me.tatarka.retrolambda'

    android {
        ...
        packagingOptions {
            exclude 'META-INF/rxjava.properties'
        }
    }

    dependencies {
        ...
        compile 'com.google.code.gson:gson:2.8.1'
        compile 'com.squareup.retrofit2:retrofit:2.3.0'
        compile 'com.squareup.retrofit2:converter-gson:2.3.0'
        compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
        compile 'io.reactivex.rxjava2:rxjava:2.0.5'
        compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    }
In the project-level build.gradle:
    dependencies {
        classpath 'com.android.tools.build:gradle:2.3.3'
        classpath 'me.tatarka:gradle-retrolambda:3.2.0'
    }
Implementation
The suggest and push API endpoints are defined in the LoklakApi interface:
    public interface LoklakApi {

        @GET("/api/suggest.json")
        Observable<SuggestData> getSuggestions(@Query("q") String query, @Query("count") int count);

        @POST("/api/push.json")
        @FormUrlEncoded
        Observable<Push> pushTweetsToLoklak(@Field("data") String data);
    }
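To make the two endpoint declarations more concrete, here is a minimal plain-Java sketch of roughly what the resulting requests look like on the wire. It does not use Retrofit; the class name LoklakRequestSketch, its helper methods and the base URL https://example.org are assumptions made for illustration, and the exact percent-encoding produced by Retrofit and OkHttp may differ in detail from java.net.URLEncoder.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper, not part of Loklak Wok Android: it only illustrates
// the shape of the requests described by the LoklakApi annotations.
class LoklakRequestSketch {

    // Form-style percent-encoding via the JDK (an approximation of what
    // an HTTP client would do for query parameters and form fields).
    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM
            throw new IllegalStateException(e);
        }
    }

    // @GET("/api/suggest.json") with @Query("q") and @Query("count")
    static String suggestUrl(String baseUrl, String query, int count) {
        return baseUrl + "/api/suggest.json?q=" + encode(query) + "&count=" + count;
    }

    // @POST("/api/push.json") with @FormUrlEncoded and @Field("data"):
    // the JSON string travels as a single form field named "data".
    static String pushFormBody(String json) {
        return "data=" + encode(json);
    }

    public static void main(String[] args) {
        // The base URL is a placeholder, not the real server address.
        System.out.println(suggestUrl("https://example.org", "", 2));
        System.out.println(pushFormBody("{\"statuses\":[]}"));
    }
}
```

The point to take away is that the push endpoint receives the whole JSON document as one form field, which is why the payload is later converted to a string before it is posted.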
The POJOs (Plain Old Java Objects) for suggestions and for posting tweets are generated using jsonschema2pojo. Gson uses these POJOs to convert JSON to Java objects.
The REST client is created by Retrofit2 and is implemented in the RestClient class. The Gson converter and the RxJava adapter for Retrofit are added in the Retrofit builder. The create method is called to generate the API methods (Retrofit implements the LoklakApi interface).
    public class RestClient {

        // the static fields sRetrofit, BASE_URL and gson are omitted from this excerpt

        private RestClient() {
        }

        private static void createRestClient() {
            sRetrofit = new Retrofit.Builder()
                    .baseUrl(BASE_URL)
                    // gson converter
                    .addConverterFactory(GsonConverterFactory.create(gson))
                    // retrofit adapter for rxjava
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                    .build();
        }

        private static Retrofit getRetrofitInstance() {
            if (sRetrofit == null) {
                createRestClient();
            }
            return sRetrofit;
        }

        public static <T> T createApi(Class<T> apiInterface) {
            // create method to generate API methods
            return getRetrofitInstance().create(apiInterface);
        }
    }
The suggestions are fetched by calling getSuggestions once Retrofit has implemented the LoklakApi interface. getSuggestions returns an Observable of type SuggestData, which contains the suggestions in a List. Only a single query can be passed to LiquidCore for scraping tweets, so flatMap is used to transform the observable, and the fromIterable operator then emits the queries one at a time as strings. Each query is passed to LiquidCore, which scrapes the tweets. This is implemented in fetchSuggestions:
    private Observable<String> fetchSuggestions() {
        LoklakApi loklakApi = RestClient.createApi(LoklakApi.class);
        Observable<SuggestData> observable = loklakApi.getSuggestions("", 2);
        return observable.flatMap(suggestData -> {
            List<Query> queryList = suggestData.getQueries();
            List<String> queries = new ArrayList<>();
            for (Query query : queryList) {
                queries.add(query.getQuery());
            }
            return Observable.fromIterable(queries);
        });
    }
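The flattening step can be illustrated without RxJava. The sketch below uses java.util.stream as an analogy, not the RxJava API: one response object that holds a list of queries is turned into a flat sequence of individual query strings. The Suggestions class is a hypothetical stand-in for the SuggestData POJO.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: java.util.stream.Stream.flatMap plays the role that
// Observable.flatMap plus Observable.fromIterable play in the article.
class FlattenSuggestionsSketch {

    // Hypothetical stand-in for SuggestData: one response, several queries.
    static final class Suggestions {
        final List<String> queries;

        Suggestions(List<String> queries) {
            this.queries = queries;
        }
    }

    // Each response is expanded into its queries, and the results are
    // concatenated into a single flat sequence of strings.
    static List<String> flatten(Stream<Suggestions> responses) {
        return responses
                .flatMap(response -> response.queries.stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Stream<Suggestions> responses = Stream.of(
                new Suggestions(Arrays.asList("fossasia", "loklak")),
                new Suggestions(Arrays.asList("android")));
        System.out.println(flatten(responses));
    }
}
```

Downstream code then only ever sees one query string at a time, which is what the scraping step expects.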
LiquidCore uses callbacks to create a connection between the NodeJS instance and Android. To maintain a flow of observables, a custom observable that encapsulates the callbacks is created using the create operator. For a detailed understanding of how LiquidCore event handling works, please go through the example. This is implemented in getScrapedTweets:
    private Observable<ScrapedData> getScrapedTweets(final String query) {
        final String LC_TWITTER_URI = "android.resource://org.loklak.android.wok/raw/twitter";
        URI uri = URI.create(LC_TWITTER_URI);
        return Observable.create(emitter -> { // custom observable creation
            EventListener startEventListener = (service, event, payload) -> {
                service.emit(LC_QUERY_EVENT, query);
                service.emit(LC_FETCH_TWEETS_EVENT);
            };
            EventListener getTweetsEventListener = (service, event, payload) -> {
                ScrapedData scrapedData = mGson.fromJson(payload.toString(), ScrapedData.class);
                emitter.onNext(scrapedData); // data emitted using ObservableEmitter
            };
            MicroService.ServiceStartListener serviceStartListener = (service -> {
                service.addEventListener(LC_START_EVENT, startEventListener);
                service.addEventListener(LC_GET_TWEETS_EVENT, getTweetsEventListener);
            });
            MicroService microService = new MicroService(getActivity(), uri, serviceStartListener);
            microService.start();
        });
    }
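The idea of hiding a callback API behind something that can be subscribed to can be shown in a few lines of plain Java. This is a hand-rolled sketch, not RxJava and not LiquidCore: Source, TweetListener and FakeScraper are hypothetical names, and the fake scraper delivers its payloads synchronously, which the real service does not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class CallbackBridgeSketch {

    // Hypothetical callback interface, standing in for an event listener.
    interface TweetListener {
        void onTweets(String payload);
    }

    // Hypothetical callback-based service: once started, it calls the
    // listener once per payload (synchronously, to keep the sketch simple).
    static final class FakeScraper {
        private final List<String> payloads;

        FakeScraper(List<String> payloads) {
            this.payloads = payloads;
        }

        void start(TweetListener listener) {
            for (String payload : payloads) {
                listener.onTweets(payload);
            }
        }
    }

    // Minimal stand-in for an observable: nothing happens until subscribe.
    interface Source<T> {
        void subscribe(Consumer<T> onNext);
    }

    // The callback registration lives inside the source, so callers only
    // see "subscribe and receive items", much like Observable.create.
    static Source<String> scrapedTweets(FakeScraper scraper) {
        return onNext -> scraper.start(onNext::accept);
    }

    public static void main(String[] args) {
        FakeScraper scraper = new FakeScraper(Arrays.asList("tweet 1", "tweet 2"));
        List<String> received = new ArrayList<>();
        scrapedTweets(scraper).subscribe(received::add);
        System.out.println(received);
    }
}
```

Because the service is started inside subscribe, building the source has no side effects; work begins only when someone subscribes, which mirrors the cold behaviour discussed later in the article.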
Now that we are getting suggestions and using them to scrape tweets, this needs to be done periodically so that tweets are pushed continuously to the loklak server. The interval operator is used for this. A List is maintained that contains the suggestion queries for which tweets are still to be scraped. Once the tweets for a query have been scraped and displayed in the RecyclerView, that query is removed from the list. A new set of suggestions is fetched only when the list is empty.
    Observable.interval(4, TimeUnit.SECONDS)
            .flatMap(this::getSuggestionsPeriodically)
            .flatMap(query -> {
                mSuggestionQuerries.add(query); // query added to list
                return getScrapedTweets(query);
            });
A method reference is used to keep the code modular, so the logic of periodically fetching suggestions is implemented in getSuggestionsPeriodically:
    private Observable<String> getSuggestionsPeriodically(Long time) {
        if (mSuggestionQuerries.isEmpty()) { // checks if list is empty
            mInnerCounter = 0;
            return fetchSuggestions(); // new suggestions
        } else { // wait for a previous request to complete
            mInnerCounter++;
            if (mInnerCounter > 3) { // if some strange error occurs
                mSuggestionQuerries.clear();
            }
            return Observable.never(); // emits nothing, so nothing reaches the subscriber for this tick
        }
    }
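The gating logic is easier to follow when it is separated from the observables. The following plain-Java sketch mirrors the branch structure of getSuggestionsPeriodically; the class SuggestionGateSketch and its members are hypothetical names, and the method returns a boolean ("fetch now?") instead of an Observable.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the periodic gate: one call to
// onTick() corresponds to one emission of the interval operator.
class SuggestionGateSketch {

    // Queries whose tweets have not been scraped and displayed yet.
    final List<String> pending = new ArrayList<>();

    // Number of consecutive ticks that were skipped.
    int skippedTicks = 0;

    // Returns true when a new set of suggestions should be fetched.
    boolean onTick() {
        if (pending.isEmpty()) {
            skippedTicks = 0;
            return true;
        }
        skippedTicks++;
        if (skippedTicks > 3) {
            // Safety valve: give up on queries that never completed.
            pending.clear();
        }
        return false;
    }

    public static void main(String[] args) {
        SuggestionGateSketch gate = new SuggestionGateSketch();
        System.out.println("tick 1 fetch: " + gate.onTick());
        gate.pending.add("stuck query");
        for (int tick = 2; tick <= 6; tick++) {
            System.out.println("tick " + tick + " fetch: " + gate.onTick());
        }
    }
}
```

With a query that never completes, four consecutive ticks are skipped, the list is cleared on the fourth, and the tick after that fetches suggestions again.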
Now it’s time to display the fetched tweets and then push them to the loklak server. When periodic fetching of suggestions was implemented, we used the interval operator and then flatMap to transform observables, i.e. to chain network requests.
Up to this point, the observables we created were cold observables. Cold observables only emit values when a subscription is made. We need to display the scraped tweets and also push them, i.e. one source observable with two (multiple) subscribers. Intuitively, the observable would be subscribed to twice, for example:
    Observable<ScrapedData> observable = Observable.interval(4, TimeUnit.SECONDS)
            .flatMap(this::getSuggestionsPeriodically)
            .flatMap(query -> {
                mSuggestionQuerries.add(query);
                return getScrapedTweets(query);
            });

    // first subscription
    observable
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    // display in RecyclerView
            );

    // second subscription
    observable
            .flatMap(
                    // transformations to push data to server
            )
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    // display the number of tweets pushed
            );
But the source observable is a cold observable, i.e. it starts emitting only when it is subscribed to, and it does so separately for each subscriber. As a result there will be two different network calls, one for the first subscription and one for the second. Both subscriptions will therefore receive different data, which is not what is desired. The expected result is a single network call whose data is both displayed and pushed to the loklak server.
For this, hot observables are used. Hot observables emit objects irrespective of whether anyone is subscribed, and all subscribers share the same emissions.
A cold observable can be converted to a hot observable by using the publish operator, and it starts emitting objects when connect is called. This is implemented in displayAndPostScrapedData:
    ConnectableObservable<ScrapedData> observable = Observable.interval(4, TimeUnit.SECONDS)
            .flatMap(this::getSuggestionsPeriodically)
            .flatMap(query -> {
                mSuggestionQuerries.add(query);
                return getScrapedTweets(query);
            })
            .retry(2)
            .publish();

    // first subscription, to display scraped data
    Disposable viewDisposable = observable
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    this::displayScrapedData,
                    this::setNetworkErrorView
            );
    mCompositeDisposable.add(viewDisposable);

    // second subscription, for pushing data to loklak
    Disposable pushDisposable = observable
            .flatMap(this::pushScrapedData) // scraped data transformed for pushing
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    push -> {
                        mHarvestedTweets += push.getRecords();
                        harvestedTweetsCountTextView.setText(String.valueOf(mHarvestedTweets));
                    },
                    throwable -> {}
            );
    mCompositeDisposable.add(pushDisposable);

    Disposable publishDisposable = observable.connect(); // hot observable starts emitting
    mCompositeDisposable.add(publishDisposable);
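The two subscriptions are made before connect is invoked so that both subscribers are already attached when the hot observable starts emitting; a subscriber added after connect would miss anything emitted before it subscribed. Network calls can’t be done on the main thread (UI thread), so the work is kept on background threads and the results are delivered back to the main thread through observeOn, where the views are updated.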
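The difference between subscribing twice to a cold source and sharing one published source can be demonstrated without RxJava. The classes below are a hand-rolled, single-value sketch under that assumption: Cold and Published are hypothetical names, and the Supplier stands in for the network call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

class ColdVsPublishedSketch {

    // Cold: every subscriber triggers its own call to the producer.
    static final class Cold<T> {
        private final Supplier<T> producer;

        Cold(Supplier<T> producer) {
            this.producer = producer;
        }

        void subscribe(Consumer<T> subscriber) {
            subscriber.accept(producer.get());
        }
    }

    // Published: subscribers only register themselves; connect() calls the
    // producer once and hands the same value to every registered subscriber.
    static final class Published<T> {
        private final Supplier<T> producer;
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        Published(Supplier<T> producer) {
            this.producer = producer;
        }

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        void connect() {
            T value = producer.get();
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(value);
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger networkCalls = new AtomicInteger();
        Supplier<Integer> fetch = networkCalls::incrementAndGet;

        Cold<Integer> cold = new Cold<>(fetch);
        cold.subscribe(v -> System.out.println("display, call #" + v));
        cold.subscribe(v -> System.out.println("push, call #" + v));

        networkCalls.set(0);
        Published<Integer> published = new Published<>(fetch);
        published.subscribe(v -> System.out.println("display, call #" + v));
        published.subscribe(v -> System.out.println("push, call #" + v));
        published.connect();
    }
}
```

In the cold case the two subscribers see the results of two separate calls, while in the published case a single call feeds both. It also shows why the subscribers are registered first: anything attached after connect() in this sketch would receive nothing.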
The scraped data is converted from objects to JSON using Gson, the JSON is converted to a string, and the string is then posted to the loklak server using the push API endpoint. This is implemented in the pushScrapedData method, which is used in the second subscription through a method reference.
    private Observable<Push> pushScrapedData(ScrapedData scrapedData) throws Exception {
        LoklakApi loklakApi = RestClient.createApi(LoklakApi.class);
        List<Status> statuses = scrapedData.getStatuses();
        String data = mGson.toJson(statuses);
        JSONArray jsonArray = new JSONArray(data);
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("statuses", jsonArray); // payload shape: {"statuses": [...]}
        return loklakApi.pushTweetsToLoklak(jsonObject.toString());
    }
Method references to displayScrapedData and setNetworkErrorView are used to display the scraped data and to handle unsuccessful network requests.
Only 80 tweets are preserved in the RecyclerView. If the number of tweets exceeds 80, the old tweets are removed.
    private void displayScrapedData(ScrapedData scrapedData) {
        String query = scrapedData.getQuery();
        List<Status> statuses = scrapedData.getStatuses();
        mSuggestionQuerries.remove(query);
        if (mHarvestedTweetAdapter.getItemCount() > 80) {
            mHarvestedTweetAdapter.clearAdapter(); // old tweets removed
        }
        mHarvestedTweetAdapter.addHarvestedTweets(statuses);
        int count = mHarvestedTweetAdapter.getItemCount() - 1;
        recyclerView.scrollToPosition(count);
    }
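The trimming behaviour can be checked in isolation with a small plain-Java model. TweetBufferSketch is a hypothetical stand-in for the adapter; it assumes, as the code above suggests, that clearing removes every item and that the size check happens before the new batch is added.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the adapter's item list, without any Android classes.
class TweetBufferSketch {

    static final int MAX_ITEMS = 80;

    private final List<String> items = new ArrayList<>();

    // Mirrors displayScrapedData: clear first if the limit was already
    // exceeded, then append the new batch.
    void add(List<String> batch) {
        if (items.size() > MAX_ITEMS) {
            items.clear();
        }
        items.addAll(batch);
    }

    int size() {
        return items.size();
    }

    // Index of the last item, i.e. the position to scroll to.
    int lastPosition() {
        return items.size() - 1;
    }

    public static void main(String[] args) {
        TweetBufferSketch buffer = new TweetBufferSketch();
        for (int batch = 1; batch <= 4; batch++) {
            buffer.add(Collections.nCopies(30, "tweet"));
            System.out.println("after batch " + batch + ": " + buffer.size());
        }
    }
}
```

Since the check runs before the new items are added, the list can briefly hold more than 80 items (90 after the third batch of 30 in this run) and is reset when the next batch arrives.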
In case of a network error, the visibility of the RecyclerView and the TextView (which shows the number of tweets pushed) is changed to gone, and a message is displayed saying that there is a network error.
    private void setNetworkErrorView(Throwable throwable) {
        Log.e(LOG_TAG, throwable.toString());
        // recyclerView and TextView showing count of harvested tweets are hidden
        ButterKnife.apply(networkViews, GONE);
        // network error message displayed
        networkErrorTextView.setVisibility(View.VISIBLE);
    }
References
- flatMap RxJava: http://reactivex.io/documentation/operators/flatmap.html
Resources
- RxJava Introduction: https://code.tutsplus.com/tutorials/getting-started-with-rxjava-20-for-android--cms-28345
- Hot and cold observable: https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20sequence/6.%20Hot%20and%20Cold%20observables.md
- Wrapping Async calls with RxJava: https://medium.com/yammer-engineering/converting-callback-async-calls-to-rxjava-ebc68bde5831