[IMPROVE][APPS] New storage strategy for Apps-Engine file packages (#22657)
* Adjustments and started adding GridFS storage * Add GridFS support * Error handling * Finish GridFS storage * Update FS storage to latest Apps-Engine changes * Remove dangling files from gridfs after app update * Enable the user to choose a storage for the apps' source code (#22717) * Enable the user to choose a storage for the apps' source code * Rename app storage proxy module * Adjust operation of the app source storage proxy module * Make setting fs path for app storage its own function * Remove log statement * [NEW] Migration: apps new storage strategy (#22857) * Migrate apps source file to gridfs * Remove unused dependencies * Convert base64 zip to buffer * Prevent AppServerOrchestrator from initializing twice * Refactor migration to use existing models instead of custom code * Add description for noop on promise catch Co-authored-by: Douglas Gubert <douglas.gubert@gmail.com> * Remove unsued properties from apps documents * Adjustment based on linter feedback * Rename migration file's extension * Update apps-engine version * Adjustments based on linter feedback Co-authored-by: thassiov <tvmcarvalho@gmail.com>pull/23295/head
parent
1efdd255ca
commit
0a32226313
@ -0,0 +1,68 @@ |
||||
import { promises as fs } from 'fs'; |
||||
import { join, normalize } from 'path'; |
||||
|
||||
import { AppSourceStorage, IAppStorageItem } from '@rocket.chat/apps-engine/server/storage'; |
||||
|
||||
export class AppFileSystemSourceStorage extends AppSourceStorage { |
||||
private pathPrefix = 'fs:/'; |
||||
|
||||
private path: string; |
||||
|
||||
public setPath(path: string): void { |
||||
this.path = path; |
||||
} |
||||
|
||||
public checkPath(): void { |
||||
if (!this.path) { |
||||
throw new Error('Invalid path configured for file system App storage'); |
||||
} |
||||
} |
||||
|
||||
public async store(item: IAppStorageItem, zip: Buffer): Promise<string> { |
||||
this.checkPath(); |
||||
|
||||
const filePath = this.itemToFilename(item); |
||||
|
||||
await fs.writeFile(filePath, zip); |
||||
|
||||
return this.filenameToSourcePath(filePath); |
||||
} |
||||
|
||||
public async fetch(item: IAppStorageItem): Promise<Buffer> { |
||||
if (!item.sourcePath) { |
||||
throw new Error('Invalid source path'); |
||||
} |
||||
|
||||
return fs.readFile(this.sourcePathToFilename(item.sourcePath)); |
||||
} |
||||
|
||||
public async update(item: IAppStorageItem, zip: Buffer): Promise<string> { |
||||
this.checkPath(); |
||||
|
||||
const filePath = this.itemToFilename(item); |
||||
|
||||
await fs.writeFile(filePath, zip); |
||||
|
||||
return this.filenameToSourcePath(filePath); |
||||
} |
||||
|
||||
public async remove(item: IAppStorageItem): Promise<void> { |
||||
if (!item.sourcePath) { |
||||
return; |
||||
} |
||||
|
||||
return fs.unlink(this.sourcePathToFilename(item.sourcePath)); |
||||
} |
||||
|
||||
private itemToFilename(item: IAppStorageItem): string { |
||||
return `${ normalize(join(this.path, item.id)) }.zip`; |
||||
} |
||||
|
||||
private filenameToSourcePath(filename: string): string { |
||||
return this.pathPrefix + filename; |
||||
} |
||||
|
||||
private sourcePathToFilename(sourcePath: string): string { |
||||
return sourcePath.substring(this.pathPrefix.length); |
||||
} |
||||
} |
@ -0,0 +1,81 @@ |
||||
import { MongoInternals } from 'meteor/mongo'; |
||||
import { GridFSBucket, GridFSBucketWriteStream, ObjectId } from 'mongodb'; |
||||
import { AppSourceStorage, IAppStorageItem } from '@rocket.chat/apps-engine/server/storage'; |
||||
|
||||
import { streamToBuffer } from '../../../file-upload/server/lib/streamToBuffer'; |
||||
|
||||
export class AppGridFSSourceStorage extends AppSourceStorage { |
||||
private pathPrefix = 'GridFS:/'; |
||||
|
||||
private bucket: GridFSBucket; |
||||
|
||||
constructor() { |
||||
super(); |
||||
|
||||
const { GridFSBucket } = MongoInternals.NpmModules.mongodb.module; |
||||
const { db } = MongoInternals.defaultRemoteCollectionDriver().mongo; |
||||
|
||||
this.bucket = new GridFSBucket(db, { |
||||
bucketName: 'rocketchat_apps_packages', |
||||
chunkSizeBytes: 1024 * 255, |
||||
}); |
||||
} |
||||
|
||||
public async store(item: IAppStorageItem, zip: Buffer): Promise<string> { |
||||
return new Promise((resolve, reject) => { |
||||
const filename = this.itemToFilename(item); |
||||
const writeStream: GridFSBucketWriteStream = this.bucket.openUploadStream(filename) |
||||
.on('finish', () => resolve(this.idToPath(writeStream.id))) |
||||
.on('error', (error) => reject(error)); |
||||
|
||||
writeStream.write(zip); |
||||
writeStream.end(); |
||||
}); |
||||
} |
||||
|
||||
public async fetch(item: IAppStorageItem): Promise<Buffer> { |
||||
return streamToBuffer(this.bucket.openDownloadStream(this.itemToObjectId(item))); |
||||
} |
||||
|
||||
public async update(item: IAppStorageItem, zip: Buffer): Promise<string> { |
||||
return new Promise((resolve, reject) => { |
||||
const fileId = this.itemToFilename(item); |
||||
const writeStream: GridFSBucketWriteStream = this.bucket.openUploadStream(fileId) |
||||
.on('finish', () => { |
||||
resolve(this.idToPath(writeStream.id)); |
||||
// An error in the following line would not cause the update process to fail
|
||||
// eslint-disable-next-line @typescript-eslint/no-empty-function
|
||||
this.remove(item).catch(() => {}); |
||||
}) |
||||
|
||||
.on('error', (error) => reject(error)); |
||||
|
||||
writeStream.write(zip); |
||||
writeStream.end(); |
||||
}); |
||||
} |
||||
|
||||
public async remove(item: IAppStorageItem): Promise<void> { |
||||
return new Promise((resolve, reject) => { |
||||
this.bucket.delete(this.itemToObjectId(item), (error) => { |
||||
if (error) { |
||||
return reject(error); |
||||
} |
||||
|
||||
resolve(); |
||||
}); |
||||
}); |
||||
} |
||||
|
||||
private itemToFilename(item: IAppStorageItem): string { |
||||
return `${ item.info.nameSlug }-${ item.info.version }.package`; |
||||
} |
||||
|
||||
private idToPath(id: GridFSBucketWriteStream['id']): string { |
||||
return this.pathPrefix + id; |
||||
} |
||||
|
||||
private itemToObjectId(item: IAppStorageItem): ObjectId { |
||||
return new ObjectId(item.sourcePath?.substring(this.pathPrefix.length)); |
||||
} |
||||
} |
@ -0,0 +1,53 @@ |
||||
import { AppSourceStorage, IAppStorageItem } from '@rocket.chat/apps-engine/server/storage'; |
||||
|
||||
import { AppFileSystemSourceStorage } from './AppFileSystemSourceStorage'; |
||||
import { AppGridFSSourceStorage } from './AppGridFSSourceStorage'; |
||||
|
||||
export class ConfigurableAppSourceStorage extends AppSourceStorage { |
||||
private filesystem: AppFileSystemSourceStorage; |
||||
|
||||
private gridfs: AppGridFSSourceStorage; |
||||
|
||||
private storage: AppSourceStorage; |
||||
|
||||
constructor(readonly storageType: string, filesystemStoragePath: string) { |
||||
super(); |
||||
|
||||
this.filesystem = new AppFileSystemSourceStorage(); |
||||
this.gridfs = new AppGridFSSourceStorage(); |
||||
|
||||
this.setStorage(storageType); |
||||
this.setFileSystemStoragePath(filesystemStoragePath); |
||||
} |
||||
|
||||
public setStorage(type: string): void { |
||||
switch (type) { |
||||
case 'filesystem': |
||||
this.storage = this.filesystem; |
||||
break; |
||||
case 'gridfs': |
||||
this.storage = this.gridfs; |
||||
break; |
||||
} |
||||
} |
||||
|
||||
public setFileSystemStoragePath(path: string): void { |
||||
this.filesystem.setPath(path); |
||||
} |
||||
|
||||
public async store(item: IAppStorageItem, zip: Buffer): Promise<string> { |
||||
return this.storage.store(item, zip); |
||||
} |
||||
|
||||
public async fetch(item: IAppStorageItem): Promise<Buffer> { |
||||
return this.storage.fetch(item); |
||||
} |
||||
|
||||
public async update(item: IAppStorageItem, zip: Buffer): Promise<string> { |
||||
return this.storage.update(item, zip); |
||||
} |
||||
|
||||
public async remove(item: IAppStorageItem): Promise<void> { |
||||
return this.storage.remove(item); |
||||
} |
||||
} |
@ -1,4 +1,5 @@ |
||||
import { AppRealLogsStorage } from './logs-storage'; |
||||
import { AppRealStorage } from './storage'; |
||||
|
||||
export { AppRealLogsStorage, AppRealStorage }; |
||||
export { AppRealLogsStorage } from './logs-storage'; |
||||
export { AppRealStorage } from './AppRealStorage'; |
||||
export { AppFileSystemSourceStorage } from './AppFileSystemSourceStorage'; |
||||
export { AppGridFSSourceStorage } from './AppGridFSSourceStorage'; |
||||
export { ConfigurableAppSourceStorage } from './ConfigurableAppSourceStorage'; |
||||
|
@ -1,11 +1,12 @@ |
||||
import { Readable } from 'stream'; |
||||
|
||||
export const streamToBuffer = (stream: Readable): Promise<Buffer> => new Promise((resolve) => { |
||||
export const streamToBuffer = (stream: Readable): Promise<Buffer> => new Promise((resolve, reject) => { |
||||
const chunks: Array<Buffer> = []; |
||||
|
||||
stream |
||||
.on('data', (data) => chunks.push(data)) |
||||
.on('end', () => resolve(Buffer.concat(chunks))) |
||||
.on('error', (error) => reject(error)) |
||||
// force stream to resume data flow in case it was explicitly paused before
|
||||
.resume(); |
||||
}); |
||||
|
@ -0,0 +1,19 @@ |
||||
import { AppManager } from '@rocket.chat/apps-engine/server/AppManager'; |
||||
|
||||
import { addMigration } from '../../lib/migrations'; |
||||
import { Apps } from '../../../app/apps/server'; |
||||
|
||||
addMigration({ |
||||
version: 238, |
||||
up() { |
||||
Apps.initialize(); |
||||
|
||||
const apps = Apps._model.find().fetch(); |
||||
|
||||
for (const app of apps) { |
||||
const zipFile = Buffer.from(app.zip, 'base64'); |
||||
Promise.await((Apps._manager as AppManager).update(zipFile, app.permissionsGranted, { loadApp: false })); |
||||
Promise.await(Apps._model.update({ id: app.id }, { $unset: { zip: 1, compiled: 1 } })); |
||||
} |
||||
}, |
||||
}); |
Loading…
Reference in new issue