@ -1,4 +1,5 @@
import Agenda from 'agenda' ;
import { ObjectID } from 'bson' ;
import { MongoInternals } from 'meteor/mongo' ;
import {
StartupType ,
@ -10,13 +11,15 @@ import { SchedulerBridge } from '@rocket.chat/apps-engine/server/bridges/Schedul
import { AppServerOrchestrator } from '../orchestrator' ;
function _callProcessor ( processor : Function ) : ( job : { attrs ? : { data : object } } ) = > void {
function _callProcessor ( processor : Function ) : ( job : Agenda.Job ) = > void {
return ( job ) : void = > {
const data = job ? . attrs ? . data || { } ;
// This field is for internal use, no need to leak to app processor
delete ( data as any ) . appId ;
data . jobId = job . attrs . _id . toString ( ) ;
return processor ( data ) ;
} ;
}
@ -68,10 +71,10 @@ export class AppSchedulerBridge extends SchedulerBridge {
* @param { Array . < Processor > } processors An array of processors
* @param { string } appId
*
* @returns Promise < void >
* @returns { string [ ] } List of task ids run at startup , or void no startup run is set
* /
protected async registerProcessors ( processors : Array < IProcessor > = [ ] , appId : string ) : Promise < void > {
const runAfterRegister : Promise < void > [ ] = [ ] ;
protected async registerProcessors ( processors : Array < IProcessor > = [ ] , appId : string ) : Promise < void | Array < string > > {
const runAfterRegister : Promise < string > [ ] = [ ] ;
this . orch . debugLog ( ` The App ${ appId } is registering job processors ` , processors ) ;
processors . forEach ( ( { id , processor , startupSetting } : IProcessor ) = > {
this . scheduler . define ( id , _callProcessor ( processor ) ) ;
@ -82,10 +85,10 @@ export class AppSchedulerBridge extends SchedulerBridge {
switch ( startupSetting . type ) {
case StartupType . ONETIME :
runAfterRegister . push ( this . scheduleOnceAfterRegister ( { id , when : startupSetting.when , data : startupSetting.data } , appId ) ) ;
runAfterRegister . push ( this . scheduleOnceAfterRegister ( { id , when : startupSetting.when , data : startupSetting.data } , appId ) as Promise < string > ) ;
break ;
case StartupType . RECURRING :
runAfterRegister . push ( this . scheduleRecurring ( { id , interval : startupSetting.interval , skipImmediate : startupSetting.skipImmediate , data : startupSetting.data } , appId ) ) ;
runAfterRegister . push ( this . scheduleRecurring ( { id , interval : startupSetting.interval , skipImmediate : startupSetting.skipImmediate , data : startupSetting.data } , appId ) as Promise < string > ) ;
break ;
default :
this . orch . getRocketChatLogger ( ) . error ( ` Invalid startup setting type ( ${ String ( ( startupSetting as any ) . type ) } ) for the processor ${ id } ` ) ;
@ -94,7 +97,7 @@ export class AppSchedulerBridge extends SchedulerBridge {
} ) ;
if ( runAfterRegister . length ) {
await Promise . all ( runAfterRegister ) ;
return Promise . all ( runAfterRegister ) as Promise < Array < string > > ;
}
}
@ -107,22 +110,23 @@ export class AppSchedulerBridge extends SchedulerBridge {
* @param { Object } [ job . data ] An optional object that is passed to the processor
* @param { string } appId
*
* @returns Promise < void >
* @returns { string } taskid
* /
protected async scheduleOnce ( job : IOnetimeSchedule , appId : string ) : Promise < void > {
this . orch . debugLog ( ` The App ${ appId } is scheduling an onetime job ` , job ) ;
protected async scheduleOnce ( { id , when , data } : IOnetimeSchedule , appId : string ) : Promise < void | string > {
this . orch . debugLog ( ` The App ${ appId } is scheduling an onetime job (processor ${ id } ) ` ) ;
try {
await this . startScheduler ( ) ;
await this . scheduler . schedule ( job . when , job . id , this . decorateJobData ( job . data , appId ) ) ;
const job = await this . scheduler . schedule ( when , id , this . decorateJobData ( data , appId ) ) ;
return job . attrs . _id . toString ( ) ;
} catch ( e ) {
this . orch . getRocketChatLogger ( ) . error ( e ) ;
}
}
private async scheduleOnceAfterRegister ( job : IOnetimeSchedule , appId : string ) : Promise < void > {
private async scheduleOnceAfterRegister ( job : IOnetimeSchedule , appId : string ) : Promise < void | string > {
const scheduledJobs = await this . scheduler . jobs ( { name : job.id , type : 'normal' } ) ;
if ( ! scheduledJobs . length ) {
await this . scheduleOnce ( job , appId ) ;
return this . scheduleOnce ( job , appId ) ;
}
}
@ -136,13 +140,14 @@ export class AppSchedulerBridge extends SchedulerBridge {
* @param { Object } [ job . data ] An optional object that is passed to the processor
* @param { string } appId
*
* @returns Promise < void >
* @returns { string } taskid
* /
protected async scheduleRecurring ( { id , interval , skipImmediate = false , data } : IRecurringSchedule , appId : string ) : Promise < void > {
this . orch . debugLog ( ` The App ${ appId } is scheduling a recurring job ` , id ) ;
protected async scheduleRecurring ( { id , interval , skipImmediate = false , data } : IRecurringSchedule , appId : string ) : Promise < void | string > {
this . orch . debugLog ( ` The App ${ appId } is scheduling a recurring job (processor ${ id } ) ` ) ;
try {
await this . startScheduler ( ) ;
await this . scheduler . every ( interval , id , this . decorateJobData ( data , appId ) , { skipImmediate } ) ;
const job = await this . scheduler . every ( interval , id , this . decorateJobData ( data , appId ) , { skipImmediate } ) ;
return job . attrs . _id . toString ( ) ;
} catch ( e ) {
this . orch . getRocketChatLogger ( ) . error ( e ) ;
}
@ -159,8 +164,17 @@ export class AppSchedulerBridge extends SchedulerBridge {
protected async cancelJob ( jobId : string , appId : string ) : Promise < void > {
this . orch . debugLog ( ` The App ${ appId } is canceling a job ` , jobId ) ;
await this . startScheduler ( ) ;
let cancelQuery ;
try {
cancelQuery = { _id : new ObjectID ( jobId . split ( '_' ) [ 0 ] ) } ;
} catch ( jobDocIdError ) {
// it is not a valid objectid, so it won't try to cancel by document id
cancelQuery = { name : jobId } ;
}
try {
await this . scheduler . cancel ( { name : jobId } ) ;
await this . scheduler . cancel ( cancelQuery ) ;
} catch ( e ) {
this . orch . getRocketChatLogger ( ) . error ( e ) ;
}