@ -2,7 +2,7 @@
var utils = require ( './utils' ) ;
var STATE = utils . createEnum ( [ 'RUNNING' , 'NOT_RUNNING' ] ) ;
var STATE = utils . createEnum ( [ 'RUNNING' , 'PAUSED' , 'NOT_RUNNING' ] ) ;
function JobManager ( opts ) {
@ -75,33 +75,37 @@ JobManager.prototype.getFromPool = function(){
JobManager . prototype . $doWork_ = function ( cb ) {
var self = this ;
var task = self . tasks . splice ( 0 , 1 ) [ 0 ] ;
var task = this . tasks . splice ( 0 , 1 ) [ 0 ] ;
if ( self . tasks . length / self . concurrency < self . notifyAt && ( ! self . isLoadingTakingPlace ) ) {
if ( self . onLoadMore && ! self . endReached ) {
self . $onLoadMore ( ) ;
}
}
if ( task == undefined ) {
this . stop ( ) ;
if ( this . isLoadingTakingPlace ) {
this . pause ( ) ;
} else {
this . stop ( ) ;
}
return ;
}
var worker = self . getFromPool ( ) ;
var worker = this . getFromPool ( ) ;
this . work ( task , worker , function ( err ) {
if ( err && self . onError ) {
self . onError ( err , task , worker ) ;
}
if ( self . tasks . length / self . concurrency < self . notifyAt && ( ! self . isLoadingTakingPlace ) ) {
if ( self . onLoadMore && ! self . endReached ) {
self . $onLoadMore ( ) ;
}
}
cb ( ) ;
if ( self . state == STATE . NOT_RUNNING ) {
if ( ( self . runningTasks == 1 ) && self . onStopped ) { self . onStopped ( ) ; }
}
self . returnToPool ( worker ) ;
if ( self . state === STATE . NOT_RUNNING ) {
if ( ( self . runningTasks === 0 ) && self . onStopped ) { self . onStopped ( ) ; }
}
return cb ( ) ;
} ) ;
} ;
JobManager . prototype . $trigger = function ( ) {
var self = this ;
while ( this . runningTasks < this . concurrency && this . state == STATE . RUNNING ) {
while ( this . runningTasks < this . concurrency && this . state === STATE . RUNNING ) {
this . $doWork_ ( function ( ) {
self . $trigger ( ) ;
} ) ;
@ -126,16 +130,22 @@ JobManager.prototype.$onLoadMore = function( cb ){
self . isLoadingTakingPlace = true ;
self . onLoadMore ( function ( ) {
self . isLoadingTakingPlace = false ;
if ( self . state === STATE . PAUSED ) {
self . resume ( ) ;
}
return cb ( ) ;
} ) ;
} ;
JobManager . prototype . pause = function ( ) {
this . state = STATE . NOT_RUNNING ;
this . state = STATE . PAUSED ;
} ;
JobManager . prototype . stop = function ( ) {
this . state = STATE . NOT_RUNNING ;
if ( ( this . tasks . length === 0 ) && ( this . runningTasks === 0 ) && ( ! this . isLoadingTakingPlace ) ) {
this . onStopped ( ) ;
}
} ;