diff --git a/app/apps/server/bridges/scheduler.js b/app/apps/server/bridges/scheduler.js index f59f61f0640..00327d9483b 100644 --- a/app/apps/server/bridges/scheduler.js +++ b/app/apps/server/bridges/scheduler.js @@ -6,6 +6,10 @@ function _callProcessor(processor) { return (job) => processor(job?.attrs?.data || {}); } +/** + * Provides the Apps Engine with task scheduling capabilities + * It uses {@link agenda:github.com/agenda/agenda} as backend + */ export class AppSchedulerBridge { constructor(orch) { this.orch = orch; @@ -18,6 +22,34 @@ export class AppSchedulerBridge { this.isConnected = false; } + /** + * Entity that will be run in a job + * @typedef {Object} Processor + * @property {string} id The processor's identifier + * @property {function} processor The function that will be run on a given schedule + * @property {IOnetimeStartup|IRecurrentStartup} [startupSetting] If provided, the processor will be configured with the setting as soon as it gets registered + + * Processor setting for running once after being registered + * @typedef {Object} IOnetimeStartup + * @property {string} type=onetime + * @property {string} when When the processor will be executed + * @property {Object} [data] An optional object that is passed to the processor + * + * Processor setting for running recurringly after being registered + * @typedef {Object} IRecurrentStartup + * @property {string} type=recurring + * @property {string} interval When the processor will be re executed + * @property {Object} [data] An optional object that is passed to the processor + */ + + /** + * Register processors that can be scheduled to run + * + * @param {Array.} processors An array of processors + * @param {string} appId + * + * @returns Promise + */ async registerProcessors(processors = [], appId) { const runAfterRegister = []; this.orch.debugLog(`The App ${ appId } is registering job processors`, processors); @@ -30,9 +62,10 @@ export class AppSchedulerBridge { runAfterRegister.push(this.scheduleOnceAfterRegister({ id, when: startupSetting.when, data: startupSetting.data }, appId)); break; case StartupType.RECURRING: - runAfterRegister.push(this.scheduleRecurring({ id, interval: startupSetting.interval, data: startupSetting.data }, appId)); + runAfterRegister.push(this.scheduleRecurring({ id, interval: startupSetting.interval, skipImmediate: startupSetting.skipImmediate, data: startupSetting.data }, appId)); break; default: + this.orch.getRocketChatLogger().error(`Invalid startup setting type (${ startupSetting.type }) for the processor ${ id }`); break; } } @@ -43,10 +76,25 @@ export class AppSchedulerBridge { } } + /** + * Schedules a registered processor to run _once_. + * + * @param {Object} job + * @param {string} job.id The processor's id + * @param {string} job.when When the processor will be executed + * @param {Object} [job.data] An optional object that is passed to the processor + * @param {string} appId + * + * @returns Promise + */ async scheduleOnce(job, appId) { this.orch.debugLog(`The App ${ appId } is scheduling an onetime job`, job); - await this.startScheduler(); - await this.scheduler.schedule(job.when, job.id, job.data || {}); + try { + await this.startScheduler(); + await this.scheduler.schedule(job.when, job.id, job.data || {}); + } catch (e) { + this.orch.getRocketChatLogger().error(e); + } } async scheduleOnceAfterRegister(job, appId) { @@ -56,22 +104,55 @@ export class AppSchedulerBridge { } } - async scheduleRecurring(job, appId) { - this.orch.debugLog(`The App ${ appId } is scheduling a recurring job`, job); - await this.startScheduler(); - await this.scheduler.every(job.interval, job.id, job.data || {}); + /** + * Schedules a registered processor to run recurrently according to a given interval + * + * @param {Object} job + * @param {string} job.id The processor's id + * @param {string} job.interval When the processor will be re executed + * @param {boolean} job.skipImmediate=false Whether to let the first iteration to execute as soon as the task is registered + * @param {Object} [job.data] An optional object that is passed to the processor + * @param {string} appId + * + * @returns Promise + */ + async scheduleRecurring({ id, interval, skipImmediate = false, data }, appId) { + this.orch.debugLog(`The App ${ appId } is scheduling a recurring job`, id); + try { + await this.startScheduler(); + const job = this.scheduler.create(id, data || {}); + job.repeatEvery(interval, { skipImmediate }); + await job.save(); + } catch (e) { + this.orch.getRocketChatLogger().error(e); + } } + /** + * Cancels a running job given its jobId + * + * @param {string} jobId + * @param {string} appId + * + * @returns Promise + */ async cancelJob(jobId, appId) { this.orch.debugLog(`The App ${ appId } is canceling a job`, jobId); await this.startScheduler(); try { await this.scheduler.cancel({ name: jobId }); } catch (e) { - console.error(e); + this.orch.getRocketChatLogger().error(e); } } + /** + * Cancels all the running jobs from the app + * + * @param {string} appId + * + * @returns Promise + */ async cancelAllJobs(appId) { this.orch.debugLog(`Canceling all jobs of App ${ appId }`); await this.startScheduler(); @@ -79,7 +160,7 @@ export class AppSchedulerBridge { try { await this.scheduler.cancel({ name: { $regex: matcher } }); } catch (e) { - console.error(e); + this.orch.getRocketChatLogger().error(e); } }