[Regression] Replace the Omnichannel queue model observe with Stream (#16999)

* Replace LivechatInquiry observe by stream.

* Unify stream.

* Add hasPermission method.

* Add missing importers.

* Revert package-lock file.

* Improve the codebase.

* Add return statement.

* Fix remove listeners that were missing.

* Removed unnecessary imports.

* Remove unnecessary function parameters.
pull/16973/head^2
Renato Becker 5 years ago committed by GitHub
parent 80c69c0fb8
commit 54b5523618
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      app/livechat/client/lib/stream/inquiry.js
  2. 27
      app/livechat/client/lib/stream/queueManager.js
  3. 9
      app/livechat/client/views/app/livechatReadOnly.js
  4. 1
      app/livechat/lib/stream/constants.js
  5. 1
      app/livechat/server/index.js
  6. 34
      app/livechat/server/lib/stream/inquiry.js
  7. 64
      app/livechat/server/lib/stream/queueManager.js

@ -1,5 +1,5 @@
import { Meteor } from 'meteor/meteor';
import { LIVECHAT_INQUIRY_DATA_STREAM_OBSERVER } from '../../../lib/stream/constants';
import { LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER } from '../../../lib/stream/constants';
export const inquiryDataStream = new Meteor.Streamer(LIVECHAT_INQUIRY_DATA_STREAM_OBSERVER);
export const inquiryDataStream = new Meteor.Streamer(LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER);

@ -1,31 +1,29 @@
import { Meteor } from 'meteor/meteor';
import { APIClient } from '../../../../utils/client';
import { getLivechatInquiryCollection } from '../../collections/LivechatInquiry';
import { LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER } from '../../../lib/stream/constants';
import { inquiryDataStream } from './inquiry';
import { hasRole } from '../../../../authorization/client';
const livechatQueueStreamer = new Meteor.Streamer('livechat-queue-stream');
let agentDepartments = [];
const collection = getLivechatInquiryCollection();
const events = {
added: (inquiry, collection) => {
added: (inquiry) => {
delete inquiry.type;
collection.insert(inquiry);
},
changed: (inquiry, collection) => {
changed: (inquiry) => {
if (inquiry.status !== 'queued' || (inquiry.department && !agentDepartments.includes(inquiry.department))) {
return collection.remove(inquiry._id);
}
delete inquiry.type;
collection.upsert({ _id: inquiry._id }, inquiry);
},
removed: (inquiry, collection) => collection.remove(inquiry._id),
removed: (inquiry) => collection.remove(inquiry._id),
};
const appendListenerToDepartment = (departmentId, collection) => livechatQueueStreamer.on(`${ LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER }/${ departmentId }`, (inquiry) => events[inquiry.type](inquiry, collection));
const removeListenerOfDepartment = (departmentId) => livechatQueueStreamer.removeListener(`${ LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER }/${ departmentId }`);
const updateCollection = (inquiry) => { events[inquiry.type](inquiry); };
const appendListenerToDepartment = (departmentId) => inquiryDataStream.on(`department/${ departmentId }`, updateCollection);
const removeListenerOfDepartment = (departmentId) => inquiryDataStream.removeListener(`department/${ departmentId }`, updateCollection);
const getInquiriesFromAPI = async (url) => {
const { inquiries } = await APIClient.v1.get(url);
@ -33,7 +31,6 @@ const getInquiriesFromAPI = async (url) => {
};
const updateInquiries = async (inquiries) => {
const collection = getLivechatInquiryCollection();
(inquiries || []).forEach((inquiry) => collection.upsert({ _id: inquiry._id }, inquiry));
};
@ -43,9 +40,8 @@ const getAgentsDepartments = async (userId) => {
};
const addListenerForeachDepartment = async (userId, departments) => {
const collection = getLivechatInquiryCollection();
if (departments && Array.isArray(departments) && departments.length) {
departments.forEach((department) => appendListenerToDepartment(department, collection));
departments.forEach((department) => appendListenerToDepartment(department));
}
};
@ -54,11 +50,10 @@ const removeDepartmentsListeners = (departments) => {
};
const removeGlobalListener = () => {
livechatQueueStreamer.removeListener(LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER);
inquiryDataStream.removeListener('public', updateCollection);
};
export const initializeLivechatInquiryStream = async (userId) => {
const collection = getLivechatInquiryCollection();
collection.remove({});
if (agentDepartments.length) {
removeDepartmentsListeners(agentDepartments);
@ -68,6 +63,6 @@ export const initializeLivechatInquiryStream = async (userId) => {
agentDepartments = (await getAgentsDepartments(userId)).map((department) => department.departmentId);
await addListenerForeachDepartment(userId, agentDepartments);
if (agentDepartments.length === 0 || hasRole(userId, 'livechat-manager')) {
livechatQueueStreamer.on(LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER, (inquiry) => events[inquiry.type](inquiry, collection));
inquiryDataStream.on('public', updateCollection);
}
};

@ -49,11 +49,12 @@ Template.livechatReadOnly.onCreated(function() {
this.routingConfig = new ReactiveVar({});
this.preparing = new ReactiveVar(true);
this.updateInquiry = async (inquiry) => {
this.inquiry.set(inquiry);
if (!await call('canAccessRoom', inquiry.rid, Meteor.userId())) {
FlowRouter.go('/home');
this.updateInquiry = async ({ clientAction, ...inquiry }) => {
if (clientAction === 'removed' || !await call('canAccessRoom', inquiry.rid, Meteor.userId())) {
return FlowRouter.go('/home');
}
this.inquiry.set(inquiry);
};
Meteor.call('livechat:getRoutingConfig', (err, config) => {

@ -1,2 +1 @@
export const LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER = 'livechat-inquiry-queue-observer';
export const LIVECHAT_INQUIRY_DATA_STREAM_OBSERVER = 'livechat-inquiry-data-observer';

@ -76,7 +76,6 @@ import './lib/routing/External';
import './lib/routing/ManualSelection';
import './lib/routing/AutoSelection';
import './lib/stream/departmentAgents';
import './lib/stream/inquiry';
import './lib/stream/queueManager';
import './sendMessageBySMS';
import './unclosedLivechats';

@ -1,34 +0,0 @@
import { Meteor } from 'meteor/meteor';
import { LivechatInquiry } from '../../../../models/server';
import { LIVECHAT_INQUIRY_DATA_STREAM_OBSERVER } from '../../../lib/stream/constants';
import { hasPermission } from '../../../../authorization/server';
export const inquiryDataStream = new Meteor.Streamer(LIVECHAT_INQUIRY_DATA_STREAM_OBSERVER);
inquiryDataStream.allowWrite('none');
inquiryDataStream.allowRead(function() {
return this.userId ? hasPermission(this.userId, 'view-l-room') : false;
});
const emitInquiryDataEvent = (id, data) => {
if (!data) {
return;
}
inquiryDataStream.emit(id, data);
};
LivechatInquiry.on('change', ({ clientAction, id }) => {
switch (clientAction) {
case 'inserted':
case 'updated':
emitInquiryDataEvent(id, LivechatInquiry.findOneById(id));
break;
case 'removed':
emitInquiryDataEvent(id, { _id: id });
break;
}
});

@ -4,38 +4,44 @@ import { hasPermission } from '../../../../authorization/server';
import { LivechatInquiry } from '../../../../models/server';
import { LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER } from '../../../lib/stream/constants';
const livechatQueueStreamer = new Meteor.Streamer('livechat-queue-stream');
livechatQueueStreamer.allowWrite('none');
livechatQueueStreamer.allowRead(function() {
const queueDataStreamer = new Meteor.Streamer(LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER);
queueDataStreamer.allowWrite('none');
queueDataStreamer.allowRead(function() {
return this.userId ? hasPermission(this.userId, 'view-l-room') : false;
});
const emitEvent = (event, data) => livechatQueueStreamer.emit(event, data);
const emitQueueDataEvent = (event, data) => queueDataStreamer.emit(event, data);
const mountDataToEmit = (type, data) => ({ type, ...data });
LivechatInquiry.find({}).observeChanges({
added(_id, record) {
if (record && record.department) {
return emitEvent(`${ LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER }/${ record.department }`, mountDataToEmit('added', { ...record, _id }));
}
emitEvent(LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER, mountDataToEmit('added', { ...record, _id }));
},
changed(_id, record) {
const isUpdatingDepartment = record && record.department;
const inquiry = LivechatInquiry.findOneById(_id);
if (inquiry && !inquiry.department) {
return emitEvent(LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER, mountDataToEmit('changed', inquiry));
}
if (isUpdatingDepartment) {
emitEvent(LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER, mountDataToEmit('changed', inquiry));
}
return emitEvent(`${ LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER }/${ inquiry.department }`, mountDataToEmit('changed', inquiry));
},
removed(_id) {
const inquiry = LivechatInquiry.trashFindOneById(_id);
if (inquiry && inquiry.department) {
return emitEvent(`${ LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER }/${ inquiry.department }`, mountDataToEmit('removed', { _id }));
}
emitEvent(LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER, mountDataToEmit('removed', { _id }));
},
LivechatInquiry.on('change', ({ clientAction, id: _id, data: record }) => {
switch (clientAction) {
case 'inserted':
emitQueueDataEvent(_id, { ...record, clientAction });
if (record && record.department) {
return emitQueueDataEvent(`department/${ record.department }`, mountDataToEmit('added', record));
}
emitQueueDataEvent('public', mountDataToEmit('added', record));
break;
case 'updated':
const isUpdatingDepartment = record && record.department;
const updatedRecord = LivechatInquiry.findOneById(_id);
emitQueueDataEvent(_id, { ...updatedRecord, clientAction });
if (updatedRecord && !updatedRecord.department) {
return emitQueueDataEvent('public', mountDataToEmit('changed', updatedRecord));
}
if (isUpdatingDepartment) {
emitQueueDataEvent('public', mountDataToEmit('changed', updatedRecord));
}
emitQueueDataEvent(`department/${ updatedRecord.department }`, mountDataToEmit('changed', updatedRecord));
break;
case 'removed':
const removedRecord = LivechatInquiry.trashFindOneById(_id);
emitQueueDataEvent(_id, { _id, clientAction });
if (removedRecord && removedRecord.department) {
return emitQueueDataEvent(`department/${ removedRecord.department }`, mountDataToEmit('removed', { _id }));
}
emitQueueDataEvent('public', mountDataToEmit('removed', { _id }));
break;
}
});

Loading…
Cancel
Save