@ -1,3 +1,5 @@
/* globals MongoInternals */
const baseName = 'rocketchat_' ;
import { EventEmitter } from 'events' ;
@ -27,6 +29,21 @@ class ModelsBaseDb extends EventEmitter {
this . wrapModel ( ) ;
this . isOplogAvailable = MongoInternals . defaultRemoteCollectionDriver ( ) . mongo . _oplogHandle && ! ! MongoInternals . defaultRemoteCollectionDriver ( ) . mongo . _oplogHandle . onOplogEntry ;
// When someone start listening for changes we start oplog if available
this . once ( 'newListener' , ( event /*, listener*/ ) => {
if ( event === 'change' ) {
if ( this . isOplogAvailable ) {
const query = {
collection : this . collectionName
} ;
MongoInternals . defaultRemoteCollectionDriver ( ) . mongo . _oplogHandle . onOplogEntry ( query , this . processOplogRecord . bind ( this ) ) ;
}
}
} ) ;
this . tryEnsureIndex ( { '_updatedAt' : 1 } ) ;
}
@ -112,15 +129,74 @@ class ModelsBaseDb extends EventEmitter {
return 'cache' ;
}
processOplogRecord ( action ) {
if ( action . op . op === 'i' ) {
this . emit ( 'change' , {
action : 'insert' ,
id : action . op . o . _id ,
data : action . op . o ,
oplog : true
} ) ;
return ;
}
if ( action . op . op === 'u' ) {
if ( ! action . op . o . $set && ! action . op . o . $unset ) {
this . emit ( 'change' , {
action : 'update:record' ,
id : action . id ,
data : action . op . o ,
oplog : true
} ) ;
return ;
}
let diff = { } ;
if ( action . op . o . $set ) {
for ( let key in action . op . o . $set ) {
if ( action . op . o . $set . hasOwnProperty ( key ) ) {
diff [ key ] = action . op . o . $set [ key ] ;
}
}
}
if ( action . op . o . $unset ) {
for ( let key in action . op . o . $unset ) {
if ( action . op . o . $unset . hasOwnProperty ( key ) ) {
diff [ key ] = undefined ;
}
}
}
this . emit ( 'change' , {
action : 'update:diff' ,
id : action . id ,
data : diff ,
oplog : true
} ) ;
return ;
}
if ( action . op . op === 'd' ) {
this . emit ( 'change' , {
action : 'remove' ,
id : action . id ,
oplog : true
} ) ;
return ;
}
}
insert ( record ) {
this . setUpdatedAt ( record ) ;
const result = this . originals . insert ( ... arguments ) ;
if ( this . listenerCount ( 'change' ) > 0 ) {
if ( ! this . isOplogAvailable && this . listenerCount ( 'change' ) > 0 ) {
this . emit ( 'change' , {
action : 'insert' ,
id : result ,
data : _ . extend ( { } , record )
data : _ . extend ( { } , record ) ,
oplog : false
} ) ;
}
@ -134,7 +210,7 @@ class ModelsBaseDb extends EventEmitter {
let strategy = this . defineSyncStrategy ( query , update , options ) ;
let ids = [ ] ;
if ( this . listenerCount ( 'change' ) > 0 && strategy === 'db' ) {
if ( ! this . isOplogAvailable && this . listenerCount ( 'change' ) > 0 && strategy === 'db' ) {
const findOptions = { fields : { _id : 1 } } ;
let records = options . multi ? this . find ( query , findOptions ) . fetch ( ) : this . findOne ( query , findOptions ) || [ ] ;
if ( ! Array . isArray ( records ) ) {
@ -153,14 +229,15 @@ class ModelsBaseDb extends EventEmitter {
const result = this . originals . update ( query , update , options ) ;
if ( this . listenerCount ( 'change' ) > 0 ) {
if ( ! this . isOplogAvailable && this . listenerCount ( 'change' ) > 0 ) {
if ( strategy === 'db' ) {
if ( options . upsert === true ) {
if ( result . insertedId ) {
this . emit ( 'change' , {
action : 'insert' ,
id : result . insertedId ,
data : this . findOne ( { _id : result . insertedId } )
data : this . findOne ( { _id : result . insertedId } ) ,
oplog : false
} ) ;
return ;
}
@ -178,20 +255,22 @@ class ModelsBaseDb extends EventEmitter {
}
for ( const record of records ) {
this . emit ( 'change' , {
action : 'update:db ' ,
action : 'update:recor d' ,
id : record . _id ,
data : _ . extend ( { } , record )
data : _ . extend ( { } , record ) ,
oplog : false
} ) ;
}
} else {
this . emit ( 'change' , {
action : 'update:cache ' ,
action : 'update:query ' ,
id : undefined ,
data : {
query : query ,
update : update ,
options : options
}
} ,
oplog : false
} ) ;
}
}
@ -221,12 +300,15 @@ class ModelsBaseDb extends EventEmitter {
const result = this . originals . remove ( query ) ;
for ( const record of records ) {
this . emit ( 'change' , {
action : 'remove' ,
id : record . _id ,
data : _ . extend ( { } , record )
} ) ;
if ( ! this . isOplogAvailable && this . listenerCount ( 'change' ) > 0 ) {
for ( const record of records ) {
this . emit ( 'change' , {
action : 'remove' ,
id : record . _id ,
data : _ . extend ( { } , record ) ,
oplog : false
} ) ;
}
}
return result ;