The Open Source kanban (built with Meteor). Keep variable/table/field names camelCase. For translations, only add Pull Request changes to wekan/i18n/en.i18n.json , other translations are done at https://transifex.com/wekan/wekan only.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
wekan/packages/wekan-cfs-power-queue/power-queue.js

727 lines
25 KiB

// Rig weak dependencies
if (typeof MicroQueue === 'undefined' && Package['micro-queue']) {
MicroQueue = Package['micro-queue'].MicroQueue;
}
if (typeof ReactiveList === 'undefined' && Package['reactive-list']) {
ReactiveList = Package['reactive-list'].ReactiveList;
}
// Rig weak dependencies in +0.9.1
if (typeof MicroQueue === 'undefined' && Package['wekan-cfs-micro-queue']) {
MicroQueue = Package['wekan-cfs-micro-queue'].MicroQueue;
}
if (typeof ReactiveList === 'undefined' && Package['wekan-cfs-reactive-list']) {
ReactiveList = Package['wekan-cfs-reactive-list'].ReactiveList;
}
/**
* Creates an instance of a power queue // Testing inline comment
* [Check out demo](http://power-queue-test.meteor.com/)
*
* @constructor
* @self powerqueue
* @param {object} [options] Settings
* @param {boolean} [options.filo=false] Make it a first in last out queue
* @param {boolean} [options.isPaused=false] Set queue paused
* @param {boolean} [options.autostart=true] May adding a task start the queue
* @param {string} [options.name="Queue"] Name of the queue
* @param {number} [options.maxProcessing=1] Limit of simultanous running tasks
* @param {number} [options.maxFailures = 5] Limit retries of failed tasks, if 0 or below we allow infinite failures
* @param {number} [options.jumpOnFailure = true] Jump to next task and retry failed task later
* @param {boolean} [options.debug=false] Log verbose messages to the console
* @param {boolean} [options.reactive=true] Set whether or not this queue should be reactive
* @param {boolean} [options.onAutostart] Callback for the queue autostart event
* @param {boolean} [options.onPaused] Callback for the queue paused event
* @param {boolean} [options.onReleased] Callback for the queue release event
* @param {boolean} [options.onEnded] Callback for the queue end event
* @param {[SpinalQueue](spinal-queue.spec.md)} [options.spinalQueue] Set spinal queue uses pr. default `MicroQueue` or `ReactiveList` if added to the project
*/
PowerQueue = function(options) {
var self = this;
var test = 5;
self.reactive = (options && options.reactive === false) ? false : true;
// Allow user to use another micro-queue #3
// We try setting the ActiveQueue to MicroQueue if installed in the app
var ActiveQueue = (typeof MicroQueue !== 'undefined') && MicroQueue || undefined;
// If ReactiveList is added to the project we use this over MicroQueue
ActiveQueue = (typeof ReactiveList !== 'undefined') && ReactiveList || ActiveQueue;
// We allow user to overrule and set a custom spinal-queue spec complient queue
if (options && typeof options.spinalQueue !== 'undefined') {
ActiveQueue = options.spinalQueue;
}
if (typeof ActiveQueue === 'undefined') {
console.log('Error: You need to add a spinal queue to the project');
console.log('Please add "micro-queue", "reactive-list" to the project');
throw new Error('Please add "micro-queue", "reactive-list" or other spinalQueue compatible packages');
}
// Default is fifo lilo
self.invocations = new ActiveQueue({
//
sort: (options && (options.filo || options.lifo)),
reactive: self.reactive
});
//var self.invocations = new ReactiveList(queueOrder);
// List of current tasks being processed
self._processList = new ActiveQueue({
reactive: self.reactive
}); //ReactiveList();
// Max number of simultanious tasks being processed
self._maxProcessing = new ReactiveProperty(options && options.maxProcessing || 1, self.reactive);
// Reactive number of tasks being processed
self._isProcessing = new ReactiveProperty(0, self.reactive);
// Boolean indicating if queue is paused or not
self._paused = new ReactiveProperty((options && options.isPaused || false), self.reactive);
// Boolean indicator for queue status active / running (can still be paused)
self._running = new ReactiveProperty(false, self.reactive);
// Counter for errors, errors are triggered if maxFailures is exeeded
self._errors = new ReactiveProperty(0, self.reactive);
// Counter for task failures, contains error count
self._failures = new ReactiveProperty(0, self.reactive);
// On failure jump to new task - if false the current task is rerun until error
self._jumpOnFailure = (options && options.jumpOnFailure === false) ? false : true;
// Count of all added tasks
self._maxLength = new ReactiveProperty(0, self.reactive);
// Boolean indicate whether or not a "add" task is allowed to start the queue
self._autostart = new ReactiveProperty( ((options && options.autostart === false) ? false : true), self.reactive);
// Limit times a task is allowed to fail and be rerun later before triggering an error
self._maxFailures = new ReactiveProperty( (options && options.maxFailures || 5), self.reactive);
// Name / title of this queue - Not used - should deprecate
self.title = options && options.name || 'Queue';
// debug - will print error / failures passed to next
self.debug = !!(options && options.debug);
/** @method PowerQueue.total
* @reactive
* @returns {number} The total number of tasks added to this queue
*/
self.total = self._maxLength.get;
/** @method PowerQueue.isPaused
* @reactive
* @returns {boolean} Status of the paused state of the queue
*/
self.isPaused = self._paused.get;
/** @method PowerQueue.processing
* @reactive
* @returns {number} Number of tasks currently being processed
*/
self.processing = self._isProcessing.get;
/** @method PowerQueue.errors
* @reactive
* @returns {number} The total number of errors
* Errors are triggered when [maxFailures](PowerQueue.maxFailures) are exeeded
*/
self.errors = self._errors.get;
/** @method PowerQueue.failures
* @reactive
* @returns {number} The total number of failed tasks
*/
self.failures = self._failures.get;
/** @method PowerQueue.isRunning
* @reactive
* @returns {boolean} True if the queue is running
* > NOTE: The task can be paused but marked as running
*/
self.isRunning = self._running.get;
/** @method PowerQueue.maxProcessing Get setter for maxProcessing
* @param {number} [max] If not used this function works as a getter
* @reactive
* @returns {number} Maximum number of simultaneous processing tasks
*
* Example:
* ```js
* foo.maxProcessing(); // Works as a getter and returns the current value
* foo.maxProcessing(20); // This sets the value to 20
* ```
*/
self.maxProcessing = self._maxProcessing.getset;
self._maxProcessing.onChange = function() {
// The user can change the max allowed processing tasks up or down here...
// Update the throttle up
self.updateThrottleUp();
// Update the throttle down
self.updateThrottleDown();
};
/** @method PowerQueue.autostart Get setter for autostart
* @param {boolean} [autorun] If not used this function works as a getter
* @reactive
* @returns {boolean} If adding a task may trigger the queue to start
*
* Example:
* ```js
* foo.autostart(); // Works as a getter and returns the current value
* foo.autostart(true); // This sets the value to true
* ```
*/
self.autostart = self._autostart.getset;
/** @method PowerQueue.maxFailures Get setter for maxFailures
* @param {number} [max] If not used this function works as a getter
* @reactive
* @returns {number} The maximum for failures pr. task before triggering an error
*
* Example:
* ```js
* foo.maxFailures(); // Works as a getter and returns the current value
* foo.maxFailures(10); // This sets the value to 10
* ```
*/
self.maxFailures = self._maxFailures.getset;
/** @callback PowerQueue.onPaused
* Is called when queue is ended
*/
self.onPaused = options && options.onPaused || function() {
self.debug && console.log(self.title + ' ENDED');
};
/** @callback PowerQueue.onEnded
* Is called when queue is ended
*/
self.onEnded = options && options.onEnded || function() {
self.debug && console.log(self.title + ' ENDED');
};
/** @callback PowerQueue.onRelease
* Is called when queue is released
*/
self.onRelease = options && options.onRelease || function() {
self.debug && console.log(self.title + ' RELEASED');
};
/** @callback PowerQueue.onAutostart
* Is called when queue is auto started
*/
self.onAutostart = options && options.onAutostart || function() {
self.debug && console.log(self.title + ' Autostart');
};
};
/** @method PowerQueue.prototype.processList
* @reactive
* @returns {array} List of tasks currently being processed
*/
PowerQueue.prototype.processingList = function() {
var self = this;
return self._processList.fetch();
};
/** @method PowerQueue.prototype.isHalted
* @reactive
* @returns {boolean} True if the queue is not running or paused
*/
PowerQueue.prototype.isHalted = function() {
var self = this;
return (!self._running.get() || self._paused.get());
};
/** @method PowerQueue.prototype.length
* @reactive
* @returns {number} Number of tasks left in queue to be processed
*/
PowerQueue.prototype.length = function() {
var self = this;
return self.invocations.length();
};
/** @method PowerQueue.prototype.progress
* @reactive
* @returns {number} 0 .. 100 % Indicates the status of the queue
*/
PowerQueue.prototype.progress = function() {
var self = this;
var progress = self._maxLength.get() - self.invocations.length() - self._isProcessing.get();
if (self._maxLength.value > 0) {
return Math.round(progress / self._maxLength.value * 100);
}
return 0;
};
/** @method PowerQueue.prototype.usage
* @reactive
* @returns {number} 0 .. 100 % Indicates resource usage of the queue
*/
PowerQueue.prototype.usage = function() {
var self = this;
return Math.round(self._isProcessing.get() / self._maxProcessing.get() * 100);
};
/** @method PowerQueue.prototype.reset Reset the queue
* Calling this will:
* * stop the queue
* * paused to false
* * Discart all queue data
*
* > NOTE: At the moment if the queue has processing tasks they can change
* > the `errors` and `failures` counters. This could change in the future or
* > be prevented by creating a whole new instance of the `PowerQueue`
*/
PowerQueue.prototype.reset = function() {
var self = this;
self.debug && console.log(self.title + ' RESET');
self._running.set(false);
self._paused.set(false);
self.invocations.reset();
self._processList.reset();
// // Loop through the processing tasks and reset these
// self._processList.forEach(function(data) {
// if (data.queue instanceof PowerQueue) {
// data.queue.reset();
// }
// }, true);
self._maxLength.set(0);
self._failures.set(0);
self._errors.set(0);
};
/** @method PowerQueue._autoStartTasks
* @private
*
* This method defines the autostart algorithm that allows add task to trigger
* a start of the queue if queue is not paused.
*/
PowerQueue.prototype._autoStartTasks = function() {
var self = this;
// We dont start anything by ourselfs if queue is paused
if (!self._paused.value) {
// Queue is not running and we are set to autostart so we start the queue
if (!self._running.value && self._autostart.value) {
// Trigger callback / event
self.onAutostart();
// Set queue as running
self._running.set(true);
}
// Make sure that we use all available resources
if (self._running.value) {
// Call next to start up the queue
self.next(null);
}
}
};
/** @method PowerQueue.prototype.add
* @param {any} data The task to be handled
* @param {number} [failures] Used internally to Pass on number of failures.
*/
PowerQueue.prototype.add = function(data, failures, id) {
var self = this;
// Assign new id to task
var assignNewId = self._jumpOnFailure || typeof id === 'undefined';
// Set the task id
var taskId = (assignNewId) ? self._maxLength.value + 1 : id;
// self.invocations.add({ _id: currentId, data: data, failures: failures || 0 }, reversed);
self.invocations.insert(taskId, { _id: taskId, data: data, failures: failures || 0 });
// If we assigned new id then increase length
if (assignNewId) self._maxLength.inc();
self._autoStartTasks();
};
/** @method PowerQueue.prototype.updateThrottleUp
* @private
*
* Calling this method will update the throttle on the queue adding tasks.
*
* > Note: Currently we only support the PowerQueue - but we could support
* > a more general interface for pauseable tasks or other usecases.
*/
PowerQueue.prototype.updateThrottleUp = function() {
var self = this;
// How many additional tasks can we handle?
var availableSlots = self._maxProcessing.value - self._isProcessing.value;
// If we can handle more, we have more, we're running, and we're not paused
if (!self._paused.value && self._running.value && availableSlots > 0 && self.invocations._length > 0) {
// Increase counter of current number of tasks being processed
self._isProcessing.inc();
// Run task
self.runTask(self.invocations.getFirstItem());
// Repeat recursively; this is better than a for loop to avoid blocking the UI
self.updateThrottleUp();
}
};
/** @method PowerQueue.prototype.updateThrottleDown
* @private
*
* Calling this method will update the throttle on the queue pause tasks.
*
* > Note: Currently we only support the PowerQueue - but we could support
* > a more general interface for pauseable tasks or other usecases.
*/
PowerQueue.prototype.updateThrottleDown = function() {
var self = this;
// Calculate the differece between acutuall processing tasks and target
var diff = self._isProcessing.value - self._maxProcessing.value;
// If the diff is more than 0 then we have many tasks processing.
if (diff > 0) {
// We pause the latest added tasks
self._processList.forEachReverse(function(data) {
if (diff > 0 && data.queue instanceof PowerQueue) {
diff--;
// We dont mind calling pause on multiple times on each task
// theres a simple check going on preventing any duplicate actions
data.queue.pause();
}
}, true);
}
};
/** @method PowerQueue.prototype.next
* @param {string} [err] Error message if task failed
* > * Can pass in `null` to start the queue
* > * Passing in a string to `next` will trigger a failure
* > * Passing nothing will simply let the next task run
* `next` is handed into the [taskHandler](PowerQueue.taskHandler) as a
* callback to mark an error or end of current task
*/
PowerQueue.prototype.next = function(err) {
var self = this;
// Primary concern is to throttle up because we are either:
// 1. Starting the queue
// 2. Starting next task
//
// This function does not shut down running tasks
self.updateThrottleUp();
// We are running, no tasks are being processed even we just updated the
// throttle up and we got no errors.
// 1. We are paused and releasing tasks
// 2. We are done
if (self._running.value && self._isProcessing.value === 0 && err !== null) {
// We have no tasks processing so this queue is now releasing resources
// this could be that the queue is paused or stopped, in that case the
// self.invocations._length would be > 0
// If on the other hand the self.invocations._length is 0 then we have no more
// tasks in the queue so the queue has ended
self.onRelease(self.invocations._length);
if (!self.invocations._length) { // !self._paused.value &&
// Check if queue is done working
// Stop the queue
self._running.set(false);
// self.invocations.reset(); // This should be implicit
self.onEnded();
}
}
};
/** @callback done
* @param {Meteor.Error | Error | String | null} [feedback] This allows the task to communicate with the queue
*
* Explaination of `feedback`
* * `Meteor.Error` This means that the task failed in a controlled manner and is allowed to rerun
* * `Error` This will throw the passed error - as its an unitended error
* * `null` The task is not done yet, rerun later
* * `String` The task can perform certain commands on the queue
* * "pause" - pause the queue
* * "stop" - stop the queue
* * "reset" - reset the queue
* * "cancel" - cancel the queue
*
*/
/** @method PowerQueue.prototype.runTaskDone
* @private
* @param {Meteor.Error | Error | String | null} [feedback] This allows the task to communicate with the queue
* @param {object} invocation
*
* > Note: `feedback` is explained in [Done callback](#done)
*
*/
// Rig the callback function
PowerQueue.prototype.runTaskDone = function(feedback, invocation) {
var self = this;
// If the task handler throws an error then add it to the queue again
// we allow this for a max of self._maxFailures
// If the error is null then we add the task silently back into the
// microQueue in reverse... This could be due to pause or throttling
if (feedback instanceof Meteor.Error) {
// We only count failures if maxFailures are above 0
if (self._maxFailures.value > 0) invocation.failures++;
self._failures.inc();
// If the user has set the debug flag we print out failures/errors
self.debug && console.error('Error: "' + self.title + '" ' + feedback.message + ', ' + feedback.stack);
if (invocation.failures < self._maxFailures.value) {
// Add the task again with the increased failures
self.add(invocation.data, invocation.failures, invocation._id);
} else {
self._errors.inc();
self.errorHandler(invocation.data, self.add, invocation.failures);
}
// If a error is thrown we assume its not intended
} else if (feedback instanceof Error) throw feedback;
if (feedback)
// We use null to throttle pauseable tasks
if (feedback === null) {
// We add this task into the queue, no questions asked
self.invocations.insert(invocation._id, { data: invocation.data, failures: invocation.failures, _id: invocation._id });
}
// If the user returns a string we got a command
if (feedback === ''+feedback) {
var command = {
'pause': function() { self.pause(); },
'stop': function() { self.stop(); },
'reset': function() { self.reset(); },
'cancel': function() { self.cancel(); },
};
if (typeof command[feedback] === 'function') {
// Run the command on this queue
command[feedback]();
} else {
// We dont recognize this command, throw an error
throw new Error('Unknown queue command "' + feedback + '"');
}
}
// Decrease the number of tasks being processed
// make sure we dont go below 0
if (self._isProcessing.value > 0) self._isProcessing.dec();
// Task has ended we remove the task from the process list
self._processList.remove(invocation._id);
invocation.data = null;
invocation.failures = null;
invocation._id = null;
invocation = null;
delete invocation;
// Next task
Meteor.setTimeout(function() {
self.next();
}, 0);
};
/** @method PowerQueue.prototype.runTask
* @private // This is not part of the open api
* @param {object} invocation The object stored in the micro-queue
*/
PowerQueue.prototype.runTask = function(invocation) {
var self = this;
// We start the fitting task handler
// Currently we only support the PowerQueue but we could have a more general
// interface for tasks that allow throttling
try {
if (invocation.data instanceof PowerQueue) {
// Insert PowerQueue into process list
self._processList.insert(invocation._id, { id: invocation._id, queue: invocation.data });
// Handle task
self.queueTaskHandler(invocation.data, function subQueueCallbackDone(feedback) {
self.runTaskDone(feedback, invocation);
}, invocation.failures);
} else {
// Insert task into process list
self._processList.insert(invocation._id, invocation.data);
// Handle task
self.taskHandler(invocation.data, function taskCallbackDone(feedback) {
self.runTaskDone(feedback, invocation);
}, invocation.failures);
}
} catch(err) {
throw new Error('Error while running taskHandler for queue, Error: ' + err.message);
}
};
/** @method PowerQueue.prototype.queueTaskHandler
* This method handles tasks that are sub queues
*/
PowerQueue.prototype.queueTaskHandler = function(subQueue, next, failures) {
var self = this;
// Monitor sub queue task releases
subQueue.onRelease = function(remaining) {
// Ok, we were paused - this could be throttling so we respect this
// So when the queue is halted we add it back into the main queue
if (remaining > 0) {
// We get out of the queue but dont repport error and add to run later
next(null);
} else {
// Queue has ended
// We simply trigger next task when the sub queue is complete
next();
// When running subqueues it doesnt make sense to track failures and retry
// the sub queue - this is sub queue domain
}
};
// Start the queue
subQueue.run();
};
/** @callback PowerQueue.prototype.taskHandler
* @param {any} data This can be data or functions
* @param {function} next Function `next` call this to end task
* @param {number} failures Number of failures on this task
*
* Default task handler expects functions as data:
* ```js
* self.taskHandler = function(data, next, failures) {
* // This default task handler expects invocation to be a function to run
* if (typeof data !== 'function') {
* throw new Error('Default task handler expects a function');
* }
* try {
* // Have the function call next
* data(next, failures);
* } catch(err) {
* // Throw to fail this task
* next(err);
* }
* };
* ```
*/
// Can be overwrittin by the user
PowerQueue.prototype.taskHandler = function(data, next, failures) {
var self = this;
// This default task handler expects invocation to be a function to run
if (typeof data !== 'function') {
throw new Error('Default task handler expects a function');
}
try {
// Have the function call next
data(next, failures);
} catch(err) {
// Throw to fail this task
next(err);
}
};
/** @callback PowerQueue.prototype.errorHandler
* @param {any} data This can be data or functions
* @param {function} addTask Use this function to insert the data into the queue again
* @param {number} failures Number of failures on this task
*
* The default callback:
* ```js
* var foo = new PowerQueue();
*
* // Overwrite the default action
* foo.errorHandler = function(data, addTask, failures) {
* // This could be overwritten the data contains the task data and addTask
* // is a helper for adding the task to the queue
* // try again: addTask(data);
* // console.log('Terminate at ' + failures + ' failures');
* };
* ```
*/
PowerQueue.prototype.errorHandler = function(data, addTask, failures) {
var self = this;
// This could be overwritten the data contains the task data and addTask
// is a helper for adding the task to the queue
// try again: addTask(data);
self.debug && console.log('Terminate at ' + failures + ' failures');
};
/** @method PowerQueue.prototype.pause Pause the queue
* @todo We should have it pause all processing tasks
*/
PowerQueue.prototype.pause = function() {
var self = this;
if (!self._paused.value) {
self._paused.set(true);
// Loop through the processing tasks and pause these
self._processList.forEach(function(data) {
if (data.queue instanceof PowerQueue) {
// Pause the sub queue
data.queue.pause();
}
}, true);
// Trigger callback
self.onPaused();
}
};
/** @method PowerQueue.prototype.resume Start a paused queue
* @todo We should have it resume all processing tasks
*
* > This will not start a stopped queue
*/
PowerQueue.prototype.resume = function() {
var self = this;
self.run();
};
/** @method PowerQueue.prototype.run Starts the queue
* > Using this command will resume a paused queue and will
* > start a stopped queue.
*/
PowerQueue.prototype.run = function() {
var self = this;
//not paused and already running or queue empty or paused subqueues
if (!self._paused.value && self._running.value || !self.invocations._length) {
return;
}
self._paused.set(false);
self._running.set(true);
self.next(null);
};
/** @method PowerQueue.prototype.stop Stops the queue
*/
PowerQueue.prototype.stop = function() {
var self = this;
self._running.set(false);
};
/** @method PowerQueue.prototype.cancel Cancel the queue
*/
PowerQueue.prototype.cancel = function() {
var self = this;
self.reset();
};