In the previous blog post, I discussed the changes made in loklak’s Kaizen harvester so that it could be extended and other harvesting strategies could be introduced. Those changes made it possible to introduce a new harvesting strategy, the PriorityKaizen harvester, which uses a priority queue to store the queries that are to be processed. In this blog post, I will be discussing the process through which this new harvesting strategy was introduced in loklak.
Background, motivation and approach
Before jumping into the changes, we first need to understand why this new harvesting strategy is needed. Let us start by discussing the issue with the Kaizen harvester.
The producer-consumer imbalance in Kaizen harvester
Kaizen uses a simple hash queue to store queries. When the queue is full, new queries are dropped. But the number of queries produced after searching for a single query is much higher than the consumption rate, i.e. the queue is bound to overflow and the new queries that arrive would get dropped. (See loklak/loklak_server#1156)
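To make the overflow behaviour concrete, here is a minimal standalone sketch (not the actual loklak class; the name BoundedQueryStore and the query strings are made up) of a bounded query store that silently drops new queries once it is full, which is roughly the situation described above:

import java.util.LinkedHashSet;
import java.util.Set;

// Illustrative only: a bounded query store that drops new queries once it is full.
public class BoundedQueryStore {

    private final Set<String> queries = new LinkedHashSet<>();
    private final int maxSize;

    public BoundedQueryStore(int maxSize) {
        this.maxSize = maxSize;
    }

    public boolean addQuery(String query) {
        if (this.queries.size() >= this.maxSize) {
            return false; // queue full: the new query is dropped
        }
        return this.queries.add(query);
    }

    public static void main(String[] args) {
        BoundedQueryStore store = new BoundedQueryStore(2);
        System.out.println(store.addQuery("#gsoc"));     // true
        System.out.println(store.addQuery("#fossasia")); // true
        System.out.println(store.addQuery("#loklak"));   // false: store is full, query dropped
    }
}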
Learnings from the attempt to add a blocking queue for queries
As a solution to this problem, I first tried using a blocking queue to store the queries. In this implementation, a producer would get blocked when trying to put a query into a full queue and would wait until there is space for more. This way, we would have a good balance between consumers and producers, as the producers would wait until the consumers free up space in the queue –
public class BlockingKaizenHarvester extends KaizenHarvester {
    ...
    public BlockingKaizenHarvester() {
        super(new KaizenQueries() {
            ...
            private BlockingQueue<String> queries = new ArrayBlockingQueue<>(maxSize);

            @Override
            public boolean addQuery(String query) {
                if (this.queries.contains(query)) {
                    return false;
                }
                try {
                    this.queries.offer(query, this.blockingTimeout, TimeUnit.SECONDS);
                    return true;
                } catch (InterruptedException e) {
                    DAO.severe("BlockingKaizen Couldn't add query: " + query, e);
                    return false;
                }
            }

            @Override
            public String getQuery() {
                try {
                    return this.queries.take();
                } catch (InterruptedException e) {
                    DAO.severe("BlockingKaizen Couldn't get any query", e);
                    return null;
                }
            }
            ...
        });
    }
}
[SOURCE, loklak/loklak_server#1210]
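As a side note on the timed offer() call used above, here is a minimal standalone sketch (not loklak code) of how ArrayBlockingQueue behaves when it is full: the producer waits up to the given timeout and then gives up, with offer() returning false:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Standalone sketch of the timed offer() behaviour: when the queue is full,
// offer() blocks for up to the timeout and then returns false.
public class TimedOfferDemo {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queries = new ArrayBlockingQueue<>(1);

        System.out.println(queries.offer("#gsoc", 2, TimeUnit.SECONDS));     // true: the queue has space
        System.out.println(queries.offer("#fossasia", 2, TimeUnit.SECONDS)); // false after ~2 s: the queue is full
    }
}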
But there is an issue here: the consumers themselves are producers, and at an even higher rate. When a search is performed, the resulting queries are appended to the harvester’s KaizenQueries instance (which, here, would be backed by a blocking queue). Now consider the case where the queue is full and a thread takes a query from it and scrapes data. When the scraping is finished, many new queries need to be inserted, but most of these insertions get blocked, because the queue becomes full again after just one query is added.
Therefore, using a blocking queue in KaizenQueries is not a good solution.
Other considerations
After the failure of introducing the Blocking Kaizen harvester, we looked for other alternatives for storing queries. We came across multilevel queues, persistent disk queues and priority queues.
Multilevel queues sounded like a good idea at first: we would have multiple queues for storing queries. But this would just boil down to how much total queue capacity we allow, and queries would eventually get dropped anyway.
Persistent disk queues would allow us to store a greater number of queries, but their major disadvantage is lookup time. It would be terribly slow to check whether a query already exists in the disk queue once the queue grows large. Also, since in practice the number of queries keeps increasing, the disk queue would also get out of hand at some point.
By now, it was clear that avoiding dropped queries altogether is not an option. What we had to do instead was use the limited-size queue smartly, so that the queries we drop are not the important ones.
Solution: Priority Queue
A good solution to our problem was a priority queue. We could assign a higher score to queries that come from more popular Tweets; these would sit higher in the queue and would not drop off until even higher-priority queries arrive.
Assigning score to a Tweet
The score for a Tweet was decided using the following formula –

α = 5 × (retweet count) + (favourite count)

score = α / (α + 10 × exp(−0.01 × α))
The second equation generates a score between zero and one from the retweet and favourite counts of a Tweet. This normalisation ensures that we do not assign an insanely large score to Tweets with very high retweet and favourite counts. You can see the behaviour of the second equation in the graph below, generated using the script linked in the resources.
[Graph: normalised score against α]
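As a quick illustration of the formula (a standalone sketch, not loklak code; the class and method names are made up), the score can be computed as below. For example, a Tweet with 10 retweets and 5 favourites gives α = 55 and a score of roughly 0.9:

// Standalone sketch of the scoring formula described above (not loklak code).
public class TweetScore {

    // alpha = 5 * retweets + favourites
    // score = alpha / (alpha + 10 * exp(-0.01 * alpha)), which stays within [0, 1)
    static double score(long retweetCount, long favouriteCount) {
        double alpha = 5.0 * retweetCount + favouriteCount;
        return alpha / (alpha + 10 * Math.exp(-0.01 * alpha));
    }

    public static void main(String[] args) {
        System.out.println(score(0, 0));    // 0.0 for a Tweet with no engagement
        System.out.println(score(10, 5));   // roughly 0.9
        System.out.println(score(1000, 0)); // approaches 1 for very popular Tweets
    }
}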
Changes required in existing Kaizen harvester
To take the score into account, it became necessary to add a method in KaizenQueries that also accepts a score as a parameter alongside the query, i.e. an overloaded addQuery(). Also, not all queries can have a score associated with them: for example, if we add a query that searches for Tweets older than the oldest in the current timeline, giving it a score would not be possible as it is not associated with a single Tweet. To tackle this, such queries are given a default score of 0.5 –
public abstract class KaizenQueries {

    public boolean addQuery(String query) {
        return this.addQuery(query, 0.5);
    }

    public abstract boolean addQuery(String query, double score);
    ...
}
[SOURCE]
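For illustration, a caller could then use either overload (a hypothetical usage fragment; the query strings are made up). A query derived from a specific Tweet passes its computed score, while other queries fall back to the 0.5 default:

// Hypothetical usage of the two addQuery() overloads; query strings are examples only.
static void enqueueExamples(KaizenQueries queries) {
    // Query derived from a specific Tweet: pass its computed score.
    queries.addQuery("#FOSSASIA", 0.87);

    // Query not tied to a single Tweet (e.g. fetching older Tweets): the default score of 0.5 is used.
    queries.addQuery("#FOSSASIA until:2017-06-01");
}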
Defining an appropriate KaizenQueries object
The KaizenQueries object for a priority queue had to define a wrapper class that would hold a query and its score together, so that they could be inserted into the queue as a single object.
ScoreWrapper and comparator
ScoreWrapper is a simple class that stores a query and its score together –
private class ScoreWrapper {

    private double score;
    private String query;

    ScoreWrapper(String m, double score) {
        this.query = m;
        this.score = score;
    }
}
[SOURCE]
In order to define how ScoreWrapper objects are ordered in the priority queue, we need a Comparator for them. Since Java’s PriorityQueue is a min-heap, the comparator puts wrappers with higher scores first, so that the highest-priority query sits at the head of the queue –

private Comparator<ScoreWrapper> scoreComparator =
        (scoreWrapper, t1) -> Double.compare(t1.score, scoreWrapper.score);
[SOURCE]
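To see the effect of this ordering, here is a small standalone sketch (simplified names, not loklak code). Java’s PriorityQueue is a min-heap, so with a descending comparator the highest-scored query is polled first:

import java.util.PriorityQueue;

// Standalone sketch: higher-scored queries come out of the priority queue first.
public class PriorityOrderDemo {

    static class Scored {
        final String query;
        final double score;

        Scored(String query, double score) {
            this.query = query;
            this.score = score;
        }
    }

    public static void main(String[] args) {
        // Double.compare(b.score, a.score) places the highest score at the head of the min-heap.
        PriorityQueue<Scored> queue = new PriorityQueue<>(
                (a, b) -> Double.compare(b.score, a.score));

        queue.add(new Scored("low priority query", 0.10));
        queue.add(new Scored("high priority query", 0.90));
        queue.add(new Scored("medium priority query", 0.50));

        while (!queue.isEmpty()) {
            System.out.println(queue.poll().query); // prints high, then medium, then low
        }
    }
}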
Putting things together
Now that we have all the ingredients to declare our priority queue, we can define the addQuery() and getQuery() strategies in the corresponding KaizenQueries implementation –
public class PriorityKaizenHarvester extends KaizenHarvester {

    private static class PriorityKaizenQueries extends KaizenQueries {
        ...
        private Queue<ScoreWrapper> queue;
        private int maxSize;

        public PriorityKaizenQueries(int size) {
            this.maxSize = size;
            queue = new PriorityQueue<>(size, scoreComparator);
        }

        @Override
        public boolean addQuery(String query, double score) {
            ScoreWrapper sw = new ScoreWrapper(query, score);
            if (this.queue.contains(sw)) {
                return false;
            }
            try {
                this.queue.add(sw);
                return true;
            } catch (IllegalStateException e) {
                return false;
            }
        }

        @Override
        public String getQuery() {
            return this.queue.poll().query;
        }
        ...
    }
    ...
}
[SOURCE]
Conclusion
In this blog post, I discussed the process by which the PriorityKaizen harvester was introduced to loklak. This strategy is a flavour of the Kaizen harvester which uses a priority queue to store the queries that are to be processed. These changes were made possible by a previous patch which allowed the Kaizen harvester to be extended.
The changes were introduced in pull request loklak/loklak#1240 by @singhpratyush (me).
Resources
- Pull request for BlockingKaizen harvester – https://github.com/loklak/loklak_server/pull/1210.
- Priority queue in Java documentation – https://docs.oracle.com/javase/7/docs/api/java/util/PriorityQueue.html.
- Code for generating the score graph – https://gist.github.com/singhpratyush/fb0758294342b1a5870888b7a70f1355.