Browse Source

Heavy rewrite

master
Harish.K 11 years ago
parent
commit
4a09d48f55
  1. 120
      src/JobManager.js
  2. 112
      test/JobManager.spec.js
  3. 58
      test/testrun.js

120
src/JobManager.js

@ -2,7 +2,7 @@
var utils = require('./utils');
var STATE = utils.createEnum(['INITIALIZING', 'INITIALIZED', 'STARTED', 'RUNNING', 'PAUSING', 'PAUSED', 'STOPPED']);
var STATE = utils.createEnum([ 'RUNNING' , 'NOT_RUNNING']);
function JobManager(opts){
@ -11,7 +11,7 @@ function JobManager(opts){
this.concurrency = opts.concurrency || 20;
// state
this.state = STATE.INITIALIZED;
this.state = STATE.NOT_RUNNING;
this.workers = [];
this.tasks = [];
@ -19,94 +19,90 @@ function JobManager(opts){
this.onLoadMore = null;
this.onError = null;
this.isLoadingTakingPlace = false;
this.tmpPool = [];
this.runningTasks = 0;
}
JobManager.prototype.__defineSetter__( 'workers', function(workers){
var tmpPool = Array( workers.length > this.concurrency? workers.length : this.concurrency ), i, l, j;
this.$workers = workers;
if(! tmpPool.length ){
return;
}
for( i = 0, j=0, l = tmpPool.length; i< l; i++, j++ ){
if(j == workers.length ){ j =0; }
tmpPool[i] = workers[j];
/*
* JobManager.prototype.__defineSetter__( 'workers', function(workers){
* this.$workers = workers;
* this.updateState();
* });
*/
JobManager.prototype.updateState = function(){
var workers = this.workers;
var tmpPoolLen = workers.length && (workers.length * Math.ceil( this.concurrency/workers.length ) ) , i;
var tmpPool = [];
for( i=0; i < tmpPoolLen; i++){
tmpPool[i] = workers[ i % workers.length ];
}
this.tmpPool = tmpPool;
});
};
JobManager.prototype.__defineGetter__( 'workers', function(){
return this.$workers;
});
/*
* JobManager.prototype.__defineGetter__( 'workers', function(){
* return this.$workers;
* });
*/
JobManager.prototype.returnToPool = function (w){
this.tmpPool.push(w);
this.runningTasks--;
this.tmpPool.push( w );
};
JobManager.prototype.getFromPool = function(){
if( this.runningTasks < this.concurrency ){
this.runningTasks++;
return this.tmpPool.splice(0,1)[0];
}
};
JobManager.prototype.$doWork = function( task, worker, cb ){
JobManager.prototype.$doWork_ = function( cb ){
var self = this;
var worker = self.getFromPool();
var task = self.tasks.splice(0, 1)[0];
this.work( task, worker, function(err){
if(err){
self.onError(err, task, worker );
}
if(self.tasks.length/self.concurrency < self.notifyAt && (!self.isLoadingTakingPlace) ){
if( self.onLoadMore ) {
self.$onLoadMore();
}
}
cb();
self.returnToPool( worker );
return cb();
});
};
JobManager.prototype.getFromPool = function(){
return this.tmpPool.splice(0,1)[0];
};
JobManager.prototype.start = function( ){
JobManager.prototype.$trigger = function (){
var self = this;
var tasks, workers=[], i;
if ( this.state != STATE.INITIALIZED ){
throw new Error('Invalid state', STATE.INITIALIZED );
while( this.runningTasks < this.concurrency && this.state == STATE.RUNNING && this.tasks.length ){
this.$doWork_( function(){
self.$trigger();
});
}
};
JobManager.prototype._start = function( ) {
this.state = STATE.RUNNING;
var numOfTasks = self.concurrency;
if( numOfTasks > self.tasks.length ){
numOfTasks = self.tasks.length;
}
tasks = self.tasks.splice(0,numOfTasks);
for(i=0; i< numOfTasks; i++){
workers.push( self.getFromPool() );
}
tasks.forEach(function(task, i){
var worker = workers[i];
var cb = function(){
var i,l;
if(self.tasks.length/self.concurrency < self.notifyAt && (!self.isLoadingTakingPlace) ){
if( self.onLoadMore ) {
self.isLoadingTakingPlace = true;
self.onLoadMore(function(){
self.isLoadingTakingPlace = false;
});
}
}
if(self.state != STATE.RUNNING ){
return;
}
var nextTask = self.tasks.splice(0,1)[0];
if (!nextTask){
self.state = STATE.STOPPED;
return;
}
// console.log( 'lastUsed', self.lastUsedWorker );
self.$doWork( nextTask, self.getFromPool(), cb );
};
self.$doWork( task, worker, cb );
this.updateState();
this.$trigger();
};
JobManager.prototype.$onLoadMore = function(){
var self = this;
self.isLoadingTakingPlace = true;
self.onLoadMore(function(){
self.isLoadingTakingPlace = false;
});
};
JobManager.prototype.pause = function(){
this.state = STATE.PAUSED;
this.state = STATE.NOT_RUNNING;
};
JobManager.prototype.resume = function(){
this.state = STATE.INITIALIZED;
this.start();
};

112
test/JobManager.spec.js

@ -1,44 +1,76 @@
/*global describe, it, before, beforeEach, after, afterEach, should */
var JobManager = require('../src/JobManager.js').JobManager;
var utils = require('../src/utils.js');
var range = utils.range;
var wrks = range(400000);
wrks.forEach( function(v,k){
wrks[k] = function(data,cb){
var self = this;
process.nextTick(function(){
// console.log( 'I am done: ', v );
cb( null, data);
} );
var should = require('should');
var NO_WORKERS = 5;
var workers = [], i;
for ( i = 0; i < NO_WORKERS; i ++) {
workers[i] = function( data, cb ){
console.log(i, ' Processing ', data );
setTimeout( function(){
console.log( 'Iam ', i, ' Processed ', data );
}, i*10 );
};
wrks[k].id = v;
});
workers[i].name = i;
}
var NO_TASKS=100;
var tasks = utils.range( NO_TASKS );
var TEST_CONCURENCY = 3;
var tStart = 0;
var getTasks = function(){
var i, out=[];
for( i = 0; i< 1000; i++, tStart++ ){
out[i] = tStart;
}
return out;
};
var tasks = getTasks();
var jm = new JobManager({concurrency:3});
jm.tasks = tasks;
jm.workers = wrks;
jm.onError = function(err, task, worker ){
console.log( 'Error...', arguments );
};
jm.work = function(task, worker, cb){
// console.log( 'Worker ', worker.id, ' with ', task );
return worker( task, cb );
};
jm.onLoadMore = function( cb ){
console.log( 'Loading......');
this.tasks = this.tasks.concat( getTasks() );
return cb();
};
jm.start();
describe('JobManager', function(){
describe('Initialization', function(){
var jm = new JobManager( { concurrency: TEST_CONCURENCY });
jm.work = function( task, worker, cb ){
worker( task , cb );
};
jm.workers = workers;
jm.tasks = tasks;
jm.onError = function(err, task, worker ){
console.log( 'Error...', arguments );
};
jm.onLoadMore = function( cb ){
console.log( 'Loading......');
this.tasks = this.tasks.concat( tasks );
return cb();
};
it('should Initialize properly', function(done){
jm.concurrency = jm.workers.length -1;
jm.updateState();
jm.tmpPool.should.have.length(NO_WORKERS);
jm.concurrency = jm.workers.length +1;
jm.updateState();
jm.tmpPool.should.have.length(NO_WORKERS*2);
jm.tasks.should.have.length( NO_TASKS );
jm.workers.should.have.length( NO_WORKERS );
jm.runningTasks.should.be.equal(0);
done();
});
it('getFromPool, returnToPool', function(done){
var store = [], i, w;
for( i =0; i< jm.concurrency; i++ ){
w = jm.getFromPool();
jm.runningTasks.should.be.equal( i+1);
should.exist( w );
store.push(w);
}
for( i =0; i< 6; i++ ){
w = jm.getFromPool();
jm.runningTasks.should.be.equal( jm.concurrency );
should.not.exist( w );
}
store.forEach( function(v, i){
jm.returnToPool(v);
jm.runningTasks.should.be.equal( jm.concurrency-i-1 );
});
jm.runningTasks.should.be.equal( 0 );
done();
});
});
});

58
test/testrun.js

@ -0,0 +1,58 @@
var JobManager = require('../src/JobManager.js').JobManager;
var utils = require('../src/utils.js');
var range = utils.range;
var async = require('async');
var wrks = range(4);
wrks.forEach( function(v,k){
wrks[k] = function(data,cb){
console.log( 'S ', data );
setTimeout( function(){
console.log( 'E ', data );
cb();
} , ( 1 ) * 1000 );
};
wrks[k].id = v;
});
var tStart = 0;
var getTasks = function(){
var i, out=[];
for( i = 0; i< 50; i++, tStart++ ){
out[i] = tStart;
}
return out;
};
var tasks = getTasks();
if(0){
console.log( tasks.length );
var y = 0;
async.eachLimit( tasks, 300, function( data, cb ){
setTimeout( function(){
console.log( data );
cb();
} , 1);
}, console.log );
return;
}
var jm = new JobManager({concurrency: 6});
jm.tasks = tasks;
jm.workers = wrks;
jm.onError = function(err, task, worker ){
console.log( 'Error...', arguments );
};
jm.work = function(task, worker, cb){
// console.log( 'Worker ', worker.id, ' with ', task );
return worker( task, cb );
};
jm.onLoadMore = function( cb ){
console.log( 'Loading......');
this.tasks = this.tasks.concat( getTasks() );
return cb();
};
// jm.start();
jm._start();
Loading…
Cancel
Save