chore(agenda): Changes in Agenda API (#31427)

pull/31337/head
Tasso Evangelista 2 years ago committed by GitHub
parent 0312a6cc72
commit db2551906c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 5
      .changeset/wicked-pumpkins-walk.md
  2. 182
      packages/agenda/src/Agenda.ts

@ -0,0 +1,5 @@
---
'@rocket.chat/agenda': minor
---
Remove `@ts-ignore` directives and unused code from Agenda code

@ -19,6 +19,34 @@ const defaultInterval = 5000;
type JobSort = Partial<Record<keyof IJob, 1 | -1>>;
type MongoTopology = {
autoReconnect?: boolean;
connections?(): unknown[];
isDestroyed?(): boolean;
};
type MongoDB = Db & {
s?: {
client?: {
topology?: MongoTopology;
};
};
db?: {
s?: {
client?: {
topology?: MongoTopology;
};
};
};
topology?: {
s?: {
options?: {
useUnifiedTopology?: boolean;
};
};
};
};
type AgendaConfig = {
name?: string;
processEvery?: string;
@ -31,7 +59,7 @@ type AgendaConfig = {
sort?: JobSort;
} & (
| {
mongo: Db;
mongo: MongoDB;
db?: {
collection?: string;
};
@ -46,8 +74,6 @@ type AgendaConfig = {
}
);
type AgendaCallback = (error: unknown, result: unknown) => void;
export type RepeatOptions = { timezone?: string; skipImmediate?: boolean };
export class Agenda extends EventEmitter {
@ -63,14 +89,11 @@ export class Agenda extends EventEmitter {
private _defaultLockLifetime: number;
// @ts-ignore
private _db: MongoClient;
protected _db: MongoClient | undefined;
// @ts-ignore
private _mdb: Db;
private _mdb: MongoDB | undefined;
// @ts-ignore
private _collection: Collection;
private _collection: Collection | undefined;
private _definitions: Record<string, JobDefinition> = {};
@ -98,7 +121,7 @@ export class Agenda extends EventEmitter {
private _mongoUseUnifiedTopology: boolean | undefined;
constructor(config: AgendaConfig = {}, cb?: AgendaCallback) {
constructor(config: AgendaConfig = {}) {
super();
this._name = config.name;
@ -121,21 +144,18 @@ export class Agenda extends EventEmitter {
this._ready = new Promise((resolve) => this.once('ready', resolve));
if (config.mongo) {
this.mongo(config.mongo, config.db ? config.db.collection : undefined, cb);
// @ts-ignore
if (config.mongo.s && config.mongo.topology && config.mongo.topology.s) {
// @ts-ignore
this._mongoUseUnifiedTopology = Boolean(config.mongo?.topology?.s?.options?.useUnifiedTopology);
}
this.mongo(config.mongo, config.db ? config.db.collection : undefined);
} else if (config.db) {
this.database(config.db.address, config.db.collection, config.db.options, cb);
this.database(config.db.address, config.db.collection, config.db.options);
}
}
public mongo(mdb: Db, collection: string | undefined, cb?: AgendaCallback): Agenda {
public mongo(mdb: MongoDB, collection: string | undefined) {
this._mdb = mdb;
this.dbInit(collection, cb);
return this;
if (mdb.s && mdb.topology && mdb.topology.s) {
this._mongoUseUnifiedTopology = Boolean(mdb?.topology?.s?.options?.useUnifiedTopology);
}
return this.dbInit(collection);
}
/**
@ -146,7 +166,7 @@ export class Agenda extends EventEmitter {
* or use Agenda.mongo(). If your app already has a MongoDB connection then use that. ie. specify config.mongo in
* the constructor or use Agenda.mongo().
*/
public database(url: string, collection: string | undefined, options: MongoClientOptions = {}, cb?: AgendaCallback): Agenda {
public async database(url: string, collection: string | undefined, options: MongoClientOptions = {}) {
if (!hasMongoProtocol(url)) {
url = `mongodb://${url}`;
}
@ -157,43 +177,30 @@ export class Agenda extends EventEmitter {
...options,
};
MongoClient.connect(url, options, (error, client) => {
if (error || !client) {
debug('error connecting to MongoDB using collection: [%s]', collection);
if (cb) {
cb(error, null);
} else {
throw error;
}
return;
}
try {
const client = await MongoClient.connect(url, options);
debug('successful connection to MongoDB using collection: [%s]', collection);
this._db = client;
this._mdb = client.db();
this.dbInit(collection, cb);
});
return this;
this.dbInit(collection);
} catch (error) {
debug('error connecting to MongoDB using collection: [%s]', collection);
return error;
}
}
public dbInit(collection: string | undefined, cb?: AgendaCallback): void {
public async dbInit(collection: string | undefined) {
debug('init database collection using name [%s]', collection);
this._collection = this._mdb.collection(collection || 'agendaJobs');
this._collection = this.getMongoDB().collection(collection || 'agendaJobs');
debug('attempting index creation');
this._collection.createIndex(this._indexes, { name: 'findAndLockNextJobIndex' }, (err) => {
if (err) {
debug('index creation failed');
this.emit('error', err);
} else {
debug('index creation success');
this.emit('ready');
}
if (cb) {
cb(err, this._collection);
}
});
try {
await this._collection.createIndex(this._indexes, { name: 'findAndLockNextJobIndex' });
debug('index creation success');
this.emit('ready');
} catch (err) {
debug('index creation failed');
this.emit('error', err);
}
}
public name(name: string): Agenda {
@ -257,8 +264,24 @@ export class Agenda extends EventEmitter {
return job;
}
private getCollection(): Collection {
if (!this._collection) {
throw new Error('Agenda instance is not ready yet');
}
return this._collection;
}
private getMongoDB(): MongoDB {
if (!this._mdb) {
throw new Error('Agenda instance is not ready yet');
}
return this._mdb;
}
public async jobs(query = {}, sort = {}, limit = 0, skip = 0): Promise<Job[]> {
const result = await this._collection.find<IJob>(query).sort(sort).limit(limit).skip(skip).toArray();
const result = await this.getCollection().find<IJob>(query).sort(sort).limit(limit).skip(skip).toArray();
return result.map((job) => createJob(this, job));
}
@ -390,7 +413,7 @@ export class Agenda extends EventEmitter {
public async cancel(query: Record<string, any>): Promise<number> {
debug('attempting to cancel all Agenda jobs', query);
try {
const { deletedCount } = await this._collection.deleteMany(query);
const { deletedCount } = await this.getCollection().deleteMany(query);
debug('%s jobs cancelled', deletedCount || 0);
return deletedCount || 0;
} catch (error) {
@ -401,7 +424,7 @@ export class Agenda extends EventEmitter {
public async has(query: Record<string, any>): Promise<boolean> {
debug('checking whether Agenda has any jobs matching query', query);
const record = await this._collection.findOne(query, { projection: { _id: 1 } });
const record = await this.getCollection().findOne(query, { projection: { _id: 1 } });
return record !== null;
}
@ -416,7 +439,7 @@ export class Agenda extends EventEmitter {
}
if ('insertedId' in result) {
return this._collection.findOne({ _id: result.insertedId });
return this.getCollection().findOne({ _id: result.insertedId });
}
return null;
@ -445,7 +468,7 @@ export class Agenda extends EventEmitter {
// Update the job and process the resulting data'
debug('job already has _id, calling findOneAndUpdate() using _id as query');
const result = await this._collection.findOneAndUpdate({ _id: id }, update, { returnDocument: 'after' });
const result = await this.getCollection().findOneAndUpdate({ _id: id }, update, { returnDocument: 'after' });
return this._processDbResult(job, result);
}
@ -471,7 +494,7 @@ export class Agenda extends EventEmitter {
// Try an upsert
debug('calling findOneAndUpdate() with job name and type of "single" as query');
const result = await this._collection.findOneAndUpdate(
const result = await this.getCollection().findOneAndUpdate(
{
name: props.name,
type: 'single',
@ -498,14 +521,14 @@ export class Agenda extends EventEmitter {
// Use the 'unique' query object to find an existing job or create a new one
debug('calling findOneAndUpdate() with unique object as query: \n%O', query);
const result = await this._collection.findOneAndUpdate(query, update, { upsert: true, returnDocument: 'after' });
const result = await this.getCollection().findOneAndUpdate(query, update, { upsert: true, returnDocument: 'after' });
return this._processDbResult(job, result);
}
private async _saveNewJob(job: Job, props: Record<string, any>): Promise<void> {
// If all else fails, the job does not exist yet so we just insert it into MongoDB
debug('using default behavior, inserting new job via insertOne() with props that were set: \n%O', props);
const result = await this._collection.insertOne(props);
const result = await this.getCollection().insertOne(props);
return this._processDbResult(job, result);
}
@ -562,26 +585,18 @@ export class Agenda extends EventEmitter {
process.nextTick(() => this.processJobs());
}
private _unlockJobs(): Promise<void> {
return new Promise((resolve, reject) => {
debug('Agenda._unlockJobs()');
const jobIds = this._lockedJobs.map((job) => job.attrs._id);
private async _unlockJobs(): Promise<void> {
debug('Agenda._unlockJobs()');
const jobIds = this._lockedJobs.map((job) => job.attrs._id);
if (jobIds.length === 0) {
debug('no jobs to unlock');
return resolve();
}
debug('about to unlock jobs with ids: %O', jobIds);
this._collection.updateMany({ _id: { $in: jobIds } }, { $set: { lockedAt: null } }, (err) => {
if (err) {
return reject(err);
}
if (jobIds.length === 0) {
debug('no jobs to unlock');
return;
}
this._lockedJobs = [];
return resolve();
});
});
debug('about to unlock jobs with ids: %O', jobIds);
await this.getCollection().updateMany({ _id: { $in: jobIds } }, { $set: { lockedAt: null } });
this._lockedJobs = [];
}
public stop(): Promise<void> {
@ -602,16 +617,15 @@ export class Agenda extends EventEmitter {
debug('_findAndLockNextJob(%s, [Function])', jobName);
// Don't try and access MongoDB if we've lost connection to it.
// @ts-ignore
const s = this._mdb.s.client || this._mdb.db.s.client;
if (s.topology.connections && s.topology.connections().length === 0 && !this._mongoUseUnifiedTopology) {
if (s.topology.autoReconnect && !s.topology.isDestroyed()) {
const client = (this.getMongoDB() || this.getMongoDB().db)?.s?.client;
if (client?.topology?.connections?.().length === 0 && !this._mongoUseUnifiedTopology) {
if (client.topology.autoReconnect && !client.topology.isDestroyed?.()) {
// Continue processing but notify that Agenda has lost the connection
debug('Missing MongoDB connection, not attempting to find and lock a job');
this.emit('error', new Error('Lost MongoDB connection'));
} else {
// No longer recoverable
debug('topology.autoReconnect: %s, topology.isDestroyed(): %s', s.topology.autoReconnect, s.topology.isDestroyed());
debug('topology.autoReconnect: %s, topology.isDestroyed(): %s', client.topology.autoReconnect, client.topology.isDestroyed?.());
throw new Error('MongoDB connection is not recoverable, application restart required');
}
} else {
@ -642,7 +656,7 @@ export class Agenda extends EventEmitter {
const JOB_PROCESS_SET_QUERY = { $set: { lockedAt: now } };
// Find ONE and ONLY ONE job and set the 'lockedAt' time so that job begins to be processed
const result = await this._collection.findOneAndUpdate(JOB_PROCESS_WHERE_QUERY, JOB_PROCESS_SET_QUERY, {
const result = await this.getCollection().findOneAndUpdate(JOB_PROCESS_WHERE_QUERY, JOB_PROCESS_SET_QUERY, {
returnDocument: 'after',
sort: this._sort,
});
@ -734,7 +748,7 @@ export class Agenda extends EventEmitter {
const update = { $set: { lockedAt: now } };
// Lock the job in MongoDB!
const resp = await this._collection.findOneAndUpdate(criteria, update, { returnDocument: 'after' });
const resp = await this.getCollection().findOneAndUpdate(criteria, update, { returnDocument: 'after' });
if (resp.value) {
const job = createJob(this, resp.value as unknown as IJob);

Loading…
Cancel
Save