Browse Source

initial code

master
Harish.K 11 years ago
parent
commit
f8bb6f9781
  1. 113
      src/JobManager.js
  2. 32
      src/utils.js
  3. 44
      test/JobManager.spec.js

113
src/JobManager.js

@ -0,0 +1,113 @@
// ഓം നമോ നാരായണായ
var utils = require('./utils');
var STATE = utils.createEnum(['INITIALIZING', 'INITIALIZED', 'STARTED', 'RUNNING', 'PAUSING', 'PAUSED', 'STOPPED']);
function JobManager(opts){
//opts
this.notifyAt = opts.notifyAt|| 3;
this.concurrency = opts.concurrency || 20;
// state
this.state = STATE.INITIALIZED;
this.workers = [];
this.tasks = [];
this.work = null;
this.onLoadMore = null;
this.onError = null;
this.isLoadingTakingPlace = false;
this.tmpPool = [];
}
JobManager.prototype.__defineSetter__( 'workers', function(workers){
var tmpPool = Array( workers.length > this.concurrency? workers.length : this.concurrency ), i, l, j;
this.$workers = workers;
if(! tmpPool.length ){
return;
}
for( i = 0, j=0, l = tmpPool.length; i< l; i++, j++ ){
if(j == workers.length ){ j =0; }
tmpPool[i] = workers[j];
}
this.tmpPool = tmpPool;
});
JobManager.prototype.__defineGetter__( 'workers', function(){
return this.$workers;
});
JobManager.prototype.returnToPool = function (w){
this.tmpPool.push(w);
};
JobManager.prototype.$doWork = function( task, worker, cb ){
var self = this;
this.work( task, worker, function(err){
if(err){
self.onError(err, task, worker );
}
self.returnToPool( worker );
return cb();
});
};
JobManager.prototype.getFromPool = function(){
return this.tmpPool.splice(0,1)[0];
};
JobManager.prototype.start = function( ){
var self = this;
var tasks, workers=[], i;
if ( this.state != STATE.INITIALIZED ){
throw new Error('Invalid state', STATE.INITIALIZED );
}
this.state = STATE.RUNNING;
var numOfTasks = self.concurrency;
if( numOfTasks > self.tasks.length ){
numOfTasks = self.tasks.length;
}
tasks = self.tasks.splice(0,numOfTasks);
for(i=0; i< numOfTasks; i++){
workers.push( self.getFromPool() );
}
tasks.forEach(function(task, i){
var worker = workers[i];
var cb = function(){
var i,l;
if(self.tasks.length/self.concurrency < self.notifyAt && (!self.isLoadingTakingPlace) ){
if( self.onLoadMore ) {
self.isLoadingTakingPlace = true;
self.onLoadMore(function(){
self.isLoadingTakingPlace = false;
});
}
}
if(self.state != STATE.RUNNING ){
return;
}
var nextTask = self.tasks.splice(0,1)[0];
if (!nextTask){
self.state = STATE.STOPPED;
return;
}
// console.log( 'lastUsed', self.lastUsedWorker );
self.$doWork( nextTask, self.getFromPool(), cb );
};
self.$doWork( task, worker, cb );
});
};
JobManager.prototype.pause = function(){
this.state = STATE.PAUSED;
};
JobManager.prototype.resume = function(){
this.state = STATE.INITIALIZED;
this.start();
};
exports.JobManager = JobManager;

32
src/utils.js

@ -0,0 +1,32 @@
function clone(obj){
var newObj = {};
Object.keys(obj).forEach(function(k){ newObj[k] = obj[k]; });
return newObj;
}
function defaults( def, newValue ){
var out = clone( def );
if(newValue)
Object.keys(newValue).forEach(function(k){ out[k] = newValue[k]; });
return out;
}
function range(n){
var x=Array(n);
var i=0;
while(i<n){ x[i] = i;i++; }
return x;
}
function createEnum(a){
var out = {};
a.forEach(function(v, k){
out[v.toUpperCase()] = {index: k, name: v.toLowerCase() };
});
return out;
}
exports.clone = clone;
exports.defaults = defaults;
exports.range = range;
exports.createEnum = createEnum;

44
test/JobManager.spec.js

@ -0,0 +1,44 @@
var JobManager = require('../src/JobManager.js').JobManager;
var utils = require('../src/utils.js');
var range = utils.range;
var wrks = range(400000);
wrks.forEach( function(v,k){
wrks[k] = function(data,cb){
var self = this;
process.nextTick(function(){
// console.log( 'I am done: ', v );
cb( null, data);
} );
};
wrks[k].id = v;
});
var tStart = 0;
var getTasks = function(){
var i, out=[];
for( i = 0; i< 1000; i++, tStart++ ){
out[i] = tStart;
}
return out;
};
var tasks = getTasks();
var jm = new JobManager({concurrency:3});
jm.tasks = tasks;
jm.workers = wrks;
jm.onError = function(err, task, worker ){
console.log( 'Error...', arguments );
};
jm.work = function(task, worker, cb){
// console.log( 'Worker ', worker.id, ' with ', task );
return worker( task, cb );
};
jm.onLoadMore = function( cb ){
console.log( 'Loading......');
this.tasks = this.tasks.concat( getTasks() );
return cb();
};
jm.start();
Loading…
Cancel
Save