Split oplog emitters in files (#14917)

pull/14765/merge
Guilherme Gazzo 6 years ago committed by GitHub
parent 3ce57f48c1
commit da8775aab1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      app/authorization/server/publications/permissions/emitter.js
  2. 19
      app/authorization/server/publications/permissions/index.js
  3. 1
      app/lib/server/index.js
  4. 99
      app/lib/server/publications/settings.js
  5. 40
      app/models/server/models/_BaseDb.js
  6. 1
      server/main.js
  7. 37
      server/publications/room/emitter.js
  8. 40
      server/publications/room/index.js
  9. 45
      server/publications/settings/emitter.js
  10. 58
      server/publications/settings/index.js
  11. 27
      server/publications/subscription/emitter.js
  12. 24
      server/publications/subscription/index.js
  13. 82
      server/stream/messages.js
  14. 39
      server/stream/messages/emitter.js
  15. 51
      server/stream/messages/index.js

@ -0,0 +1,24 @@
import { Notifications } from '../../../../notifications';
import Permissions from '../../../../models/server/models/Permissions';
Permissions.on('change', ({ clientAction, id, data, diff }) => {
if (diff && Object.keys(diff).length === 1 && diff._updatedAt) { // avoid useless changes
return;
}
switch (clientAction) {
case 'updated':
case 'inserted':
data = data || Permissions.findOneById(id);
break;
case 'removed':
data = { _id: id };
break;
}
Notifications.notifyLoggedInThisInstance(
'permissions-changed',
clientAction,
data
);
});

@ -1,7 +1,7 @@
import { Meteor } from 'meteor/meteor';
import Permissions from '../../../models/server/models/Permissions';
import { Notifications } from '../../../notifications';
import Permissions from '../../../../models/server/models/Permissions';
import './emitter';
Meteor.methods({
'permissions/get'(updatedAt) {
@ -20,18 +20,3 @@ Meteor.methods({
return records;
},
});
Permissions.on('change', ({ clientAction, id, data }) => {
switch (clientAction) {
case 'updated':
case 'inserted':
data = data || Permissions.findOneById(id);
break;
case 'removed':
data = { _id: id };
break;
}
Notifications.notifyLoggedInThisInstance('permissions-changed', clientAction, data);
});

@ -65,7 +65,6 @@ import './methods/setUsername';
import './methods/unarchiveRoom';
import './methods/unblockUser';
import './methods/updateMessage';
import './publications/settings';
export * from './lib';
export * from './functions';

@ -1,99 +0,0 @@
import { Meteor } from 'meteor/meteor';
import { Settings } from '../../../models';
import { hasPermission } from '../../../authorization';
import { Notifications } from '../../../notifications';
Meteor.methods({
'public-settings/get'(updatedAt) {
const records = Settings.findNotHiddenPublic().fetch();
if (updatedAt instanceof Date) {
return {
update: records.filter(function(record) {
return record._updatedAt > updatedAt;
}),
remove: Settings.trashFindDeletedAfter(updatedAt, {
hidden: {
$ne: true,
},
public: true,
}, {
fields: {
_id: 1,
_deletedAt: 1,
},
}).fetch(),
};
}
return records;
},
'private-settings/get'(updatedAfter) {
if (!Meteor.userId()) {
return [];
}
if (!hasPermission(Meteor.userId(), 'view-privileged-setting')) {
return [];
}
if (!(updatedAfter instanceof Date)) {
return Settings.findNotHidden().fetch();
}
const records = Settings.findNotHidden({ updatedAfter }).fetch();
return {
update: records,
remove: Settings.trashFindDeletedAfter(updatedAfter, {
hidden: {
$ne: true,
},
}, {
fields: {
_id: 1,
_deletedAt: 1,
},
}).fetch(),
};
},
});
Settings.on('change', ({ clientAction, id, data, diff }) => {
if (diff && Object.keys(diff).length === 1 && diff._updatedAt) { // avoid useless changes
return;
}
switch (clientAction) {
case 'updated':
case 'inserted': {
const setting = data || Settings.findOneById(id);
const value = {
_id: setting._id,
value: setting.value,
editor: setting.editor,
properties: setting.properties,
};
if (setting.public === true) {
Notifications.notifyAllInThisInstance('public-settings-changed', clientAction, value);
}
Notifications.notifyLoggedInThisInstance('private-settings-changed', clientAction, setting);
break;
}
case 'removed': {
const setting = data || Settings.findOneById(id, { fields: { public: 1 } });
if (setting && setting.public === true) {
Notifications.notifyAllInThisInstance('public-settings-changed', clientAction, { _id: id });
}
Notifications.notifyLoggedInThisInstance('private-settings-changed', clientAction, { _id: id });
break;
}
}
});
Notifications.streamAll.allowRead('private-settings-changed', function() {
if (this.userId == null) {
return false;
}
return hasPermission(this.userId, 'view-privileged-setting');
});

@ -14,8 +14,6 @@ try {
console.log(e);
}
const isOplogEnabled = MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle && !!MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle.onOplogEntry;
export class BaseDb extends EventEmitter {
constructor(model, baseModel) {
super();
@ -34,24 +32,30 @@ export class BaseDb extends EventEmitter {
this.wrapModel();
let alreadyListeningToOplog = false;
// When someone start listening for changes we start oplog if available
this.on('newListener', (event/* , listener*/) => {
if (event === 'change' && alreadyListeningToOplog === false) {
alreadyListeningToOplog = true;
if (isOplogEnabled) {
const query = {
collection: this.collectionName,
};
MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle.onOplogEntry(query, this.processOplogRecord.bind(this));
// Meteor will handle if we have a value https://github.com/meteor/meteor/blob/5dcd0b2eb9c8bf881ffbee98bc4cb7631772c4da/packages/mongo/oplog_tailing.js#L5
if (process.env.METEOR_OPLOG_TOO_FAR_BEHIND == null) {
MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle._defineTooFarBehind(Number.MAX_SAFE_INTEGER);
}
}
const handleListener = (event /* , listener*/) => {
if (event !== 'change') {
return;
}
});
this.removeListener('newListener', handleListener);
const query = {
collection: this.collectionName,
};
MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle.onOplogEntry(
query,
this.processOplogRecord.bind(this)
);
// Meteor will handle if we have a value https://github.com/meteor/meteor/blob/5dcd0b2eb9c8bf881ffbee98bc4cb7631772c4da/packages/mongo/oplog_tailing.js#L5
if (process.env.METEOR_OPLOG_TOO_FAR_BEHIND == null) {
MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle._defineTooFarBehind(
Number.MAX_SAFE_INTEGER
);
}
};
this.on('newListener', handleListener);
this.tryEnsureIndex({ _updatedAt: 1 });
}

@ -72,6 +72,7 @@ import './publications/room';
import './publications/roomFiles';
import './publications/roomFilesWithSearchText';
import './publications/roomSubscriptionsByRole';
import './publications/settings';
import './publications/spotlight';
import './publications/subscription';
import './publications/userAutocomplete';

@ -0,0 +1,37 @@
import { Rooms, Subscriptions } from '../../../app/models';
import { Notifications } from '../../../app/notifications';
import { fields } from '.';
const getSubscriptions = (id) => {
const fields = { 'u._id': 1 };
return Subscriptions.trashFind({ rid: id }, { fields });
};
Rooms.on('change', ({ clientAction, id, data }) => {
switch (clientAction) {
case 'updated':
case 'inserted':
// Override data cuz we do not publish all fields
data = Rooms.findOneById(id, { fields });
break;
case 'removed':
data = { _id: id };
break;
}
if (data) {
if (clientAction === 'removed') {
getSubscriptions(clientAction, id).forEach(({ u }) => {
Notifications.notifyUserInThisInstance(
u._id,
'rooms-changed',
clientAction,
data
);
});
}
Notifications.streamUser.__emit(id, clientAction, data);
}
});

@ -1,13 +1,13 @@
import { Meteor } from 'meteor/meteor';
import _ from 'underscore';
import { roomTypes } from '../../app/utils';
import { hasPermission } from '../../app/authorization';
import { Rooms, Subscriptions } from '../../app/models';
import { settings } from '../../app/settings';
import { Notifications } from '../../app/notifications';
import { roomTypes } from '../../../app/utils';
import { hasPermission } from '../../../app/authorization';
import { Rooms } from '../../../app/models';
import { settings } from '../../../app/settings';
import './emitter';
const fields = {
export const fields = {
_id: 1,
name: 1,
fname: 1,
@ -107,31 +107,3 @@ Meteor.methods({
return roomMap(room);
},
});
const getSubscriptions = (id) => {
const fields = { 'u._id': 1 };
return Subscriptions.trashFind({ rid: id }, { fields });
};
Rooms.on('change', ({ clientAction, id, data }) => {
switch (clientAction) {
case 'updated':
case 'inserted':
// Override data cuz we do not publish all fields
data = Rooms.findOneById(id, { fields });
break;
case 'removed':
data = { _id: id };
break;
}
if (data) {
if (clientAction === 'removed') {
getSubscriptions(clientAction, id).forEach(({ u }) => {
Notifications.notifyUserInThisInstance(u._id, 'rooms-changed', clientAction, data);
});
}
Notifications.streamUser.__emit(id, clientAction, data);
}
});

@ -0,0 +1,45 @@
import { Settings } from '../../../app/models';
import { Notifications } from '../../../app/notifications';
import { hasPermission } from '../../../app/authorization';
Settings.on('change', ({ clientAction, id, data, diff }) => {
if (diff && Object.keys(diff).length === 1 && diff._updatedAt) { // avoid useless changes
return;
}
switch (clientAction) {
case 'updated':
case 'inserted': {
const setting = data || Settings.findOneById(id);
const value = {
_id: setting._id,
value: setting.value,
editor: setting.editor,
properties: setting.properties,
};
if (setting.public === true) {
Notifications.notifyAllInThisInstance('public-settings-changed', clientAction, value);
}
Notifications.notifyLoggedInThisInstance('private-settings-changed', clientAction, setting);
break;
}
case 'removed': {
const setting = data || Settings.findOneById(id, { fields: { public: 1 } });
if (setting && setting.public === true) {
Notifications.notifyAllInThisInstance('public-settings-changed', clientAction, { _id: id });
}
Notifications.notifyLoggedInThisInstance('private-settings-changed', clientAction, { _id: id });
break;
}
}
});
Notifications.streamAll.allowRead('private-settings-changed', function() {
if (this.userId == null) {
return false;
}
return hasPermission(this.userId, 'view-privileged-setting');
});

@ -0,0 +1,58 @@
import { Meteor } from 'meteor/meteor';
import { Settings } from '../../../app/models';
import { hasPermission } from '../../../app/authorization';
import './emitter';
Meteor.methods({
'public-settings/get'(updatedAt) {
const records = Settings.findNotHiddenPublic().fetch();
if (updatedAt instanceof Date) {
return {
update: records.filter(function(record) {
return record._updatedAt > updatedAt;
}),
remove: Settings.trashFindDeletedAfter(updatedAt, {
hidden: {
$ne: true,
},
public: true,
}, {
fields: {
_id: 1,
_deletedAt: 1,
},
}).fetch(),
};
}
return records;
},
'private-settings/get'(updatedAfter) {
if (!Meteor.userId()) {
return [];
}
if (!hasPermission(Meteor.userId(), 'view-privileged-setting')) {
return [];
}
if (!(updatedAfter instanceof Date)) {
return Settings.findNotHidden().fetch();
}
const records = Settings.findNotHidden({ updatedAfter }).fetch();
return {
update: records,
remove: Settings.trashFindDeletedAfter(updatedAfter, {
hidden: {
$ne: true,
},
}, {
fields: {
_id: 1,
_deletedAt: 1,
},
}).fetch(),
};
},
});

@ -0,0 +1,27 @@
import { Notifications } from '../../../app/notifications';
import { Subscriptions } from '../../../app/models';
import { fields } from '.';
Subscriptions.on('change', ({ clientAction, id, data }) => {
switch (clientAction) {
case 'inserted':
case 'updated':
// Override data cuz we do not publish all fields
data = Subscriptions.findOneById(id, { fields });
break;
case 'removed':
data = Subscriptions.trashFindOneById(id, { fields: { u: 1, rid: 1 } });
break;
}
Notifications.streamUser.__emit(data.u._id, clientAction, data);
Notifications.notifyUserInThisInstance(
data.u._id,
'subscriptions-changed',
clientAction,
data
);
});

@ -1,9 +1,9 @@
import { Meteor } from 'meteor/meteor';
import { Subscriptions } from '../../app/models';
import { Notifications } from '../../app/notifications';
import { Subscriptions } from '../../../app/models';
import './emitter';
const fields = {
export const fields = {
t: 1,
ts: 1,
ls: 1,
@ -70,21 +70,3 @@ Meteor.methods({
return records;
},
});
Subscriptions.on('change', ({ clientAction, id, data }) => {
switch (clientAction) {
case 'inserted':
case 'updated':
// Override data cuz we do not publish all fields
data = Subscriptions.findOneById(id, { fields });
break;
case 'removed':
data = Subscriptions.trashFindOneById(id, { fields: { u: 1, rid: 1 } });
break;
}
Notifications.streamUser.__emit(data.u._id, clientAction, data);
Notifications.notifyUserInThisInstance(data.u._id, 'subscriptions-changed', clientAction, data);
});

@ -1,82 +0,0 @@
import { Meteor } from 'meteor/meteor';
import { hasPermission } from '../../app/authorization';
import { settings } from '../../app/settings';
import { Subscriptions, Users, Messages } from '../../app/models';
import { msgStream } from '../../app/lib';
const MY_MESSAGE = '__my_messages__';
msgStream.allowWrite('none');
msgStream.allowRead(function(eventName, args) {
try {
const room = Meteor.call('canAccessRoom', eventName, this.userId, args);
if (!room) {
return false;
}
if (room.t === 'c' && !hasPermission(this.userId, 'preview-c-room') && !Subscriptions.findOneByRoomIdAndUserId(room._id, this.userId, { fields: { _id: 1 } })) {
return false;
}
return true;
} catch (error) {
/* error*/
return false;
}
});
msgStream.allowRead(MY_MESSAGE, 'all');
msgStream.allowEmit(MY_MESSAGE, function(eventName, msg) {
try {
const room = Meteor.call('canAccessRoom', msg.rid, this.userId);
if (!room) {
return false;
}
return {
roomParticipant: Subscriptions.findOneByRoomIdAndUserId(room._id, this.userId, { fields: { _id: 1 } }) != null,
roomType: room.t,
roomName: room.name,
};
} catch (error) {
/* error*/
return false;
}
});
Meteor.startup(function() {
function publishMessage(type, record) {
if (record._hidden !== true && (record.imported == null)) {
const UI_Use_Real_Name = settings.get('UI_Use_Real_Name') === true;
if (record.u && record.u._id && UI_Use_Real_Name) {
const user = Users.findOneById(record.u._id);
record.u.name = user && user.name;
}
if (record.mentions && record.mentions.length && UI_Use_Real_Name) {
record.mentions.forEach((mention) => {
const user = Users.findOneById(mention._id);
mention.name = user && user.name;
});
}
msgStream.mymessage(MY_MESSAGE, record);
msgStream.emitWithoutBroadcast(record.rid, record);
}
}
return Messages.on('change', function({ clientAction, id, data/* , oplog*/ }) {
switch (clientAction) {
case 'inserted':
case 'updated':
const message = data || Messages.findOne({ _id: id });
publishMessage(clientAction, message);
break;
}
});
});

@ -0,0 +1,39 @@
import { Meteor } from 'meteor/meteor';
import { settings } from '../../../app/settings';
import { Users, Messages } from '../../../app/models';
import { msgStream } from '../../../app/lib/server';
import { MY_MESSAGE } from '.';
Meteor.startup(function() {
function publishMessage(type, record) {
if (record._hidden !== true && (record.imported == null)) {
const UI_Use_Real_Name = settings.get('UI_Use_Real_Name') === true;
if (record.u && record.u._id && UI_Use_Real_Name) {
const user = Users.findOneById(record.u._id);
record.u.name = user && user.name;
}
if (record.mentions && record.mentions.length && UI_Use_Real_Name) {
record.mentions.forEach((mention) => {
const user = Users.findOneById(mention._id);
mention.name = user && user.name;
});
}
msgStream.mymessage(MY_MESSAGE, record);
msgStream.emitWithoutBroadcast(record.rid, record);
}
}
return Messages.on('change', function({ clientAction, id, data/* , oplog*/ }) {
switch (clientAction) {
case 'inserted':
case 'updated':
const message = data || Messages.findOne({ _id: id });
publishMessage(clientAction, message);
break;
}
});
});

@ -0,0 +1,51 @@
import { Meteor } from 'meteor/meteor';
import { hasPermission } from '../../../app/authorization';
import { Subscriptions } from '../../../app/models';
import { msgStream } from '../../../app/lib/server';
import './emitter';
export const MY_MESSAGE = '__my_messages__';
msgStream.allowWrite('none');
msgStream.allowRead(function(eventName, args) {
try {
const room = Meteor.call('canAccessRoom', eventName, this.userId, args);
if (!room) {
return false;
}
if (room.t === 'c' && !hasPermission(this.userId, 'preview-c-room') && !Subscriptions.findOneByRoomIdAndUserId(room._id, this.userId, { fields: { _id: 1 } })) {
return false;
}
return true;
} catch (error) {
/* error*/
return false;
}
});
msgStream.allowRead(MY_MESSAGE, 'all');
msgStream.allowEmit(MY_MESSAGE, function(eventName, msg) {
try {
const room = Meteor.call('canAccessRoom', msg.rid, this.userId);
if (!room) {
return false;
}
return {
roomParticipant: Subscriptions.findOneByRoomIdAndUserId(room._id, this.userId, { fields: { _id: 1 } }) != null,
roomType: room.t,
roomName: room.name,
};
} catch (error) {
/* error*/
return false;
}
});
Loading…
Cancel
Save