import type { RestClientInterface } from '@rocket.chat/api-client';
import type { SDK, ClientStream, StreamKeys, StreamNames, StreamerCallbackArgs, ServerMethods } from '@rocket.chat/ddp-client';
import { Emitter } from '@rocket.chat/emitter';
import { Accounts } from 'meteor/accounts-base';
import { DDPCommon } from 'meteor/ddp-common';
import { Meteor } from 'meteor/meteor';

import { APIClient } from './RestApiClient';

declare module '@rocket.chat/ddp-client' {
	// eslint-disable-next-line @typescript-eslint/naming-convention
	interface SDK {
		stream<N extends StreamNames, K extends StreamKeys<N>>(
			streamName: N,
			args: [key: K, ...args: unknown[]],
			callback: (...args: StreamerCallbackArgs<N, K>) => void,
		): ReturnType<ClientStream['subscribe']>;
		call<T extends keyof ServerMethods>(method: T, ...args: Parameters<ServerMethods[T]>): Promise<ReturnType<ServerMethods[T]>>;
	}
}

/**
 * Type guard for a DDP `changed` message that carries a streamer event
 * (`fields.eventName` + `fields.args`).
 *
 * Returns `false` (never throws) for anything that does not match, including
 * `null`/`undefined` messages and `null` `fields`.
 */
const isChangedCollectionPayload = (
	msg: any,
): msg is { msg: 'changed'; collection: string; fields: { eventName: string; args: unknown[] } } => {
	// `typeof null === 'object'`, so null has to be rejected explicitly before
	// any property access below.
	// NOTE(review): the parser feeding this guard presumably can yield null for
	// unparsable frames — confirm against ddp-common.
	if (typeof msg !== 'object' || msg === null) {
		return false;
	}
	if (msg.msg !== 'changed') {
		return false;
	}
	if (typeof msg.collection !== 'string') {
		return false;
	}
	if (typeof msg.fields !== 'object' || msg.fields === null) {
		return false;
	}
	if (typeof msg.fields.eventName !== 'string') {
		return false;
	}
	if (!Array.isArray(msg.fields.args)) {
		return false;
	}
	return true;
};

/** Maps each `stream-<name>/<key>` event literal to the argument tuple its callbacks receive. */
type EventMap<N extends StreamNames = StreamNames, K extends StreamKeys<N> = StreamKeys<N>> = {
	[key in `stream-${N}/${K}`]: StreamerCallbackArgs<N, K>;
};

/** Book-keeping kept per underlying Meteor subscription. */
type StreamMapValue = {
	stop: () => void;
	onError: (cb: (...args: any[]) => void) => () => void;
	onChange: ReturnType<ClientStream['subscribe']>['onChange'];
	onStop: (cb: () => void) => () => void;
	ready: () => Promise<void>;
	isReady: boolean;
	// Callbacks invoked to tear down everything attached to this stream.
	unsubList: Set<() => void>;
};

/**
 * Opens a Meteor subscription to `stream-<streamName>` for the given key and
 * wraps its lifecycle callbacks (ready / error / stop) in a small handle.
 *
 * @param streamName - Name of the stream; the publication subscribed to is `stream-${streamName}`.
 * @param key - Event key passed as the first subscription argument.
 * @param args - Extra arguments forwarded inside the subscription options object.
 * @returns A handle exposing `stop`, `ready`, `onChange`, `onError`, `onStop`, a live `isReady` getter and an empty `unsubList`.
 */
const createNewMeteorStream = (streamName: StreamNames, key: StreamKeys<StreamNames>, args: unknown[]): StreamMapValue => {
	const ee = new Emitter<{
		ready: [error: any] | [undefined, any];
		error: [error: any];
		stop: undefined;
	}>();

	const meta = {
		ready: false,
	};

	const sub = Meteor.connection.subscribe(
		`stream-${streamName}`,
		key,
		{ useCollection: false, args },
		{
			onReady: (args: any) => {
				meta.ready = true;
				ee.emit('ready', [undefined, args]);
			},
			onError: (err: any) => {
				// A failed subscription settles pending `ready` waiters with the
				// error and also notifies `error` listeners.
				ee.emit('ready', [err]);
				ee.emit('error', err);
			},
			onStop: () => {
				ee.emit('stop');
			},
		},
	);

	const onChange: ReturnType<ClientStream['subscribe']>['onChange'] = (cb) => {
		if (meta.ready) {
			// Already ready: report immediately with a synthetic `ready` message.
			cb({
				msg: 'ready',
				subs: [],
			});
			return;
		}
		ee.once('ready', ([error, result]) => {
			if (error) {
				cb({
					msg: 'nosub',
					id: '',
					error,
				});
				return;
			}
			cb(result);
		});
	};

	const ready = () => {
		if (meta.ready) {
			return Promise.resolve();
		}
		return new Promise<void>((resolve, reject) => {
			ee.once('ready', ([err]) => {
				if (err) {
					reject(err);
					return;
				}
				resolve();
			});
		});
	};

	return {
		stop: sub.stop,
		onChange,
		ready,
		// `once`: each registered callback fires for the first error/stop only.
		onError: (cb: (...args: any[]) => void) =>
			ee.once('error', (error) => {
				cb(error);
			}),
		onStop: (cb: () => void) => ee.once('stop', cb),
		get isReady() {
			return meta.ready;
		},
		unsubList: new Set(),
	};
};

/**
 * Creates the stream manager: a single listener on the Meteor connection that
 * fans streamer events out to registered callbacks, plus a registry of the
 * underlying subscriptions keyed by `stream-<name>/<key>`.
 *
 * @returns `stream` to subscribe a callback, and `stopAll` to tear down every subscriber of one stream/key.
 */
const createStreamManager = () => {
	// Emitter that replicates stream messages to registered callbacks
	const streamProxy = new Emitter<EventMap>();

	// Collection of unsubscribe callbacks for each stream.
	// const proxyUnsubLists = new Map<string, Set<() => void>>();

	const streams = new Map<string, StreamMapValue>();

	// On logout, run every teardown callback of every known stream.
	Accounts.onLogout(() => {
		streams.forEach((stream) => {
			stream.unsubList.forEach((stop) => stop());
		});
	});

	Meteor.connection._stream.on('message', (rawMsg: string) => {
		const msg = DDPCommon.parseDDP(rawMsg);
		if (!isChangedCollectionPayload(msg)) {
			return;
		}
		streamProxy.emit(`${msg.collection}/${msg.fields.eventName}` as any, msg.fields.args as any);
	});

	const stream: SDK['stream'] = <N extends StreamNames, K extends StreamKeys<N>>(
		name: N,
		data: [key: K, ...args: unknown[]],
		callback: (...args: StreamerCallbackArgs<N, K>) => void,
		_options?: {
			retransmit?: boolean | undefined;
			retransmitToSelf?: boolean | undefined;
		},
	): ReturnType<ClientStream['subscribe']> => {
		const [key, ...args] = data;
		const eventLiteral = `stream-${name}/${key}` as const;

		const proxyCallback = (args?: unknown): void => {
			if (!args || !Array.isArray(args)) {
				throw new Error('Invalid streamer callback');
			}
			callback(...(args as StreamerCallbackArgs<N, K>));
		};

		streamProxy.on(eventLiteral, proxyCallback);

		// Reuse the existing subscription for this stream/key when there is one.
		const stream = streams.get(eventLiteral) || createNewMeteorStream(name, key, args);

		const stop = (): void => {
			streamProxy.off(eventLiteral, proxyCallback);
			// If someone is still listening, don't unsubscribe
			if (streamProxy.has(eventLiteral)) {
				return;
			}
			if (stream) {
				stream.stop();
				streams.delete(eventLiteral);
			}
		};

		stream.unsubList.add(stop);

		// First subscriber for this stream/key: wire error/stop teardown and register it.
		if (!streams.has(eventLiteral)) {
			const offError = stream.onError(() => {
				stream.unsubList.forEach((stop) => stop());
			});
			const offStop = stream.onStop(() => {
				stream.unsubList.forEach((stop) => stop());
			});
			stream.unsubList.add(offError);
			stream.unsubList.add(offStop);
			streams.set(eventLiteral, stream);
		}

		return {
			id: '',
			name,
			params: data as any,
			stop,
			ready: stream.ready,
			onChange: stream.onChange,
			// Getter rather than a copied value, so the handle reflects the
			// subscription becoming ready after this call returns.
			get isReady() {
				return stream.isReady;
			},
		};
	};

	/** Runs every teardown callback registered for `stream-<streamName>/<key>`, if that stream exists. */
	const stopAll = (streamName: string, key: string) => {
		const stream = streams.get(`stream-${streamName}/${key}`);
		if (stream) {
			stream.unsubList.forEach((stop) => stop());
		}
	};

	return { stream, stopAll };
};

/**
 * Builds the client SDK facade on top of Meteor's connection.
 *
 * @param rest - REST client exposed unchanged as `sdk.rest`.
 * @returns An object with `rest`, `stream`, `stop` (stop all subscribers of a stream/key),
 * `publish`, `call`, `disconnect` and `reconnect`.
 */
export const createSDK = (rest: RestClientInterface) => {
	const { stream, stopAll } = createStreamManager();

	// Invokes the `stream-<name>` Meteor method with the given arguments; the result is not awaited.
	const publish = (name: string, args: unknown[]) => {
		Meteor.call(`stream-${name}`, ...args);
	};

	const call = <T extends keyof ServerMethods>(method: T, ...args: Parameters<ServerMethods[T]>): Promise<ReturnType<ServerMethods[T]>> => {
		return Meteor.callAsync(method, ...args);
	};

	const disconnect = () => {
		Meteor.disconnect();
	};

	const reconnect = () => {
		Meteor.reconnect();
	};

	return {
		rest,
		stop: stopAll,
		stream,
		publish,
		call,
		disconnect,
		reconnect,
	};
};

export const sdk = createSDK(APIClient);