From 43efe6d5912ffcdec43478774173003e2391ff7e Mon Sep 17 00:00:00 2001 From: Rodrigo Nascimento Date: Tue, 12 Apr 2016 21:04:58 -0300 Subject: [PATCH] Replace arunoda:streams by rocketchat:streamer (#2842) * Replace arunoda:streams by rocketchat:streamer * Fix event broadcast * Update version of rocketchat:streamer to 0.2.0 * Update rocketchat:streamer to version 0.3.0 * Update rocketchat:streamer to version 0.3.1 * Update rocketchat:streamer to 0.3.2 * Improve broadcast auth * Fix LiveChat Streamer --- .meteor/packages | 2 +- .meteor/versions | 2 +- packages/meteor-streams/LICENSE | 22 ---- packages/meteor-streams/README.md | 10 -- packages/meteor-streams/lib/client.js | 55 -------- packages/meteor-streams/lib/ev.js | 43 ------- packages/meteor-streams/lib/server.js | 118 ------------------ .../meteor-streams/lib/stream_permission.js | 47 ------- packages/meteor-streams/package.js | 13 -- .../rocketchat-assets/server/assets.coffee | 2 +- .../client/Notifications.coffee | 6 +- packages/rocketchat-lib/package.js | 2 +- .../server/functions/Notifications.coffee | 40 ++++-- .../rocketchat-livechat/app/.meteor/packages | 2 +- .../rocketchat-livechat/app/.meteor/versions | 2 +- .../app/client/lib/msgTyping.coffee | 2 +- .../app/client/startup/room.coffee | 2 +- packages/rocketchat-ui/lib/RoomManager.coffee | 2 +- server/stream/messages.coffee | 13 +- server/stream/streamBroadcast.coffee | 61 +++++---- 20 files changed, 75 insertions(+), 371 deletions(-) delete mode 100644 packages/meteor-streams/LICENSE delete mode 100644 packages/meteor-streams/README.md delete mode 100644 packages/meteor-streams/lib/client.js delete mode 100644 packages/meteor-streams/lib/ev.js delete mode 100644 packages/meteor-streams/lib/server.js delete mode 100644 packages/meteor-streams/lib/stream_permission.js delete mode 100644 packages/meteor-streams/package.js diff --git a/.meteor/packages b/.meteor/packages index 23f68b9e2f6..e43524f78eb 100644 --- a/.meteor/packages +++ b/.meteor/packages @@ -106,7 +106,7 @@ konecty:multiple-instances-status konecty:nrr konecty:user-presence -arunoda:streams +rocketchat:streamer chrismbeckett:toastr dispatch:run-as-user francocatena:status diff --git a/.meteor/versions b/.meteor/versions index d505723c573..894409ce031 100644 --- a/.meteor/versions +++ b/.meteor/versions @@ -7,7 +7,6 @@ accounts-oauth@1.1.8 accounts-password@1.1.4 accounts-twitter@1.0.6 aldeed:simple-schema@1.5.3 -arunoda:streams@0.1.17 autoupdate@1.2.4 babel-compiler@5.8.24_1 babel-runtime@0.1.4 @@ -180,6 +179,7 @@ rocketchat:slashcommands-leave@0.0.1 rocketchat:slashcommands-mute@0.0.1 rocketchat:spotify@0.0.1 rocketchat:statistics@0.0.1 +rocketchat:streamer@0.3.2 rocketchat:theme@0.0.1 rocketchat:tooltip@0.0.1 rocketchat:tutum@0.0.1 diff --git a/packages/meteor-streams/LICENSE b/packages/meteor-streams/LICENSE deleted file mode 100644 index 8a2cc4f283d..00000000000 --- a/packages/meteor-streams/LICENSE +++ /dev/null @@ -1,22 +0,0 @@ -(The MIT License) - -Copyright (c) 2013 Arunoda Susiripala - -Permission is hereby granted, free of charge, to any person obtaining -a copy of this software and associated documentation files (the -'Software'), to deal in the Software without restriction, including -without limitation the rights to use, copy, modify, merge, publish, -distribute, sublicense, and/or sell copies of the Software, and to -permit persons to whom the Software is furnished to do so, subject to -the following conditions: - -The above copyright notice and this permission notice shall be -included in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. -IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, -TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE -SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/packages/meteor-streams/README.md b/packages/meteor-streams/README.md deleted file mode 100644 index 5e6dac0ec2e..00000000000 --- a/packages/meteor-streams/README.md +++ /dev/null @@ -1,10 +0,0 @@ -# Meteor Streams [![Build Status](https://travis-ci.org/arunoda/meteor-streams.png?branch=master)](https://travis-ci.org/arunoda/meteor-streams) - -DB less realtime communication for meteor - -## Development Status -Project development status is [inactive](https://github.com/arunoda/meteor-streams/issues/21#issuecomment-59030380). - -## [Documentation](http://arunoda.github.io/meteor-streams/) - -[![Meteor Streams - DB less realtime communication for meteor](http://i.imgur.com/ZB3g3AK.png)](http://arunoda.github.io/meteor-streams/) diff --git a/packages/meteor-streams/lib/client.js b/packages/meteor-streams/lib/client.js deleted file mode 100644 index 6a65a182988..00000000000 --- a/packages/meteor-streams/lib/client.js +++ /dev/null @@ -1,55 +0,0 @@ -Meteor.Stream = function Stream(name, callback) { - EV.call(this); - - var self = this; - var streamName = 'stream-' + name; - var collection = new Meteor.Collection(streamName); - var subscription; - var subscriptionId; - - var connected = false; - var pendingEvents = []; - - self._emit = self.emit; - self._on = self.on; - - self.on = function on() { - self._on.apply(this, arguments); - var context = { subscriptionId: subscriptionId }; - self.emit.call(context, 'clear', arguments); - }; - - collection.find({}).observe({ - "added": function(item) { - if(item.type == 'subscriptionId') { - subscriptionId = item._id; - connected = true; - pendingEvents.forEach(function(args) { - self.emit.apply(self, args); - }); - pendingEvents = []; - } else { - var context = {}; - context.subscriptionId = item.subscriptionId; - context.userId = item.userId; - self._emit.apply(context, item.args); - } - } - }); - - subscription = Meteor.subscribe(streamName, callback); - - self.emit = function emit() { - if(connected) { - Meteor.call(streamName, subscriptionId, arguments); - } else { - pendingEvents.push(arguments); - } - }; - - self.close = function close() { - subscription.stop(); - }; -} - -_.extend(Meteor.Stream.prototype, EV.prototype); diff --git a/packages/meteor-streams/lib/ev.js b/packages/meteor-streams/lib/ev.js deleted file mode 100644 index b4f06e96d08..00000000000 --- a/packages/meteor-streams/lib/ev.js +++ /dev/null @@ -1,43 +0,0 @@ -function _EV() { - var self = this; - var handlers = {}; - - self.emit = function emit(event) { - var args = Array.prototype.slice.call(arguments, 1); - - if(handlers[event]) { - for(var lc=0; lc -1) - handlers[event].splice(index, 1); - } - }; - - self.removeAllListeners = function removeAllListeners(event) { - handlers[event] = undefined; - }; -} - -EV = _EV; diff --git a/packages/meteor-streams/lib/server.js b/packages/meteor-streams/lib/server.js deleted file mode 100644 index 95c56ce6517..00000000000 --- a/packages/meteor-streams/lib/server.js +++ /dev/null @@ -1,118 +0,0 @@ -var EventEmitter = Npm.require('events').EventEmitter; -var util = Npm.require('util'); -var Fibers = Npm.require('fibers'); - -Meteor.Stream = function Stream(name) { - EV.call(this); - - var self = this; - var streamName = 'stream-' + name; - var allowFunction; - var allowResultCache = true; - var allowResults = {}; - var filters = []; - - self.name = name; - - var events = new EventEmitter(); - events.setMaxListeners(0); - - var disconnectEvents = new EV(); - - self._emit = self.emit; - self.emit = function emit() { - self.emitToSubscriptions(arguments, null, null); - }; - - var defaultResult = (typeof(Package) == 'object' && Package.insecure)? true: Meteor.Collection.insecure === true; - self.permissions = new Meteor.Stream.Permission(defaultResult, true); - - self.addFilter = function addFilter(callback) { - filters.push(callback); - }; - - self.emitToSubscriptions = function emitToSubscriptions(args, subscriptionId, userId) { - events.emit('item', {args: args, userId: userId, subscriptionId: subscriptionId}); - }; - - Meteor.publish(streamName, function() { - check(arguments, Match.Any); - var subscriptionId = Random.id(); - var publication = this; - - //send subscription id as the first document - publication.added(streamName, subscriptionId, {type: 'subscriptionId'}); - publication.ready(); - events.on('item', onItem); - - function onItem(item) { - Fibers(function() { - var id = Random.id(); - if(self.permissions.checkPermission('read', subscriptionId, publication.userId, item.args)) { - //do not send again this to the sender - if(subscriptionId != item.subscriptionId) { - publication.added(streamName, id, item); - publication.removed(streamName, id); - } - } - }).run(); - } - - publication.onStop(function() { - //trigger related onDisconnect handlers if exists - Fibers(function() { - disconnectEvents.emit(subscriptionId); - disconnectEvents.removeAllListeners(subscriptionId); - }).run(); - events.removeListener('item', onItem); - }); - }); - - var methods = {}; - methods[streamName] = function(subscriptionId, args) { - check(arguments, Match.Any); - //in order to send this to the server callback - var userId = this.userId; - Fibers(function() { - var methodContext = {}; - methodContext.userId = userId; - methodContext.subscriptionId = subscriptionId; - - if (args[0] === 'clear') { - return self.permissions.clearCache(subscriptionId, args[1]); - } - - //in order to send this to the serve callback - methodContext.allowed = self.permissions.checkPermission('write', subscriptionId, methodContext.userId, args); - if(methodContext.allowed) { - //apply filters - args = applyFilters(args, methodContext); - self.emitToSubscriptions(args, subscriptionId, methodContext.userId); - //send to firehose if exists - if(self.firehose) { - self.firehose(args, subscriptionId, methodContext.userId); - } - } - //need to send this to server always - self._emit.apply(methodContext, args); - - //register onDisconnect handlers if provided - if(typeof(methodContext.onDisconnect) == 'function') { - disconnectEvents.on(subscriptionId, methodContext.onDisconnect) - } - - }).run(); - }; - Meteor.methods(methods); - - function applyFilters(args, context) { - var eventName = args.shift(); - filters.forEach(function(filter) { - args = filter.call(context, eventName, args); - }); - args.unshift(eventName); - return args; - } -}; - -util.inherits(Meteor.Stream, EV); \ No newline at end of file diff --git a/packages/meteor-streams/lib/stream_permission.js b/packages/meteor-streams/lib/stream_permission.js deleted file mode 100644 index 675fd331fe2..00000000000 --- a/packages/meteor-streams/lib/stream_permission.js +++ /dev/null @@ -1,47 +0,0 @@ -Meteor.Stream.Permission = function (acceptAll, cacheAll) { - var options = { - "read": { - results: {} - }, - "write": { - results: {} - } - }; - - this.clearCache = function(subscriptionId, args) { - var eventName = args[0]; - delete options['read'].results[subscriptionId + '-' + eventName]; - }; - - this.read = function(func, cache) { - options['read']['func'] = func; - options['read']['doCache'] = (cache === undefined)? cacheAll: cache; - }; - - this.write = function(func, cache) { - options['write']['func'] = func; - options['write']['doCache'] = (cache === undefined)? cacheAll: cache; - }; - - this.checkPermission = function(type, subscriptionId, userId, args) { - var eventName = args[0]; - var namespace = subscriptionId + '-' + eventName; - var result = options[type].results[namespace]; - - if(result === undefined) { - var func = options[type].func; - if(func) { - var context = {subscriptionId: subscriptionId, userId: userId}; - result = func.apply(context, args); - if(options[type].doCache) { - options[type].results[namespace] = result; - } - return result; - } else { - return acceptAll; - } - } else { - return result; - } - }; -} diff --git a/packages/meteor-streams/package.js b/packages/meteor-streams/package.js deleted file mode 100644 index 1f4b367cc05..00000000000 --- a/packages/meteor-streams/package.js +++ /dev/null @@ -1,13 +0,0 @@ -Package.describe({ - name: 'arunoda:streams', - version: '0.1.17', - summary: "DB less realtime communication for meteor" -}); - -Package.on_use(function (api, where) { - api.use('underscore', ['client', 'server']); - api.use('check'); - api.use('random'); - api.add_files(['lib/ev.js', 'lib/server.js', 'lib/stream_permission.js'], 'server'); - api.add_files(['lib/ev.js', 'lib/client.js'], 'client'); -}); diff --git a/packages/rocketchat-assets/server/assets.coffee b/packages/rocketchat-assets/server/assets.coffee index a801542c1ac..a6e68d6c7f3 100644 --- a/packages/rocketchat-assets/server/assets.coffee +++ b/packages/rocketchat-assets/server/assets.coffee @@ -132,7 +132,7 @@ for key, value of assets Meteor.startup -> forEachAsset = (key, value) -> RocketChat.settings.get "Assets_#{key}", (settingKey, settingValue) -> - if settingValue is undefined or not settingValue.url? + if not settingValue?.url? value.cache = undefined return diff --git a/packages/rocketchat-lib/client/Notifications.coffee b/packages/rocketchat-lib/client/Notifications.coffee index 3f8b61d971e..4a238c06c0a 100644 --- a/packages/rocketchat-lib/client/Notifications.coffee +++ b/packages/rocketchat-lib/client/Notifications.coffee @@ -1,9 +1,9 @@ RocketChat.Notifications = new class constructor: -> @debug = false - @streamAll = new Meteor.Stream 'notify-all' - @streamRoom = new Meteor.Stream 'notify-room' - @streamUser = new Meteor.Stream 'notify-user' + @streamAll = new Meteor.Streamer 'notify-all' + @streamRoom = new Meteor.Streamer 'notify-room' + @streamUser = new Meteor.Streamer 'notify-user' if @debug is true @onAll -> console.log "RocketChat.Notifications: onAll", arguments diff --git a/packages/rocketchat-lib/package.js b/packages/rocketchat-lib/package.js index b7d124e9487..ba265077f89 100644 --- a/packages/rocketchat-lib/package.js +++ b/packages/rocketchat-lib/package.js @@ -24,7 +24,7 @@ Package.onUse(function(api) { api.use('matb33:collection-hooks'); api.use('service-configuration'); api.use('check'); - api.use('arunoda:streams'); + api.use('rocketchat:streamer'); api.use('rocketchat:version'); api.use('rocketchat:logger'); diff --git a/packages/rocketchat-lib/server/functions/Notifications.coffee b/packages/rocketchat-lib/server/functions/Notifications.coffee index 3fb92a12935..514233745fa 100644 --- a/packages/rocketchat-lib/server/functions/Notifications.coffee +++ b/packages/rocketchat-lib/server/functions/Notifications.coffee @@ -4,16 +4,18 @@ RocketChat.Notifications = new class @debug = false - @streamAll = new Meteor.Stream 'notify-all' - @streamRoom = new Meteor.Stream 'notify-room' - @streamUser = new Meteor.Stream 'notify-user' + @streamAll = new Meteor.Streamer 'notify-all' + @streamRoom = new Meteor.Streamer 'notify-room' + @streamUser = new Meteor.Streamer 'notify-user' - @streamAll.permissions.write -> return false - @streamAll.permissions.read -> return @userId? + @streamAll.allowWrite('none') + @streamRoom.allowWrite('none') + @streamUser.allowWrite('logged') - @streamRoom.permissions.write -> return false - @streamRoom.permissions.read (eventName) -> + @streamAll.allowRead('logged') + + @streamRoom.allowRead (eventName) -> if not @userId? then return false roomId = eventName.split('/')[0] @@ -21,8 +23,7 @@ RocketChat.Notifications = new class user = Meteor.users.findOne @userId, {fields: {username: 1}} return RocketChat.models.Rooms.findOneByIdContainigUsername(roomId, user.username, {fields: {_id: 1}})? - @streamUser.permissions.write -> return @userId? - @streamUser.permissions.read (eventName) -> + @streamUser.allowRead (eventName) -> userId = eventName.split('/')[0] return @userId? and @userId is userId @@ -46,6 +47,25 @@ RocketChat.Notifications = new class @streamUser.emit.apply @streamUser, args + notifyAllInThisInstance: (eventName, args...) -> + console.log 'notifyAllAndBroadcast', arguments if @debug is true + + args.unshift eventName + @streamAll.emitWithoutBroadcast.apply @streamAll, args + + notifyRoomInThisInstance: (room, eventName, args...) -> + console.log 'notifyRoomAndBroadcast', arguments if @debug is true + + args.unshift "#{room}/#{eventName}" + @streamRoom.emitWithoutBroadcast.apply @streamRoom, args + + notifyUserInThisInstance: (userId, eventName, args...) -> + console.log 'notifyUserAndBroadcast', arguments if @debug is true + + args.unshift "#{userId}/#{eventName}" + @streamUser.emitWithoutBroadcast.apply @streamUser, args + + ## Permissions for client # Enable emit for event typing for rooms and add username to event data @@ -62,4 +82,4 @@ func = (eventName, username, typing) -> return false -RocketChat.Notifications.streamRoom.permissions.write func, false # Prevent Cache +RocketChat.Notifications.streamRoom.allowWrite func diff --git a/packages/rocketchat-livechat/app/.meteor/packages b/packages/rocketchat-livechat/app/.meteor/packages index 83352a667a8..82dbabc596c 100644 --- a/packages/rocketchat-livechat/app/.meteor/packages +++ b/packages/rocketchat-livechat/app/.meteor/packages @@ -23,7 +23,7 @@ jquery random ejson coffeescript -arunoda:streams +rocketchat:streamer kadira:flow-router kadira:blaze-layout konecty:nrr diff --git a/packages/rocketchat-livechat/app/.meteor/versions b/packages/rocketchat-livechat/app/.meteor/versions index 3105a3bc809..a47ce2930ee 100644 --- a/packages/rocketchat-livechat/app/.meteor/versions +++ b/packages/rocketchat-livechat/app/.meteor/versions @@ -1,7 +1,7 @@ accounts-base@1.2.2 accounts-password@1.1.4 aldeed:simple-schema@1.3.3 -arunoda:streams@0.1.17 +rocketchat:streamer@0.3.2 babel-compiler@5.8.24_1 babel-runtime@0.1.4 base64@1.0.4 diff --git a/packages/rocketchat-livechat/app/client/lib/msgTyping.coffee b/packages/rocketchat-livechat/app/client/lib/msgTyping.coffee index 2de805e49af..722e3e9b5a1 100644 --- a/packages/rocketchat-livechat/app/client/lib/msgTyping.coffee +++ b/packages/rocketchat-livechat/app/client/lib/msgTyping.coffee @@ -1,5 +1,5 @@ @MsgTyping = do -> - stream = new Meteor.Stream 'typing' + stream = new Meteor.Streamer 'typing' timeout = 15000 timeouts = {} renew = true diff --git a/packages/rocketchat-livechat/app/client/startup/room.coffee b/packages/rocketchat-livechat/app/client/startup/room.coffee index cbed0d65ef5..bba1c40ab65 100644 --- a/packages/rocketchat-livechat/app/client/startup/room.coffee +++ b/packages/rocketchat-livechat/app/client/startup/room.coffee @@ -1,4 +1,4 @@ -msgStream = new Meteor.Stream 'messages' +msgStream = new Meteor.Streamer 'messages' Tracker.autorun -> if visitor.getRoom()? msgStream.on visitor.getRoom(), (msg) -> diff --git a/packages/rocketchat-ui/lib/RoomManager.coffee b/packages/rocketchat-ui/lib/RoomManager.coffee index 524d7bd7ef7..45b01d0d843 100644 --- a/packages/rocketchat-ui/lib/RoomManager.coffee +++ b/packages/rocketchat-ui/lib/RoomManager.coffee @@ -48,7 +48,7 @@ RocketChat.Notifications.onUser 'message', (msg) -> @RoomManager = new class openedRooms = {} subscription = null - msgStream = new Meteor.Stream 'messages' + msgStream = new Meteor.Streamer 'messages' onlineUsers = new ReactiveVar {} Dep = new Tracker.Dependency diff --git a/server/stream/messages.coffee b/server/stream/messages.coffee index 0dc2cb4d16d..895c0154dd8 100644 --- a/server/stream/messages.coffee +++ b/server/stream/messages.coffee @@ -1,11 +1,8 @@ -@msgStream = new Meteor.Stream 'messages' +@msgStream = new Meteor.Streamer 'messages' -msgStream.permissions.write (eventName) -> - # console.log('stream.permissions.write', this.userId); - # return eventName == 'send' && this.userId; - return false +msgStream.allowWrite('none') -msgStream.permissions.read (eventName) -> +msgStream.allowRead (eventName) -> # console.log('stream.permissions.read', this.userId, eventName); # return this.userId == eventName; @@ -27,7 +24,7 @@ Meteor.startup -> RocketChat.models.Messages.findVisibleCreatedOrEditedAfterTimestamp(new Date(), options).observe added: (record) -> - msgStream.emit record.rid, record + msgStream.emitWithoutBroadcast record.rid, record changed: (record) -> - msgStream.emit record.rid, record + msgStream.emitWithoutBroadcast record.rid, record diff --git a/server/stream/streamBroadcast.coffee b/server/stream/streamBroadcast.coffee index 73887068029..46c3aa7db44 100644 --- a/server/stream/streamBroadcast.coffee +++ b/server/stream/streamBroadcast.coffee @@ -4,29 +4,36 @@ logger = new Logger 'StreamBroadcast', auth: 'Auth' stream: 'Stream' - -authorizeConnection = (connection, record) -> - logger.auth.info "Authorizing with localhost:#{record.extraInformation.port}" - connection.call 'broadcastAuth', record._id, InstanceStatus.id(), (err, ok) -> +authorizeConnection = (connection) -> + logger.auth.info "Authorizing with localhost:#{connection.instanceRecord.extraInformation.port}" + connection.call 'broadcastAuth', connection.instanceRecord._id, InstanceStatus.id(), (err, ok) -> connection.broadcastAuth = ok - logger.auth.info "broadcastAuth with localhost:#{record.extraInformation.port}", ok + logger.auth.info "broadcastAuth with localhost:#{connection.instanceRecord.extraInformation.port}", ok @connections = {} -@startStreamBroadcast = (streams) -> +@startStreamBroadcast = () -> logger.info 'startStreamBroadcast' # connections = {} InstanceStatus.getCollection().find({'extraInformation.port': {$exists: true}}, {sort: {_createdAt: -1}}).observe added: (record) -> - if record.extraInformation.port is process.env.PORT or connections[record.extraInformation.port]? + if record.extraInformation.port is process.env.PORT return + if connections[record.extraInformation.port]?.instanceRecord? + if connections[record.extraInformation.port].instanceRecord._createdAt < record._createdAt + connections[record.extraInformation.port].disconnect() + delete connections[record.extraInformation.port] + else + return + logger.connection.info 'connecting in', "localhost:#{record.extraInformation.port}" connections[record.extraInformation.port] = DDP.connect("localhost:#{record.extraInformation.port}", {_dontPrintErrors: true}) - authorizeConnection(connections[record.extraInformation.port], record); + connections[record.extraInformation.port].instanceRecord = record; + authorizeConnection(connections[record.extraInformation.port]); connections[record.extraInformation.port].onReconnect = -> - authorizeConnection(connections[record.extraInformation.port], record); + authorizeConnection(connections[record.extraInformation.port]); removed: (record) -> if connections[record.extraInformation.port]? and not InstanceStatus.getCollection().findOne({'extraInformation.port': record.extraInformation.port})? @@ -34,11 +41,11 @@ authorizeConnection = (connection, record) -> connections[record.extraInformation.port].disconnect() delete connections[record.extraInformation.port] - broadcast = (streamName, args, userId) -> + broadcast = (streamName, eventName, args, userId) -> for port, connection of connections do (port, connection) -> if connection.status().connected is true - connection.call 'stream', streamName, args, (error, response) -> + connection.call 'stream', streamName, eventName, args, (error, response) -> if error? logger.error "Stream broadcast error", error @@ -47,19 +54,20 @@ authorizeConnection = (connection, record) -> logger.stream.error "Stream broadcast from:#{process.env.PORT} to:#{connection._stream.endpoint} with name #{streamName} to self is not authorized".red logger.stream.debug " -> connection authorized".red, connection.broadcastAuth logger.stream.debug " -> connection status".red, connection.status() - logger.stream.debug " -> arguments".red, args + logger.stream.debug " -> arguments".red, eventName, args when 'not-authorized' logger.stream.error "Stream broadcast from:#{process.env.PORT} to:#{connection._stream.endpoint} with name #{streamName} not authorized".red logger.stream.debug " -> connection authorized".red, connection.broadcastAuth logger.stream.debug " -> connection status".red, connection.status() - logger.stream.debug " -> arguments".red, args + logger.stream.debug " -> arguments".red, eventName, args + authorizeConnection(connection); when 'stream-not-exists' logger.stream.error "Stream broadcast from:#{process.env.PORT} to:#{connection._stream.endpoint} with name #{streamName} does not exists".red logger.stream.debug " -> connection authorized".red, connection.broadcastAuth logger.stream.debug " -> connection status".red, connection.status() - logger.stream.debug " -> arguments".red, args + logger.stream.debug " -> arguments".red, eventName, args Meteor.methods @@ -71,16 +79,8 @@ authorizeConnection = (connection, record) -> broadcastAuth: connection.broadcastAuth return data - emitters = {} - - for streamName, stream of streams - do (streamName, stream) -> - emitters[streamName] = stream.emitToSubscriptions - stream.emitToSubscriptions = (args, subscriptionId, userId) -> - if subscriptionId isnt 'broadcasted' - broadcast streamName, args - - emitters[streamName] args, subscriptionId, userId + Meteor.StreamerCentral.on 'broadcast', (streamName, eventName, args) -> + broadcast streamName, eventName, args Meteor.methods broadcastAuth: (selfId, remoteId) -> @@ -93,7 +93,7 @@ authorizeConnection = (connection, record) -> return @connection.broadcastAuth is true - stream: (streamName, args) -> + stream: (streamName, eventName, args) -> # Prevent call from self and client if not @connection? return 'self-not-authorized' @@ -102,18 +102,13 @@ authorizeConnection = (connection, record) -> if @connection.broadcastAuth isnt true return 'not-authorized' - if not emitters[streamName]? + if not Meteor.StreamerCentral.instances[streamName]? return 'stream-not-exists' - emitters[streamName].call null, args, 'broadcasted' + Meteor.StreamerCentral.instances[streamName]._emit(eventName, args) return undefined Meteor.startup -> - config = - 'RocketChat.Notifications.streamAll': RocketChat.Notifications.streamAll - 'RocketChat.Notifications.streamRoom': RocketChat.Notifications.streamRoom - 'RocketChat.Notifications.streamUser': RocketChat.Notifications.streamUser - - startStreamBroadcast config + startStreamBroadcast()