mirror of https://github.com/watcha-fr/synapse
Conflicts: synapse/handlers/room.py, synapse/storage/stream.py
commit 2aeaa7b77c
@@ -1,196 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.api.constants import Membership
from synapse.api.events.room import RoomMemberEvent
from synapse.api.streams.event import EventsStreamData

from twisted.internet import defer
from twisted.internet import reactor

import logging

logger = logging.getLogger(__name__)


class Notifier(object):

    def __init__(self, hs):
        self.store = hs.get_datastore()
        self.hs = hs
        self.stored_event_listeners = {}

    @defer.inlineCallbacks
    def on_new_room_event(self, event, store_id):
        """Called when there is a new room event which may potentially be sent
        down listening users' event streams.

        This function looks for interested *users* who may want to be notified
        for this event. This is different to users requesting from the event
        stream which looks for interested *events* for this user.

        Args:
            event (SynapseEvent): The new event, which must have a room_id
            store_id (int): The ID of this event after it was stored with the
                data store.
        """
        member_list = yield self.store.get_room_members(room_id=event.room_id,
                                                        membership="join")
        if not member_list:
            member_list = []

        member_list = [u.user_id for u in member_list]

        # invites MUST prod the person being invited, who won't be in the room.
        if (event.type == RoomMemberEvent.TYPE and
                event.content["membership"] == Membership.INVITE):
            member_list.append(event.state_key)
        # similarly, LEAVEs must be sent to the person leaving
        if (event.type == RoomMemberEvent.TYPE and
                event.content["membership"] == Membership.LEAVE):
            member_list.append(event.state_key)

        for user_id in member_list:
            if user_id in self.stored_event_listeners:
                self._notify_and_callback(
                    user_id=user_id,
                    event_data=event.get_dict(),
                    stream_type=EventsStreamData.EVENT_TYPE,
                    store_id=store_id)

    def on_new_user_event(self, user_id, event_data, stream_type, store_id):
        if user_id in self.stored_event_listeners:
            self._notify_and_callback(
                user_id=user_id,
                event_data=event_data,
                stream_type=stream_type,
                store_id=store_id
            )

    def _notify_and_callback(self, user_id, event_data, stream_type, store_id):
        logger.debug(
            "Notifying %s of a new event.",
            user_id
        )

        stream_ids = list(self.stored_event_listeners[user_id])
        for stream_id in stream_ids:
            self._notify_and_callback_stream(user_id, stream_id, event_data,
                                             stream_type, store_id)

        if not self.stored_event_listeners[user_id]:
            del self.stored_event_listeners[user_id]

    def _notify_and_callback_stream(self, user_id, stream_id, event_data,
                                    stream_type, store_id):

        event_listener = self.stored_event_listeners[user_id].pop(stream_id)
        return_event_object = {
            k: event_listener[k] for k in ["start", "chunk", "end"]
        }

        # work out the new end token
        token = event_listener["start"]
        end = self._next_token(stream_type, store_id, token)
        return_event_object["end"] = end

        # add the event to the chunk
        chunk = event_listener["chunk"]
        chunk.append(event_data)

        # callback the defer. We know this can't have been resolved before as
        # we always remove the event_listener from the map before resolving.
        event_listener["defer"].callback(return_event_object)

    def _next_token(self, stream_type, store_id, current_token):
        stream_handler = self.hs.get_handlers().event_stream_handler
        return stream_handler.get_event_stream_token(
            stream_type,
            store_id,
            current_token
        )

    def store_events_for(self, user_id=None, stream_id=None, from_tok=None):
        """Store all incoming events for this user. This should be paired with
        get_events_for to return chunked data.

        Args:
            user_id (str): The user to monitor incoming events for.
            stream_id (str): The ID of the stream that is receiving events.
            from_tok (str): The token to monitor incoming events from.
        """
        event_listener = {
            "start": from_tok,
            "chunk": [],
            "end": from_tok,
            "defer": defer.Deferred(),
        }

        if user_id not in self.stored_event_listeners:
            self.stored_event_listeners[user_id] = {stream_id: event_listener}
        else:
            self.stored_event_listeners[user_id][stream_id] = event_listener

    def purge_events_for(self, user_id=None, stream_id=None):
        """Purges any stored events for this user.

        Args:
            user_id (str): The user to purge stored events for.
        """
        try:
            del self.stored_event_listeners[user_id][stream_id]
            if not self.stored_event_listeners[user_id]:
                del self.stored_event_listeners[user_id]
        except KeyError:
            pass

    def get_events_for(self, user_id=None, stream_id=None, timeout=0):
        """Retrieve stored events for this user, waiting if necessary.

        It is advisable to wrap this call in a maybeDeferred.

        Args:
            user_id (str): The user to get events for.
            timeout (int): The time in milliseconds to wait before giving up.
        Returns:
            A Deferred or a dict containing the chunk data, depending on if
            there was data to return yet. The Deferred callback may be None if
            there were no events before the timeout expired.
        """
        logger.debug("%s is listening for events.", user_id)

        try:
            streams = self.stored_event_listeners[user_id][stream_id]["chunk"]
            if streams:
                logger.debug("%s returning existing chunk.", user_id)
                return streams
        except KeyError:
            return None

        reactor.callLater(
            (timeout / 1000.0), self._timeout, user_id, stream_id
        )
        return self.stored_event_listeners[user_id][stream_id]["defer"]

    def _timeout(self, user_id, stream_id):
        try:
            # We remove the event_listener from the map so that we can't
            # resolve the deferred twice.
            event_listeners = self.stored_event_listeners[user_id]
            event_listener = event_listeners.pop(stream_id)
            event_listener["defer"].callback(None)
            logger.debug("%s event listening timed out.", user_id)
        except KeyError:
            pass
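For orientation, a minimal sketch of how a caller would drive the old long-poll API above (not part of this commit; `notifier`, `user_id`, `stream_id` and `from_tok` are placeholder values, and `maybeDeferred` is the wrapper the get_events_for docstring recommends):

# Hypothetical caller of the old Notifier API (sketch only).
from twisted.internet import defer

@defer.inlineCallbacks
def long_poll_once(notifier, user_id, stream_id, from_tok):
    # Register interest first, so events that arrive before we poll are
    # accumulated into the stored chunk.
    notifier.store_events_for(user_id=user_id, stream_id=stream_id,
                              from_tok=from_tok)
    # get_events_for may return a chunk immediately, a Deferred, or None;
    # maybeDeferred normalises all three into a single Deferred.
    result = yield defer.maybeDeferred(
        notifier.get_events_for,
        user_id=user_id, stream_id=stream_id, timeout=30000,  # milliseconds
    )
    # Drop any listener state left behind (e.g. after a timeout).
    notifier.purge_events_for(user_id=user_id, stream_id=stream_id)
    defer.returnValue(result)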
@@ -1,103 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.api.errors import SynapseError


class PaginationConfig(object):

    """A configuration object which stores pagination parameters."""

    def __init__(self, from_tok=None, to_tok=None, direction='f', limit=0):
        self.from_tok = from_tok
        self.to_tok = to_tok
        self.direction = direction
        self.limit = limit

    @classmethod
    def from_request(cls, request, raise_invalid_params=True):
        params = {
            "from_tok": "END",
            "direction": 'f',
        }

        query_param_mappings = [  # 3-tuple of qp_key, attribute, rules
            ("from", "from_tok", lambda x: type(x) == str),
            ("to", "to_tok", lambda x: type(x) == str),
            ("limit", "limit", lambda x: x.isdigit()),
            ("dir", "direction", lambda x: x == 'f' or x == 'b'),
        ]

        for qp, attr, is_valid in query_param_mappings:
            if qp in request.args:
                if is_valid(request.args[qp][0]):
                    params[attr] = request.args[qp][0]
                elif raise_invalid_params:
                    raise SynapseError(400, "%s parameter is invalid." % qp)

        return PaginationConfig(**params)

    def __str__(self):
        return (
            "<PaginationConfig from_tok=%s, to_tok=%s, "
            "direction=%s, limit=%s>"
        ) % (self.from_tok, self.to_tok, self.direction, self.limit)


class PaginationStream(object):

    """ An interface for streaming data as chunks. """

    TOK_END = "END"

    def get_chunk(self, config=None):
        """ Return the next chunk in the stream.

        Args:
            config (PaginationConfig): The config specifying which chunk to
                get.
        Returns:
            A dict containing the new start token "start", the new end token
            "end" and the data "chunk" as a list.
        """
        raise NotImplementedError()


class StreamData(object):

    """ An interface for obtaining streaming data from a table. """

    def __init__(self, hs):
        self.hs = hs
        self.store = hs.get_datastore()

    def get_rows(self, user_id, from_pkey, to_pkey, limit, direction):
        """ Get event stream data between the specified pkeys.

        Args:
            user_id: The user's ID.
            from_pkey: The starting pkey.
            to_pkey: The end pkey. May be -1 to mean "latest".
            limit: The max number of results to return.
        Returns:
            A tuple containing the list of event stream data and the last pkey.
        """
        raise NotImplementedError()

    def max_token(self):
        """ Get the latest currently-valid token.

        Returns:
            The latest token."""
        raise NotImplementedError()
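To make the StreamData contract concrete, a hypothetical in-memory implementation (illustrative only; `hs` is assumed to be a HomeServer exposing get_datastore, and pkeys are assumed to be ints):

# Hypothetical StreamData subclass (sketch, not part of this commit).
from twisted.internet import defer

class ListStreamData(StreamData):
    """Streams items from an in-memory list; the pkey is the list index."""

    def __init__(self, hs, values):
        super(ListStreamData, self).__init__(hs)
        self.values = values

    def get_rows(self, user_id, from_pkey, to_pkey, limit, direction):
        end = len(self.values) if to_pkey == -1 else to_pkey
        rows = self.values[from_pkey:end]
        if limit:
            rows = rows[:limit]
        # Return (rows, last pkey covered), per the interface above.
        return defer.succeed((rows, from_pkey + len(rows)))

    def max_token(self):
        return defer.succeed(len(self.values))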
@@ -1,194 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This module contains classes for streaming from the event stream: /events.
"""
from twisted.internet import defer

from synapse.api.errors import EventStreamError
from synapse.api.events import SynapseEvent
from synapse.api.streams import PaginationStream, StreamData

import logging

logger = logging.getLogger(__name__)


class EventsStreamData(StreamData):
    EVENT_TYPE = "EventsStream"

    def __init__(self, hs, room_id=None, feedback=False):
        super(EventsStreamData, self).__init__(hs)
        self.room_id = room_id
        self.with_feedback = feedback

    @defer.inlineCallbacks
    def get_rows(self, user_id, from_key, to_key, limit, direction):
        data, latest_ver = yield self.store.get_room_events(
            user_id=user_id,
            from_key=from_key,
            to_key=to_key,
            limit=limit,
            room_id=self.room_id,
            with_feedback=self.with_feedback
        )
        defer.returnValue((data, latest_ver))

    @defer.inlineCallbacks
    def max_token(self):
        val = yield self.store.get_room_events_max_id()
        defer.returnValue(val)


class EventStream(PaginationStream):

    SEPARATOR = '_'

    def __init__(self, user_id, stream_data_list):
        super(EventStream, self).__init__()
        self.user_id = user_id
        self.stream_data = stream_data_list

    @defer.inlineCallbacks
    def fix_tokens(self, pagination_config):
        pagination_config.from_tok = yield self.fix_token(
            pagination_config.from_tok)
        pagination_config.to_tok = yield self.fix_token(
            pagination_config.to_tok)

        if (
            not pagination_config.to_tok
            and pagination_config.direction == 'f'
        ):
            pagination_config.to_tok = yield self.get_current_max_token()

        logger.debug("pagination_config: %s", pagination_config)

        defer.returnValue(pagination_config)

    @defer.inlineCallbacks
    def fix_token(self, token):
        """Fixes unknown values in a token to known values.

        Args:
            token (str): The token to fix up.
        Returns:
            The fixed-up token, which may == token.
        """
        if token == PaginationStream.TOK_END:
            new_token = yield self.get_current_max_token()

            logger.debug("fix_token: From %s to %s", token, new_token)

            token = new_token

        defer.returnValue(token)

    @defer.inlineCallbacks
    def get_current_max_token(self):
        new_token_parts = []
        for s in self.stream_data:
            mx = yield s.max_token()
            new_token_parts.append(str(mx))

        new_token = EventStream.SEPARATOR.join(new_token_parts)

        logger.debug("get_current_max_token: %s", new_token)

        defer.returnValue(new_token)

    @defer.inlineCallbacks
    def get_chunk(self, config):
        # no support for limit on >1 streams, makes no sense.
        if config.limit and len(self.stream_data) > 1:
            raise EventStreamError(
                400, "Limit not supported on multiplexed streams."
            )

        chunk_data, next_tok = yield self._get_chunk_data(
            config.from_tok,
            config.to_tok,
            config.limit,
            config.direction,
        )

        defer.returnValue({
            "chunk": chunk_data,
            "start": config.from_tok,
            "end": next_tok
        })

    @defer.inlineCallbacks
    def _get_chunk_data(self, from_tok, to_tok, limit, direction):
        """ Get event data between the two tokens.

        Tokens are SEPARATOR separated values representing pkey values of
        certain tables, and the position determines the StreamData invoked
        according to the STREAM_DATA list.

        The magic value '-1' can be used to get the latest value.

        Args:
            from_tok - The token to start from.
            to_tok - The token to end at. Must have values > from_tok or be -1.
        Returns:
            A tuple of (list of event data, next token).
        Raises:
            EventStreamError if something went wrong.
        """
        # sanity check
        if to_tok is not None:
            if (from_tok.count(EventStream.SEPARATOR) !=
                    to_tok.count(EventStream.SEPARATOR) or
                    (from_tok.count(EventStream.SEPARATOR) + 1) !=
                    len(self.stream_data)):
                raise EventStreamError(400, "Token lengths don't match.")

        chunk = []
        next_ver = []
        for i, (from_pkey, to_pkey) in enumerate(zip(
            self._split_token(from_tok),
            self._split_token(to_tok)
        )):
            if from_pkey == to_pkey:
                # tokens are the same, we have nothing to do.
                next_ver.append(str(to_pkey))
                continue

            (event_chunk, max_pkey) = yield self.stream_data[i].get_rows(
                self.user_id, from_pkey, to_pkey, limit, direction,
            )

            chunk.extend([
                e.get_dict() if isinstance(e, SynapseEvent) else e
                for e in event_chunk
            ])
            next_ver.append(str(max_pkey))

        defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver)))

    def _split_token(self, token):
        """Splits the given token into a list of pkeys.

        Args:
            token (str): The token with SEPARATOR values.
        Returns:
            A list of pkey strings, or Nones if no token was given.
        """
        if token:
            segments = token.split(EventStream.SEPARATOR)
        else:
            segments = [None] * len(self.stream_data)
        return segments
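A short worked example of the multiplexed token scheme used above: with two StreamData sources, a token is two pkeys joined by SEPARATOR, and each position advances independently:

# Worked example of the token scheme (values are illustrative).
token = "12_40"                    # first source at pkey 12, second at 40
pkeys = token.split('_')           # ['12', '40'] -- what _split_token does
assert len(pkeys) == 2             # must equal len(self.stream_data)
next_tok = '_'.join(['13', '40'])  # only the first source produced a row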
@@ -0,0 +1,236 @@
# -*- coding: utf-8 -*-
# Copyright 2014 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from twisted.internet import defer, reactor

from synapse.util.logutils import log_function

import logging


logger = logging.getLogger(__name__)


class _NotificationListener(object):
    """ This represents a single client connection to the events stream.

    The events stream handler will have yielded to the deferred, so to
    notify the handler it is sufficient to resolve the deferred.

    This listener will also keep track of which rooms it is listening in
    so that it can remove itself from the indexes in the Notifier class.
    """

    def __init__(self, user, rooms, from_token, limit, timeout, deferred):
        self.user = user
        self.from_token = from_token
        self.limit = limit
        self.timeout = timeout
        self.deferred = deferred

        self.rooms = rooms

        self.pending_notifications = []

    def notify(self, notifier, events, start_token, end_token):
        """ Inform whoever is listening about the new events. This will
        also remove this listener from all the indexes in the Notifier
        it knows about.
        """

        result = (events, (start_token, end_token))

        try:
            self.deferred.callback(result)
        except defer.AlreadyCalledError:
            pass

        for room in self.rooms:
            lst = notifier.rooms_to_listeners.get(room, set())
            lst.discard(self)

        notifier.user_to_listeners.get(self.user, set()).discard(self)


class Notifier(object):
""" This class is responsible for notifying any listeners when there are |
||||
new events available for it. |

    Primarily used from the /events stream.
    """

    def __init__(self, hs):
        self.hs = hs

        self.rooms_to_listeners = {}
        self.user_to_listeners = {}

        self.event_sources = hs.get_event_sources()

        hs.get_distributor().observe(
            "user_joined_room", self._user_joined_room
        )

    @log_function
    @defer.inlineCallbacks
    def on_new_room_event(self, event, extra_users=[]):
        """ Used by handlers to inform the notifier something has happened
        in the room, room event wise.

        This triggers the notifier to wake up any listeners that are
        listening to the room, and any listeners for the users in the
        `extra_users` param.
        """
        room_id = event.room_id

        source = self.event_sources.sources["room"]

        listeners = self.rooms_to_listeners.get(room_id, set()).copy()

        for user in extra_users:
            listeners |= self.user_to_listeners.get(user, set()).copy()

        logger.debug("on_new_room_event listeners %s", listeners)

        # TODO (erikj): Can we make this more efficient by hitting the
        # db once?
        for listener in listeners:
            events, end_token = yield source.get_new_events_for_user(
                listener.user,
                listener.from_token,
                listener.limit,
            )

            if events:
                listener.notify(
                    self, events, listener.from_token, end_token
                )

    @defer.inlineCallbacks
    def on_new_user_event(self, users=[], rooms=[]):
""" Used to inform listeners that something has happend |
||||
presence/user event wise. |

        Will wake up all listeners for the given users and rooms.
        """
        source = self.event_sources.sources["presence"]

        listeners = set()

        for user in users:
            listeners |= self.user_to_listeners.get(user, set()).copy()

        for room in rooms:
            listeners |= self.rooms_to_listeners.get(room, set()).copy()

        for listener in listeners:
            events, end_token = yield source.get_new_events_for_user(
                listener.user,
                listener.from_token,
                listener.limit,
            )

            if events:
                listener.notify(
                    self, events, listener.from_token, end_token
                )

    def get_events_for(self, user, rooms, pagination_config, timeout):
        """ For the given user and rooms, return any new events for them. If
        there are no new events wait for up to `timeout` milliseconds for any
        new events to happen before returning.
        """
        deferred = defer.Deferred()

        self._get_events(
            deferred, user, rooms, pagination_config.from_token,
            pagination_config.limit, timeout
        ).addErrback(deferred.errback)

        return deferred

    @defer.inlineCallbacks
    def _get_events(self, deferred, user, rooms, from_token, limit, timeout):
        if not from_token:
            from_token = yield self.event_sources.get_current_token()

        listener = _NotificationListener(
            user,
            rooms,
            from_token,
            limit,
            timeout,
            deferred,
        )

        if timeout:
            reactor.callLater(timeout/1000, self._timeout_listener, listener)

        self._register_with_keys(listener)
        yield self._check_for_updates(listener)

        return

    def _timeout_listener(self, listener):
        # TODO (erikj): We should probably set to_token to the current max
        # rather than reusing from_token.
        listener.notify(
            self,
            [],
            listener.from_token,
            listener.from_token,
        )

    @log_function
    def _register_with_keys(self, listener):
        for room in listener.rooms:
            s = self.rooms_to_listeners.setdefault(room, set())
            s.add(listener)

        self.user_to_listeners.setdefault(listener.user, set()).add(listener)

    @defer.inlineCallbacks
    @log_function
    def _check_for_updates(self, listener):
        # TODO (erikj): We need to think about limits across multiple sources
        events = []

        from_token = listener.from_token
        limit = listener.limit

        # TODO (erikj): DeferredList?
        for source in self.event_sources.sources.values():
            stuff, new_token = yield source.get_new_events_for_user(
                listener.user,
                from_token,
                limit,
            )

            events.extend(stuff)

            from_token = new_token

        end_token = from_token

        if events:
            listener.notify(self, events, listener.from_token, end_token)

        defer.returnValue(listener)

    def _user_joined_room(self, user, room_id):
        new_listeners = self.user_to_listeners.get(user, set())

        listeners = self.rooms_to_listeners.setdefault(room_id, set())
        listeners |= new_listeners
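A hedged sketch of how an /events handler might consume the new Notifier; `handle_events_request` and its arguments are illustrative, and `StreamToken.to_string` is assumed to exist as the counterpart of the `from_string` used by the pagination config below:

# Hypothetical /events handler driving the new Notifier (sketch only).
from twisted.internet import defer

@defer.inlineCallbacks
def handle_events_request(notifier, user, rooms, pagination_config):
    # Resolves with (events, (start_token, end_token)); on timeout the
    # event list is empty and both tokens are the listener's from_token.
    events, (start_token, end_token) = yield notifier.get_events_for(
        user, rooms, pagination_config, timeout=30000  # milliseconds
    )
    defer.returnValue({
        "chunk": [e.get_dict() for e in events],
        "start": start_token.to_string(),
        "end": end_token.to_string(),
    })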
@@ -0,0 +1,14 @@
# -*- coding: utf-8 -*-
# Copyright 2014 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
@@ -0,0 +1,84 @@
# -*- coding: utf-8 -*-
# Copyright 2014 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.api.errors import SynapseError
from synapse.types import StreamToken

import logging


logger = logging.getLogger(__name__)


class PaginationConfig(object):

    """A configuration object which stores pagination parameters."""

    def __init__(self, from_token=None, to_token=None, direction='f',
                 limit=0):
        self.from_token = from_token
        self.to_token = to_token
        self.direction = 'f' if direction == 'f' else 'b'
        self.limit = int(limit)

    @classmethod
    def from_request(cls, request, raise_invalid_params=True):
        def get_param(name, default=None):
            lst = request.args.get(name, [])
            if len(lst) > 1:
                raise SynapseError(
                    400, "%s must be specified only once" % (name,)
                )
            elif len(lst) == 1:
                return lst[0]
            else:
                return default

        direction = get_param("dir", 'f')
        if direction not in ['f', 'b']:
            raise SynapseError(400, "'dir' parameter is invalid.")

        from_tok = get_param("from")
        to_tok = get_param("to")

        try:
            if from_tok == "END":
                from_tok = None  # For backwards compat.
            elif from_tok:
                from_tok = StreamToken.from_string(from_tok)
        except:
            raise SynapseError(400, "'from' parameter is invalid")

        try:
            if to_tok:
                to_tok = StreamToken.from_string(to_tok)
        except:
raise SynapseError(400, "'to' paramater is invalid") |

        limit = get_param("limit", "0")
        if not limit.isdigit():
            raise SynapseError(400, "'limit' parameter must be an integer.")

        try:
            return PaginationConfig(from_tok, to_tok, direction, limit)
        except:
            logger.exception("Failed to create pagination config")
            raise SynapseError(400, "Invalid request.")

    def __str__(self):
        return (
            "<PaginationConfig from_token=%s, to_token=%s, "
            "direction=%s, limit=%s>"
        ) % (self.from_token, self.to_token, self.direction, self.limit)
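A usage sketch for PaginationConfig.from_request; the request stub is hypothetical and only mimics the Twisted `request.args` mapping of parameter names to lists of values:

# Hypothetical request stub (sketch only).
class FakeRequest(object):
    args = {"dir": ["b"], "limit": ["10"], "from": ["END"]}

config = PaginationConfig.from_request(FakeRequest())
# config.direction == 'b', config.limit == 10, and config.from_token is
# None, since "END" is accepted only for backwards compatibility.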
@@ -0,0 +1,180 @@
# -*- coding: utf-8 -*-
# Copyright 2014 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from twisted.internet import defer

from synapse.api.constants import Membership
from synapse.types import StreamToken


class RoomEventSource(object):
    def __init__(self, hs):
        self.store = hs.get_datastore()

    @defer.inlineCallbacks
    def get_new_events_for_user(self, user, from_token, limit):
        # We just ignore the key for now.

        to_key = yield self.get_current_token_part()

        events, end_key = yield self.store.get_room_events_stream(
            user_id=user.to_string(),
            from_key=from_token.events_key,
            to_key=to_key,
            room_id=None,
            limit=limit,
        )

        end_token = from_token.copy_and_replace("events_key", end_key)

        defer.returnValue((events, end_token))

    def get_current_token_part(self):
        return self.store.get_room_events_max_id()

    @defer.inlineCallbacks
    def get_pagination_rows(self, user, pagination_config, key):
        from_token = pagination_config.from_token
        to_token = pagination_config.to_token
        limit = pagination_config.limit
        direction = pagination_config.direction

        to_key = to_token.events_key if to_token else None

        events, next_key = yield self.store.paginate_room_events(
            room_id=key,
            from_key=from_token.events_key,
            to_key=to_key,
            direction=direction,
            limit=limit,
            with_feedback=True
        )

        next_token = from_token.copy_and_replace("events_key", next_key)

        defer.returnValue((events, next_token))


class PresenceSource(object):
    def __init__(self, hs):
        self.hs = hs
        self.clock = hs.get_clock()

    def get_new_events_for_user(self, user, from_token, limit):
        from_key = int(from_token.presence_key)

        presence = self.hs.get_handlers().presence_handler
        cachemap = presence._user_cachemap

        # TODO(paul): limit, and filter by visibility
        updates = [(k, cachemap[k]) for k in cachemap
                   if from_key < cachemap[k].serial]

        if updates:
            clock = self.clock

            latest_serial = max([x[1].serial for x in updates])
            data = [x[1].make_event(user=x[0], clock=clock) for x in updates]

            end_token = from_token.copy_and_replace(
                "presence_key", latest_serial
            )
            return ((data, end_token))
        else:
            end_token = from_token.copy_and_replace(
                "presence_key", presence._user_cachemap_latest_serial
            )
            return (([], end_token))

    def get_current_token_part(self):
        presence = self.hs.get_handlers().presence_handler
        return presence._user_cachemap_latest_serial

    def get_pagination_rows(self, user, pagination_config, key):
        # TODO (erikj): Does this make sense? Ordering?

        from_token = pagination_config.from_token
        to_token = pagination_config.to_token

        from_key = int(from_token.presence_key)

        if to_token:
            to_key = int(to_token.presence_key)
        else:
            to_key = -1

        presence = self.hs.get_handlers().presence_handler
        cachemap = presence._user_cachemap

        # TODO(paul): limit, and filter by visibility
        updates = [(k, cachemap[k]) for k in cachemap
                   if to_key < cachemap[k].serial < from_key]

        if updates:
            clock = self.clock

            # paginating backwards, so the next token is the earliest serial
            earliest_serial = min([x[1].serial for x in updates])
            data = [x[1].make_event(user=x[0], clock=clock) for x in updates]

            if to_token:
                next_token = to_token
            else:
                next_token = from_token

            next_token = next_token.copy_and_replace(
                "presence_key", earliest_serial
            )
            return ((data, next_token))
        else:
            if not to_token:
                to_token = from_token.copy_and_replace(
                    "presence_key", 0
                )
            return (([], to_token))


class EventSources(object):
    SOURCE_TYPES = {
        "room": RoomEventSource,
        "presence": PresenceSource,
    }

    def __init__(self, hs):
        self.sources = {
            name: cls(hs)
            for name, cls in EventSources.SOURCE_TYPES.items()
        }

    @staticmethod
    def create_token(events_key, presence_key):
        return StreamToken(events_key=events_key, presence_key=presence_key)

    @defer.inlineCallbacks
    def get_current_token(self):
        events_key = yield self.sources["room"].get_current_token_part()
        presence_key = yield self.sources["presence"].get_current_token_part()
        token = EventSources.create_token(events_key, presence_key)
        defer.returnValue(token)


class StreamSource(object):
    def get_new_events_for_user(self, user, from_token, limit):
        raise NotImplementedError("get_new_events_for_user")

    def get_current_token_part(self):
        raise NotImplementedError("get_current_token_part")

    def get_pagination_rows(self, user, pagination_config, key):
        raise NotImplementedError("get_pagination_rows")