|
|
|
|
@ -19,6 +19,34 @@ const defaultInterval = 5000; |
|
|
|
|
|
|
|
|
|
type JobSort = Partial<Record<keyof IJob, 1 | -1>>; |
|
|
|
|
|
|
|
|
|
type MongoTopology = { |
|
|
|
|
autoReconnect?: boolean; |
|
|
|
|
connections?(): unknown[]; |
|
|
|
|
isDestroyed?(): boolean; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
type MongoDB = Db & { |
|
|
|
|
s?: { |
|
|
|
|
client?: { |
|
|
|
|
topology?: MongoTopology; |
|
|
|
|
}; |
|
|
|
|
}; |
|
|
|
|
db?: { |
|
|
|
|
s?: { |
|
|
|
|
client?: { |
|
|
|
|
topology?: MongoTopology; |
|
|
|
|
}; |
|
|
|
|
}; |
|
|
|
|
}; |
|
|
|
|
topology?: { |
|
|
|
|
s?: { |
|
|
|
|
options?: { |
|
|
|
|
useUnifiedTopology?: boolean; |
|
|
|
|
}; |
|
|
|
|
}; |
|
|
|
|
}; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
type AgendaConfig = { |
|
|
|
|
name?: string; |
|
|
|
|
processEvery?: string; |
|
|
|
|
@ -31,7 +59,7 @@ type AgendaConfig = { |
|
|
|
|
sort?: JobSort; |
|
|
|
|
} & ( |
|
|
|
|
| { |
|
|
|
|
mongo: Db; |
|
|
|
|
mongo: MongoDB; |
|
|
|
|
db?: { |
|
|
|
|
collection?: string; |
|
|
|
|
}; |
|
|
|
|
@ -46,8 +74,6 @@ type AgendaConfig = { |
|
|
|
|
} |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
type AgendaCallback = (error: unknown, result: unknown) => void; |
|
|
|
|
|
|
|
|
|
export type RepeatOptions = { timezone?: string; skipImmediate?: boolean }; |
|
|
|
|
|
|
|
|
|
export class Agenda extends EventEmitter { |
|
|
|
|
@ -63,14 +89,11 @@ export class Agenda extends EventEmitter { |
|
|
|
|
|
|
|
|
|
private _defaultLockLifetime: number; |
|
|
|
|
|
|
|
|
|
// @ts-ignore
|
|
|
|
|
private _db: MongoClient; |
|
|
|
|
protected _db: MongoClient | undefined; |
|
|
|
|
|
|
|
|
|
// @ts-ignore
|
|
|
|
|
private _mdb: Db; |
|
|
|
|
private _mdb: MongoDB | undefined; |
|
|
|
|
|
|
|
|
|
// @ts-ignore
|
|
|
|
|
private _collection: Collection; |
|
|
|
|
private _collection: Collection | undefined; |
|
|
|
|
|
|
|
|
|
private _definitions: Record<string, JobDefinition> = {}; |
|
|
|
|
|
|
|
|
|
@ -98,7 +121,7 @@ export class Agenda extends EventEmitter { |
|
|
|
|
|
|
|
|
|
private _mongoUseUnifiedTopology: boolean | undefined; |
|
|
|
|
|
|
|
|
|
constructor(config: AgendaConfig = {}, cb?: AgendaCallback) { |
|
|
|
|
constructor(config: AgendaConfig = {}) { |
|
|
|
|
super(); |
|
|
|
|
|
|
|
|
|
this._name = config.name; |
|
|
|
|
@ -121,21 +144,18 @@ export class Agenda extends EventEmitter { |
|
|
|
|
this._ready = new Promise((resolve) => this.once('ready', resolve)); |
|
|
|
|
|
|
|
|
|
if (config.mongo) { |
|
|
|
|
this.mongo(config.mongo, config.db ? config.db.collection : undefined, cb); |
|
|
|
|
// @ts-ignore
|
|
|
|
|
if (config.mongo.s && config.mongo.topology && config.mongo.topology.s) { |
|
|
|
|
// @ts-ignore
|
|
|
|
|
this._mongoUseUnifiedTopology = Boolean(config.mongo?.topology?.s?.options?.useUnifiedTopology); |
|
|
|
|
} |
|
|
|
|
this.mongo(config.mongo, config.db ? config.db.collection : undefined); |
|
|
|
|
} else if (config.db) { |
|
|
|
|
this.database(config.db.address, config.db.collection, config.db.options, cb); |
|
|
|
|
this.database(config.db.address, config.db.collection, config.db.options); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public mongo(mdb: Db, collection: string | undefined, cb?: AgendaCallback): Agenda { |
|
|
|
|
public mongo(mdb: MongoDB, collection: string | undefined) { |
|
|
|
|
this._mdb = mdb; |
|
|
|
|
this.dbInit(collection, cb); |
|
|
|
|
return this; |
|
|
|
|
if (mdb.s && mdb.topology && mdb.topology.s) { |
|
|
|
|
this._mongoUseUnifiedTopology = Boolean(mdb?.topology?.s?.options?.useUnifiedTopology); |
|
|
|
|
} |
|
|
|
|
return this.dbInit(collection); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
@ -146,7 +166,7 @@ export class Agenda extends EventEmitter { |
|
|
|
|
* or use Agenda.mongo(). If your app already has a MongoDB connection then use that. ie. specify config.mongo in |
|
|
|
|
* the constructor or use Agenda.mongo(). |
|
|
|
|
*/ |
|
|
|
|
public database(url: string, collection: string | undefined, options: MongoClientOptions = {}, cb?: AgendaCallback): Agenda { |
|
|
|
|
public async database(url: string, collection: string | undefined, options: MongoClientOptions = {}) { |
|
|
|
|
if (!hasMongoProtocol(url)) { |
|
|
|
|
url = `mongodb://${url}`; |
|
|
|
|
} |
|
|
|
|
@ -157,43 +177,30 @@ export class Agenda extends EventEmitter { |
|
|
|
|
...options, |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
MongoClient.connect(url, options, (error, client) => { |
|
|
|
|
if (error || !client) { |
|
|
|
|
debug('error connecting to MongoDB using collection: [%s]', collection); |
|
|
|
|
if (cb) { |
|
|
|
|
cb(error, null); |
|
|
|
|
} else { |
|
|
|
|
throw error; |
|
|
|
|
} |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
const client = await MongoClient.connect(url, options); |
|
|
|
|
debug('successful connection to MongoDB using collection: [%s]', collection); |
|
|
|
|
this._db = client; |
|
|
|
|
this._mdb = client.db(); |
|
|
|
|
this.dbInit(collection, cb); |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
return this; |
|
|
|
|
this.dbInit(collection); |
|
|
|
|
} catch (error) { |
|
|
|
|
debug('error connecting to MongoDB using collection: [%s]', collection); |
|
|
|
|
return error; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public dbInit(collection: string | undefined, cb?: AgendaCallback): void { |
|
|
|
|
public async dbInit(collection: string | undefined) { |
|
|
|
|
debug('init database collection using name [%s]', collection); |
|
|
|
|
this._collection = this._mdb.collection(collection || 'agendaJobs'); |
|
|
|
|
this._collection = this.getMongoDB().collection(collection || 'agendaJobs'); |
|
|
|
|
debug('attempting index creation'); |
|
|
|
|
this._collection.createIndex(this._indexes, { name: 'findAndLockNextJobIndex' }, (err) => { |
|
|
|
|
if (err) { |
|
|
|
|
debug('index creation failed'); |
|
|
|
|
this.emit('error', err); |
|
|
|
|
} else { |
|
|
|
|
debug('index creation success'); |
|
|
|
|
this.emit('ready'); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (cb) { |
|
|
|
|
cb(err, this._collection); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
try { |
|
|
|
|
await this._collection.createIndex(this._indexes, { name: 'findAndLockNextJobIndex' }); |
|
|
|
|
debug('index creation success'); |
|
|
|
|
this.emit('ready'); |
|
|
|
|
} catch (err) { |
|
|
|
|
debug('index creation failed'); |
|
|
|
|
this.emit('error', err); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public name(name: string): Agenda { |
|
|
|
|
@ -257,8 +264,24 @@ export class Agenda extends EventEmitter { |
|
|
|
|
return job; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private getCollection(): Collection { |
|
|
|
|
if (!this._collection) { |
|
|
|
|
throw new Error('Agenda instance is not ready yet'); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return this._collection; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private getMongoDB(): MongoDB { |
|
|
|
|
if (!this._mdb) { |
|
|
|
|
throw new Error('Agenda instance is not ready yet'); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return this._mdb; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public async jobs(query = {}, sort = {}, limit = 0, skip = 0): Promise<Job[]> { |
|
|
|
|
const result = await this._collection.find<IJob>(query).sort(sort).limit(limit).skip(skip).toArray(); |
|
|
|
|
const result = await this.getCollection().find<IJob>(query).sort(sort).limit(limit).skip(skip).toArray(); |
|
|
|
|
|
|
|
|
|
return result.map((job) => createJob(this, job)); |
|
|
|
|
} |
|
|
|
|
@ -390,7 +413,7 @@ export class Agenda extends EventEmitter { |
|
|
|
|
public async cancel(query: Record<string, any>): Promise<number> { |
|
|
|
|
debug('attempting to cancel all Agenda jobs', query); |
|
|
|
|
try { |
|
|
|
|
const { deletedCount } = await this._collection.deleteMany(query); |
|
|
|
|
const { deletedCount } = await this.getCollection().deleteMany(query); |
|
|
|
|
debug('%s jobs cancelled', deletedCount || 0); |
|
|
|
|
return deletedCount || 0; |
|
|
|
|
} catch (error) { |
|
|
|
|
@ -401,7 +424,7 @@ export class Agenda extends EventEmitter { |
|
|
|
|
|
|
|
|
|
public async has(query: Record<string, any>): Promise<boolean> { |
|
|
|
|
debug('checking whether Agenda has any jobs matching query', query); |
|
|
|
|
const record = await this._collection.findOne(query, { projection: { _id: 1 } }); |
|
|
|
|
const record = await this.getCollection().findOne(query, { projection: { _id: 1 } }); |
|
|
|
|
return record !== null; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -416,7 +439,7 @@ export class Agenda extends EventEmitter { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if ('insertedId' in result) { |
|
|
|
|
return this._collection.findOne({ _id: result.insertedId }); |
|
|
|
|
return this.getCollection().findOne({ _id: result.insertedId }); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return null; |
|
|
|
|
@ -445,7 +468,7 @@ export class Agenda extends EventEmitter { |
|
|
|
|
|
|
|
|
|
// Update the job and process the resulting data'
|
|
|
|
|
debug('job already has _id, calling findOneAndUpdate() using _id as query'); |
|
|
|
|
const result = await this._collection.findOneAndUpdate({ _id: id }, update, { returnDocument: 'after' }); |
|
|
|
|
const result = await this.getCollection().findOneAndUpdate({ _id: id }, update, { returnDocument: 'after' }); |
|
|
|
|
|
|
|
|
|
return this._processDbResult(job, result); |
|
|
|
|
} |
|
|
|
|
@ -471,7 +494,7 @@ export class Agenda extends EventEmitter { |
|
|
|
|
|
|
|
|
|
// Try an upsert
|
|
|
|
|
debug('calling findOneAndUpdate() with job name and type of "single" as query'); |
|
|
|
|
const result = await this._collection.findOneAndUpdate( |
|
|
|
|
const result = await this.getCollection().findOneAndUpdate( |
|
|
|
|
{ |
|
|
|
|
name: props.name, |
|
|
|
|
type: 'single', |
|
|
|
|
@ -498,14 +521,14 @@ export class Agenda extends EventEmitter { |
|
|
|
|
|
|
|
|
|
// Use the 'unique' query object to find an existing job or create a new one
|
|
|
|
|
debug('calling findOneAndUpdate() with unique object as query: \n%O', query); |
|
|
|
|
const result = await this._collection.findOneAndUpdate(query, update, { upsert: true, returnDocument: 'after' }); |
|
|
|
|
const result = await this.getCollection().findOneAndUpdate(query, update, { upsert: true, returnDocument: 'after' }); |
|
|
|
|
return this._processDbResult(job, result); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private async _saveNewJob(job: Job, props: Record<string, any>): Promise<void> { |
|
|
|
|
// If all else fails, the job does not exist yet so we just insert it into MongoDB
|
|
|
|
|
debug('using default behavior, inserting new job via insertOne() with props that were set: \n%O', props); |
|
|
|
|
const result = await this._collection.insertOne(props); |
|
|
|
|
const result = await this.getCollection().insertOne(props); |
|
|
|
|
return this._processDbResult(job, result); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -562,26 +585,18 @@ export class Agenda extends EventEmitter { |
|
|
|
|
process.nextTick(() => this.processJobs()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private _unlockJobs(): Promise<void> { |
|
|
|
|
return new Promise((resolve, reject) => { |
|
|
|
|
debug('Agenda._unlockJobs()'); |
|
|
|
|
const jobIds = this._lockedJobs.map((job) => job.attrs._id); |
|
|
|
|
private async _unlockJobs(): Promise<void> { |
|
|
|
|
debug('Agenda._unlockJobs()'); |
|
|
|
|
const jobIds = this._lockedJobs.map((job) => job.attrs._id); |
|
|
|
|
|
|
|
|
|
if (jobIds.length === 0) { |
|
|
|
|
debug('no jobs to unlock'); |
|
|
|
|
return resolve(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
debug('about to unlock jobs with ids: %O', jobIds); |
|
|
|
|
this._collection.updateMany({ _id: { $in: jobIds } }, { $set: { lockedAt: null } }, (err) => { |
|
|
|
|
if (err) { |
|
|
|
|
return reject(err); |
|
|
|
|
} |
|
|
|
|
if (jobIds.length === 0) { |
|
|
|
|
debug('no jobs to unlock'); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
this._lockedJobs = []; |
|
|
|
|
return resolve(); |
|
|
|
|
}); |
|
|
|
|
}); |
|
|
|
|
debug('about to unlock jobs with ids: %O', jobIds); |
|
|
|
|
await this.getCollection().updateMany({ _id: { $in: jobIds } }, { $set: { lockedAt: null } }); |
|
|
|
|
this._lockedJobs = []; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public stop(): Promise<void> { |
|
|
|
|
@ -602,16 +617,15 @@ export class Agenda extends EventEmitter { |
|
|
|
|
debug('_findAndLockNextJob(%s, [Function])', jobName); |
|
|
|
|
|
|
|
|
|
// Don't try and access MongoDB if we've lost connection to it.
|
|
|
|
|
// @ts-ignore
|
|
|
|
|
const s = this._mdb.s.client || this._mdb.db.s.client; |
|
|
|
|
if (s.topology.connections && s.topology.connections().length === 0 && !this._mongoUseUnifiedTopology) { |
|
|
|
|
if (s.topology.autoReconnect && !s.topology.isDestroyed()) { |
|
|
|
|
const client = (this.getMongoDB() || this.getMongoDB().db)?.s?.client; |
|
|
|
|
if (client?.topology?.connections?.().length === 0 && !this._mongoUseUnifiedTopology) { |
|
|
|
|
if (client.topology.autoReconnect && !client.topology.isDestroyed?.()) { |
|
|
|
|
// Continue processing but notify that Agenda has lost the connection
|
|
|
|
|
debug('Missing MongoDB connection, not attempting to find and lock a job'); |
|
|
|
|
this.emit('error', new Error('Lost MongoDB connection')); |
|
|
|
|
} else { |
|
|
|
|
// No longer recoverable
|
|
|
|
|
debug('topology.autoReconnect: %s, topology.isDestroyed(): %s', s.topology.autoReconnect, s.topology.isDestroyed()); |
|
|
|
|
debug('topology.autoReconnect: %s, topology.isDestroyed(): %s', client.topology.autoReconnect, client.topology.isDestroyed?.()); |
|
|
|
|
throw new Error('MongoDB connection is not recoverable, application restart required'); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
@ -642,7 +656,7 @@ export class Agenda extends EventEmitter { |
|
|
|
|
const JOB_PROCESS_SET_QUERY = { $set: { lockedAt: now } }; |
|
|
|
|
|
|
|
|
|
// Find ONE and ONLY ONE job and set the 'lockedAt' time so that job begins to be processed
|
|
|
|
|
const result = await this._collection.findOneAndUpdate(JOB_PROCESS_WHERE_QUERY, JOB_PROCESS_SET_QUERY, { |
|
|
|
|
const result = await this.getCollection().findOneAndUpdate(JOB_PROCESS_WHERE_QUERY, JOB_PROCESS_SET_QUERY, { |
|
|
|
|
returnDocument: 'after', |
|
|
|
|
sort: this._sort, |
|
|
|
|
}); |
|
|
|
|
@ -734,7 +748,7 @@ export class Agenda extends EventEmitter { |
|
|
|
|
const update = { $set: { lockedAt: now } }; |
|
|
|
|
|
|
|
|
|
// Lock the job in MongoDB!
|
|
|
|
|
const resp = await this._collection.findOneAndUpdate(criteria, update, { returnDocument: 'after' }); |
|
|
|
|
const resp = await this.getCollection().findOneAndUpdate(criteria, update, { returnDocument: 'after' }); |
|
|
|
|
|
|
|
|
|
if (resp.value) { |
|
|
|
|
const job = createJob(this, resp.value as unknown as IJob); |
|
|
|
|
|