diff --git a/app/apps/server/bridges/scheduler.ts b/app/apps/server/bridges/scheduler.ts index 0676178c84d..698931a25be 100644 --- a/app/apps/server/bridges/scheduler.ts +++ b/app/apps/server/bridges/scheduler.ts @@ -1,4 +1,5 @@ import Agenda from 'agenda'; +import { ObjectID } from 'bson'; import { MongoInternals } from 'meteor/mongo'; import { StartupType, @@ -10,13 +11,15 @@ import { SchedulerBridge } from '@rocket.chat/apps-engine/server/bridges/Schedul import { AppServerOrchestrator } from '../orchestrator'; -function _callProcessor(processor: Function): (job: { attrs?: { data: object } }) => void { +function _callProcessor(processor: Function): (job: Agenda.Job) => void { return (job): void => { const data = job?.attrs?.data || {}; // This field is for internal use, no need to leak to app processor delete (data as any).appId; + data.jobId = job.attrs._id.toString(); + return processor(data); }; } @@ -68,10 +71,10 @@ export class AppSchedulerBridge extends SchedulerBridge { * @param {Array.} processors An array of processors * @param {string} appId * - * @returns Promise + * @returns {string[]} List of task ids run at startup, or void no startup run is set */ - protected async registerProcessors(processors: Array = [], appId: string): Promise { - const runAfterRegister: Promise[] = []; + protected async registerProcessors(processors: Array = [], appId: string): Promise> { + const runAfterRegister: Promise[] = []; this.orch.debugLog(`The App ${ appId } is registering job processors`, processors); processors.forEach(({ id, processor, startupSetting }: IProcessor) => { this.scheduler.define(id, _callProcessor(processor)); @@ -82,10 +85,10 @@ export class AppSchedulerBridge extends SchedulerBridge { switch (startupSetting.type) { case StartupType.ONETIME: - runAfterRegister.push(this.scheduleOnceAfterRegister({ id, when: startupSetting.when, data: startupSetting.data }, appId)); + runAfterRegister.push(this.scheduleOnceAfterRegister({ id, when: startupSetting.when, data: startupSetting.data }, appId) as Promise); break; case StartupType.RECURRING: - runAfterRegister.push(this.scheduleRecurring({ id, interval: startupSetting.interval, skipImmediate: startupSetting.skipImmediate, data: startupSetting.data }, appId)); + runAfterRegister.push(this.scheduleRecurring({ id, interval: startupSetting.interval, skipImmediate: startupSetting.skipImmediate, data: startupSetting.data }, appId) as Promise); break; default: this.orch.getRocketChatLogger().error(`Invalid startup setting type (${ String((startupSetting as any).type) }) for the processor ${ id }`); @@ -94,7 +97,7 @@ export class AppSchedulerBridge extends SchedulerBridge { }); if (runAfterRegister.length) { - await Promise.all(runAfterRegister); + return Promise.all(runAfterRegister) as Promise>; } } @@ -107,22 +110,23 @@ export class AppSchedulerBridge extends SchedulerBridge { * @param {Object} [job.data] An optional object that is passed to the processor * @param {string} appId * - * @returns Promise + * @returns {string} taskid */ - protected async scheduleOnce(job: IOnetimeSchedule, appId: string): Promise { - this.orch.debugLog(`The App ${ appId } is scheduling an onetime job`, job); + protected async scheduleOnce({ id, when, data }: IOnetimeSchedule, appId: string): Promise { + this.orch.debugLog(`The App ${ appId } is scheduling an onetime job (processor ${ id })`); try { await this.startScheduler(); - await this.scheduler.schedule(job.when, job.id, this.decorateJobData(job.data, appId)); + const job = await this.scheduler.schedule(when, id, this.decorateJobData(data, appId)); + return job.attrs._id.toString(); } catch (e) { this.orch.getRocketChatLogger().error(e); } } - private async scheduleOnceAfterRegister(job: IOnetimeSchedule, appId: string): Promise { + private async scheduleOnceAfterRegister(job: IOnetimeSchedule, appId: string): Promise { const scheduledJobs = await this.scheduler.jobs({ name: job.id, type: 'normal' }); if (!scheduledJobs.length) { - await this.scheduleOnce(job, appId); + return this.scheduleOnce(job, appId); } } @@ -136,13 +140,14 @@ export class AppSchedulerBridge extends SchedulerBridge { * @param {Object} [job.data] An optional object that is passed to the processor * @param {string} appId * - * @returns Promise + * @returns {string} taskid */ - protected async scheduleRecurring({ id, interval, skipImmediate = false, data }: IRecurringSchedule, appId: string): Promise { - this.orch.debugLog(`The App ${ appId } is scheduling a recurring job`, id); + protected async scheduleRecurring({ id, interval, skipImmediate = false, data }: IRecurringSchedule, appId: string): Promise { + this.orch.debugLog(`The App ${ appId } is scheduling a recurring job (processor ${ id })`); try { await this.startScheduler(); - await this.scheduler.every(interval, id, this.decorateJobData(data, appId), { skipImmediate }); + const job = await this.scheduler.every(interval, id, this.decorateJobData(data, appId), { skipImmediate }); + return job.attrs._id.toString(); } catch (e) { this.orch.getRocketChatLogger().error(e); } @@ -159,8 +164,17 @@ export class AppSchedulerBridge extends SchedulerBridge { protected async cancelJob(jobId: string, appId: string): Promise { this.orch.debugLog(`The App ${ appId } is canceling a job`, jobId); await this.startScheduler(); + + let cancelQuery; + try { + cancelQuery = { _id: new ObjectID(jobId.split('_')[0]) }; + } catch (jobDocIdError) { + // it is not a valid objectid, so it won't try to cancel by document id + cancelQuery = { name: jobId }; + } + try { - await this.scheduler.cancel({ name: jobId }); + await this.scheduler.cancel(cancelQuery); } catch (e) { this.orch.getRocketChatLogger().error(e); } diff --git a/package-lock.json b/package-lock.json index d7afe2fbff5..a17f1185612 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5152,9 +5152,9 @@ } }, "@rocket.chat/apps-engine": { - "version": "1.27.1", - "resolved": "https://registry.npmjs.org/@rocket.chat/apps-engine/-/apps-engine-1.27.1.tgz", - "integrity": "sha512-cdOfcd83mRILvT6c7nDoIUadGpqnyPMPpiKZa9ZPsERSZmImkZ/UlHlmNMMf/jZ3GQt34vgcY187CUNN7XMZ/g==", + "version": "1.28.0-alpha.5370", + "resolved": "https://registry.npmjs.org/@rocket.chat/apps-engine/-/apps-engine-1.28.0-alpha.5370.tgz", + "integrity": "sha512-OGfjeX8cxPjreG34AioFCtEA6eJRRqoay6WddZ2goFxQjBVWQzwvmuzXKNZt/hclTjRZbBVu0yKscJhlW1ijug==", "requires": { "adm-zip": "^0.4.9", "cryptiles": "^4.1.3", @@ -14393,6 +14393,14 @@ "node-int64": "^0.4.0" } }, + "bson": { + "version": "4.5.1", + "resolved": "https://registry.npmjs.org/bson/-/bson-4.5.1.tgz", + "integrity": "sha512-XqFP74pbTVLyLy5KFxVfTUyRrC1mgOlmu/iXHfXqfCKT59jyP9lwbotGfbN59cHBRbJSamZNkrSopjv+N0SqAA==", + "requires": { + "buffer": "^5.6.0" + } + }, "buffer": { "version": "5.6.0", "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.6.0.tgz", diff --git a/package.json b/package.json index 6eff91fbf30..a32b54d6374 100644 --- a/package.json +++ b/package.json @@ -158,7 +158,7 @@ "@nivo/heatmap": "0.73.0", "@nivo/line": "0.62.0", "@nivo/pie": "0.73.0", - "@rocket.chat/apps-engine": "1.27.1", + "@rocket.chat/apps-engine": "1.28.0-alpha.5370", "@rocket.chat/css-in-js": "^0.29.0", "@rocket.chat/emitter": "^0.29.0", "@rocket.chat/fuselage": "^0.29.0", @@ -185,6 +185,7 @@ "bcrypt": "^5.0.1", "blockstack": "19.3.0", "body-parser": "1.19.0", + "bson": "^4.5.1", "bunyan": "^1.8.15", "busboy": "^0.3.1", "bytebuffer": "5.0.1",