From 4a09d48f55869556d6e63a46c715c27152f22064 Mon Sep 17 00:00:00 2001 From: "Harish.K" Date: Sun, 31 Aug 2014 23:22:45 +0530 Subject: [PATCH] Heavy rewrite --- src/JobManager.js | 120 +++++++++++++++++++--------------------- test/JobManager.spec.js | 112 +++++++++++++++++++++++-------------- test/testrun.js | 58 +++++++++++++++++++ 3 files changed, 188 insertions(+), 102 deletions(-) create mode 100644 test/testrun.js diff --git a/src/JobManager.js b/src/JobManager.js index ee77e4d..634b347 100644 --- a/src/JobManager.js +++ b/src/JobManager.js @@ -2,7 +2,7 @@ var utils = require('./utils'); -var STATE = utils.createEnum(['INITIALIZING', 'INITIALIZED', 'STARTED', 'RUNNING', 'PAUSING', 'PAUSED', 'STOPPED']); +var STATE = utils.createEnum([ 'RUNNING' , 'NOT_RUNNING']); function JobManager(opts){ @@ -11,7 +11,7 @@ function JobManager(opts){ this.concurrency = opts.concurrency || 20; // state - this.state = STATE.INITIALIZED; + this.state = STATE.NOT_RUNNING; this.workers = []; this.tasks = []; @@ -19,94 +19,90 @@ function JobManager(opts){ this.onLoadMore = null; this.onError = null; this.isLoadingTakingPlace = false; - this.tmpPool = []; + this.runningTasks = 0; } -JobManager.prototype.__defineSetter__( 'workers', function(workers){ - var tmpPool = Array( workers.length > this.concurrency? workers.length : this.concurrency ), i, l, j; - this.$workers = workers; - if(! tmpPool.length ){ - return; - } - for( i = 0, j=0, l = tmpPool.length; i< l; i++, j++ ){ - if(j == workers.length ){ j =0; } - tmpPool[i] = workers[j]; +/* + * JobManager.prototype.__defineSetter__( 'workers', function(workers){ + * this.$workers = workers; + * this.updateState(); + * }); + */ + +JobManager.prototype.updateState = function(){ + var workers = this.workers; + var tmpPoolLen = workers.length && (workers.length * Math.ceil( this.concurrency/workers.length ) ) , i; + var tmpPool = []; + for( i=0; i < tmpPoolLen; i++){ + tmpPool[i] = workers[ i % workers.length ]; } this.tmpPool = tmpPool; -}); +}; -JobManager.prototype.__defineGetter__( 'workers', function(){ - return this.$workers; -}); +/* + * JobManager.prototype.__defineGetter__( 'workers', function(){ + * return this.$workers; + * }); + */ JobManager.prototype.returnToPool = function (w){ - this.tmpPool.push(w); + this.runningTasks--; + this.tmpPool.push( w ); +}; + +JobManager.prototype.getFromPool = function(){ + if( this.runningTasks < this.concurrency ){ + this.runningTasks++; + return this.tmpPool.splice(0,1)[0]; + } }; -JobManager.prototype.$doWork = function( task, worker, cb ){ +JobManager.prototype.$doWork_ = function( cb ){ var self = this; + var worker = self.getFromPool(); + var task = self.tasks.splice(0, 1)[0]; this.work( task, worker, function(err){ if(err){ self.onError(err, task, worker ); } + if(self.tasks.length/self.concurrency < self.notifyAt && (!self.isLoadingTakingPlace) ){ + if( self.onLoadMore ) { + self.$onLoadMore(); + } + } + cb(); self.returnToPool( worker ); - return cb(); }); }; -JobManager.prototype.getFromPool = function(){ - return this.tmpPool.splice(0,1)[0]; -}; - - -JobManager.prototype.start = function( ){ +JobManager.prototype.$trigger = function (){ var self = this; - var tasks, workers=[], i; - if ( this.state != STATE.INITIALIZED ){ - throw new Error('Invalid state', STATE.INITIALIZED ); + while( this.runningTasks < this.concurrency && this.state == STATE.RUNNING && this.tasks.length ){ + this.$doWork_( function(){ + self.$trigger(); + }); } +}; + +JobManager.prototype._start = function( ) { this.state = STATE.RUNNING; - var numOfTasks = self.concurrency; - if( numOfTasks > self.tasks.length ){ - numOfTasks = self.tasks.length; - } - tasks = self.tasks.splice(0,numOfTasks); - for(i=0; i< numOfTasks; i++){ - workers.push( self.getFromPool() ); - } - tasks.forEach(function(task, i){ - var worker = workers[i]; - var cb = function(){ - var i,l; - if(self.tasks.length/self.concurrency < self.notifyAt && (!self.isLoadingTakingPlace) ){ - if( self.onLoadMore ) { - self.isLoadingTakingPlace = true; - self.onLoadMore(function(){ - self.isLoadingTakingPlace = false; - }); - } - } - if(self.state != STATE.RUNNING ){ - return; - } - var nextTask = self.tasks.splice(0,1)[0]; - if (!nextTask){ - self.state = STATE.STOPPED; - return; - } - // console.log( 'lastUsed', self.lastUsedWorker ); - self.$doWork( nextTask, self.getFromPool(), cb ); - }; - self.$doWork( task, worker, cb ); + this.updateState(); + this.$trigger(); +}; + +JobManager.prototype.$onLoadMore = function(){ + var self = this; + self.isLoadingTakingPlace = true; + self.onLoadMore(function(){ + self.isLoadingTakingPlace = false; }); }; JobManager.prototype.pause = function(){ - this.state = STATE.PAUSED; + this.state = STATE.NOT_RUNNING; }; JobManager.prototype.resume = function(){ - this.state = STATE.INITIALIZED; this.start(); }; diff --git a/test/JobManager.spec.js b/test/JobManager.spec.js index 53a19dc..b259022 100644 --- a/test/JobManager.spec.js +++ b/test/JobManager.spec.js @@ -1,44 +1,76 @@ +/*global describe, it, before, beforeEach, after, afterEach, should */ + var JobManager = require('../src/JobManager.js').JobManager; var utils = require('../src/utils.js'); -var range = utils.range; - -var wrks = range(400000); -wrks.forEach( function(v,k){ - wrks[k] = function(data,cb){ - var self = this; - process.nextTick(function(){ - // console.log( 'I am done: ', v ); - cb( null, data); - } ); +var should = require('should'); + +var NO_WORKERS = 5; +var workers = [], i; +for ( i = 0; i < NO_WORKERS; i ++) { + workers[i] = function( data, cb ){ + console.log(i, ' Processing ', data ); + setTimeout( function(){ + console.log( 'Iam ', i, ' Processed ', data ); + }, i*10 ); }; - wrks[k].id = v; -}); + workers[i].name = i; +} +var NO_TASKS=100; + +var tasks = utils.range( NO_TASKS ); + +var TEST_CONCURENCY = 3; -var tStart = 0; -var getTasks = function(){ - var i, out=[]; - for( i = 0; i< 1000; i++, tStart++ ){ - out[i] = tStart; - } - return out; -}; - -var tasks = getTasks(); - -var jm = new JobManager({concurrency:3}); -jm.tasks = tasks; -jm.workers = wrks; -jm.onError = function(err, task, worker ){ - console.log( 'Error...', arguments ); -}; - -jm.work = function(task, worker, cb){ - // console.log( 'Worker ', worker.id, ' with ', task ); - return worker( task, cb ); -}; -jm.onLoadMore = function( cb ){ - console.log( 'Loading......'); - this.tasks = this.tasks.concat( getTasks() ); - return cb(); -}; -jm.start(); +describe('JobManager', function(){ + + describe('Initialization', function(){ + var jm = new JobManager( { concurrency: TEST_CONCURENCY }); + jm.work = function( task, worker, cb ){ + worker( task , cb ); + }; + jm.workers = workers; + jm.tasks = tasks; + jm.onError = function(err, task, worker ){ + console.log( 'Error...', arguments ); + }; + jm.onLoadMore = function( cb ){ + console.log( 'Loading......'); + this.tasks = this.tasks.concat( tasks ); + return cb(); + }; + it('should Initialize properly', function(done){ + jm.concurrency = jm.workers.length -1; + jm.updateState(); + jm.tmpPool.should.have.length(NO_WORKERS); + jm.concurrency = jm.workers.length +1; + jm.updateState(); + jm.tmpPool.should.have.length(NO_WORKERS*2); + + jm.tasks.should.have.length( NO_TASKS ); + jm.workers.should.have.length( NO_WORKERS ); + jm.runningTasks.should.be.equal(0); + done(); + }); + + it('getFromPool, returnToPool', function(done){ + var store = [], i, w; + for( i =0; i< jm.concurrency; i++ ){ + w = jm.getFromPool(); + jm.runningTasks.should.be.equal( i+1); + should.exist( w ); + store.push(w); + } + for( i =0; i< 6; i++ ){ + w = jm.getFromPool(); + jm.runningTasks.should.be.equal( jm.concurrency ); + should.not.exist( w ); + } + store.forEach( function(v, i){ + jm.returnToPool(v); + jm.runningTasks.should.be.equal( jm.concurrency-i-1 ); + }); + jm.runningTasks.should.be.equal( 0 ); + done(); + }); + }); +}); diff --git a/test/testrun.js b/test/testrun.js new file mode 100644 index 0000000..f309c80 --- /dev/null +++ b/test/testrun.js @@ -0,0 +1,58 @@ +var JobManager = require('../src/JobManager.js').JobManager; +var utils = require('../src/utils.js'); +var range = utils.range; +var async = require('async'); + +var wrks = range(4); +wrks.forEach( function(v,k){ + wrks[k] = function(data,cb){ + console.log( 'S ', data ); + setTimeout( function(){ + console.log( 'E ', data ); + cb(); + } , ( 1 ) * 1000 ); + }; + wrks[k].id = v; +}); + +var tStart = 0; +var getTasks = function(){ + var i, out=[]; + for( i = 0; i< 50; i++, tStart++ ){ + out[i] = tStart; + } + return out; +}; + +var tasks = getTasks(); +if(0){ + console.log( tasks.length ); + var y = 0; + async.eachLimit( tasks, 300, function( data, cb ){ + setTimeout( function(){ + console.log( data ); + cb(); + } , 1); + }, console.log ); + + return; +} + +var jm = new JobManager({concurrency: 6}); +jm.tasks = tasks; +jm.workers = wrks; +jm.onError = function(err, task, worker ){ + console.log( 'Error...', arguments ); +}; + +jm.work = function(task, worker, cb){ + // console.log( 'Worker ', worker.id, ' with ', task ); + return worker( task, cb ); +}; +jm.onLoadMore = function( cb ){ + console.log( 'Loading......'); + this.tasks = this.tasks.concat( getTasks() ); + return cb(); +}; +// jm.start(); +jm._start();