Implementing job queue in Open Event Web app

Open Event Web app enables multiple request handling by the implementation of queue system in its generator. Every request received from the client is saved and stored in the queue backed by redis server. The jobs are then processed one at a time using the FCFS (First come First Serve) job scheduling algorithm. Processing the requests one by one prevents the crashing of app and also prevents the loss of requests from the client.

Initialising job queue

The job queue is initialised with a name and the connection object of redis server as the arguments.

const redisClient =  require('redis').createClient(process.env.REDIS_URL);
const Queue = require('bee-queue');
const queue = new Queue('generator-queue', {redis: redisClient});

Handling jobs in queue

The client emits an event namely ‘live’ when request for event generation is received, the corresponding event is listened and a new job for the request is created and enqueued in the job queue. Every request received by the client is saved to ensure that there is no loss of request. The queue is then searched for the requests or the jobs which are in ‘waiting’ state, if the current request status for the job Id is waiting the socket emits an event namely ‘waiting’.

socket.on('live', function(formData) {
 const req = {body: formData};
 const job = queue.createJob(req);

 job.on('succeeded', function() {
   console.log('completed job ' + job.id);
 });

 job.save(async function(err, currentJob) {
   if (err) {
     console.log('job failed to save');
   }
   emitter = socket;
   console.log('saved job ' + currentJob.id);
   const jobs = await queue.getJobs('waiting', {start: 0, end: 25});
   const jobIds = await jobs.map((currJob) => currJob.id);

   if(jobIds.indexOf(currentJob.id) !== -1) {
     socket.emit('waiting');
   }
 });

});

Updating the status of request

If the socket emits the event ‘waiting’ it signifies that some other job is currently in process and the status of the current request is ‘waiting’.

socket.on('waiting', function () {
 updateStatusAnimate('Request status: Waiting');
});

Processing the jobs

When the queue is in ready state and no job is currently in process, it starts processing the saved job. The job is not completed until it receives a callback. The generator starts generating the event when the processing of request starts.

queue.on('ready', function() {
 queue.process(function(job, done) {
   console.log('processing job ' + job.id);
   generator.createDistDir(job.data, emitter, done);
 });
 console.log('processing jobs...');
});

 

The generator calls the callback function for the current job when the event generation completes or it is halted in between due to some error. As soon as the current job completes, next job in the queue starts being processed.

generator.createDistDir() = function(req, socket, callback){
  
  .....
  .....
  .....

  mailer.uploadAndsendMail(req.body.email, eventName, socket, (obj) => {
    if(obj.mail)
      logger.addLog('Success', 'Mail sent succesfully', socket);
    else
      logger.addLog('Error', 'Error sending mail', socket);

    if(emit) {
      socket.emit('live.ready', {
        appDir: appFolder,
        url: obj.url
      });
      callback(null);
    }
    else {
      callback(appFolder);
    }

    done(null, 'write');
   });
}

Resources

Close Menu