|
|
|
@ -78,21 +78,20 @@ class MessageHandler(BaseHandler): |
|
|
|
|
defer.returnValue(None) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def get_messages(self, user_id=None, room_id=None, pagin_config=None, |
|
|
|
|
as_client_event=True, is_guest=False): |
|
|
|
|
def get_messages(self, requester, room_id=None, pagin_config=None, |
|
|
|
|
as_client_event=True): |
|
|
|
|
"""Get messages in a room. |
|
|
|
|
|
|
|
|
|
Args: |
|
|
|
|
user_id (str): The user requesting messages. |
|
|
|
|
requester (Requester): The user requesting messages. |
|
|
|
|
room_id (str): The room they want messages from. |
|
|
|
|
pagin_config (synapse.api.streams.PaginationConfig): The pagination |
|
|
|
|
config rules to apply, if any. |
|
|
|
|
as_client_event (bool): True to get events in client-server format. |
|
|
|
|
is_guest (bool): Whether the requesting user is a guest (as opposed |
|
|
|
|
to a fully registered user). |
|
|
|
|
Returns: |
|
|
|
|
dict: Pagination API results |
|
|
|
|
""" |
|
|
|
|
user_id = requester.user.to_string() |
|
|
|
|
data_source = self.hs.get_event_sources().sources["room"] |
|
|
|
|
|
|
|
|
|
if pagin_config.from_token: |
|
|
|
@ -115,36 +114,33 @@ class MessageHandler(BaseHandler): |
|
|
|
|
|
|
|
|
|
source_config = pagin_config.get_source_config("room") |
|
|
|
|
|
|
|
|
|
if not is_guest: |
|
|
|
|
member_event = yield self.auth.check_user_was_in_room(room_id, user_id) |
|
|
|
|
if member_event.membership == Membership.LEAVE: |
|
|
|
|
# If they have left the room then clamp the token to be before |
|
|
|
|
# they left the room. |
|
|
|
|
# If they're a guest, we'll just 403 them if they're asking for |
|
|
|
|
# events they can't see. |
|
|
|
|
leave_token = yield self.store.get_topological_token_for_event( |
|
|
|
|
member_event.event_id |
|
|
|
|
) |
|
|
|
|
leave_token = RoomStreamToken.parse(leave_token) |
|
|
|
|
if leave_token.topological < room_token.topological: |
|
|
|
|
source_config.from_key = str(leave_token) |
|
|
|
|
|
|
|
|
|
if source_config.direction == "f": |
|
|
|
|
if source_config.to_key is None: |
|
|
|
|
membership, member_event_id = yield self._check_in_room_or_world_readable( |
|
|
|
|
room_id, user_id |
|
|
|
|
) |
|
|
|
|
if membership == Membership.LEAVE: |
|
|
|
|
# If they have left the room then clamp the token to be before |
|
|
|
|
# they left the room. |
|
|
|
|
leave_token = yield self.store.get_topological_token_for_event( |
|
|
|
|
member_event_id |
|
|
|
|
) |
|
|
|
|
leave_token = RoomStreamToken.parse(leave_token) |
|
|
|
|
if leave_token.topological < room_token.topological: |
|
|
|
|
source_config.from_key = str(leave_token) |
|
|
|
|
|
|
|
|
|
if source_config.direction == "f": |
|
|
|
|
if source_config.to_key is None: |
|
|
|
|
source_config.to_key = str(leave_token) |
|
|
|
|
else: |
|
|
|
|
to_token = RoomStreamToken.parse(source_config.to_key) |
|
|
|
|
if leave_token.topological < to_token.topological: |
|
|
|
|
source_config.to_key = str(leave_token) |
|
|
|
|
else: |
|
|
|
|
to_token = RoomStreamToken.parse(source_config.to_key) |
|
|
|
|
if leave_token.topological < to_token.topological: |
|
|
|
|
source_config.to_key = str(leave_token) |
|
|
|
|
|
|
|
|
|
yield self.hs.get_handlers().federation_handler.maybe_backfill( |
|
|
|
|
room_id, room_token.topological |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
user = UserID.from_string(user_id) |
|
|
|
|
|
|
|
|
|
events, next_key = yield data_source.get_pagination_rows( |
|
|
|
|
user, source_config, room_id |
|
|
|
|
requester.user, source_config, room_id |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
next_token = pagin_config.from_token.copy_and_replace( |
|
|
|
@ -158,7 +154,11 @@ class MessageHandler(BaseHandler): |
|
|
|
|
"end": next_token.to_string(), |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
events = yield self._filter_events_for_client(user_id, events, is_guest=is_guest) |
|
|
|
|
events = yield self._filter_events_for_client( |
|
|
|
|
user_id, |
|
|
|
|
events, |
|
|
|
|
is_peeking=(member_event_id is None), |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
time_now = self.clock.time_msec() |
|
|
|
|
|
|
|
|
@ -289,7 +289,7 @@ class MessageHandler(BaseHandler): |
|
|
|
|
SynapseError if something went wrong. |
|
|
|
|
""" |
|
|
|
|
membership, membership_event_id = yield self._check_in_room_or_world_readable( |
|
|
|
|
room_id, user_id, is_guest |
|
|
|
|
room_id, user_id |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
if membership == Membership.JOIN: |
|
|
|
@ -306,7 +306,7 @@ class MessageHandler(BaseHandler): |
|
|
|
|
defer.returnValue(data) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def _check_in_room_or_world_readable(self, room_id, user_id, is_guest): |
|
|
|
|
def _check_in_room_or_world_readable(self, room_id, user_id): |
|
|
|
|
try: |
|
|
|
|
# check_user_was_in_room will return the most recent membership |
|
|
|
|
# event for the user if: |
|
|
|
@ -316,7 +316,7 @@ class MessageHandler(BaseHandler): |
|
|
|
|
member_event = yield self.auth.check_user_was_in_room(room_id, user_id) |
|
|
|
|
defer.returnValue((member_event.membership, member_event.event_id)) |
|
|
|
|
return |
|
|
|
|
except AuthError, auth_error: |
|
|
|
|
except AuthError: |
|
|
|
|
visibility = yield self.state_handler.get_current_state( |
|
|
|
|
room_id, EventTypes.RoomHistoryVisibility, "" |
|
|
|
|
) |
|
|
|
@ -326,8 +326,6 @@ class MessageHandler(BaseHandler): |
|
|
|
|
): |
|
|
|
|
defer.returnValue((Membership.JOIN, None)) |
|
|
|
|
return |
|
|
|
|
if not is_guest: |
|
|
|
|
raise auth_error |
|
|
|
|
raise AuthError( |
|
|
|
|
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN |
|
|
|
|
) |
|
|
|
@ -345,7 +343,7 @@ class MessageHandler(BaseHandler): |
|
|
|
|
A list of dicts representing state events. [{}, {}, {}] |
|
|
|
|
""" |
|
|
|
|
membership, membership_event_id = yield self._check_in_room_or_world_readable( |
|
|
|
|
room_id, user_id, is_guest |
|
|
|
|
room_id, user_id |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
if membership == Membership.JOIN: |
|
|
|
@ -556,13 +554,13 @@ class MessageHandler(BaseHandler): |
|
|
|
|
defer.returnValue(ret) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def room_initial_sync(self, user_id, room_id, pagin_config=None, is_guest=False): |
|
|
|
|
def room_initial_sync(self, requester, room_id, pagin_config=None): |
|
|
|
|
"""Capture the a snapshot of a room. If user is currently a member of |
|
|
|
|
the room this will be what is currently in the room. If the user left |
|
|
|
|
the room this will be what was in the room when they left. |
|
|
|
|
|
|
|
|
|
Args: |
|
|
|
|
user_id(str): The user to get a snapshot for. |
|
|
|
|
requester(Requester): The user to get a snapshot for. |
|
|
|
|
room_id(str): The room to get a snapshot of. |
|
|
|
|
pagin_config(synapse.streams.config.PaginationConfig): |
|
|
|
|
The pagination config used to determine how many messages to |
|
|
|
@ -573,19 +571,20 @@ class MessageHandler(BaseHandler): |
|
|
|
|
A JSON serialisable dict with the snapshot of the room. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
user_id = requester.user.to_string() |
|
|
|
|
|
|
|
|
|
membership, member_event_id = yield self._check_in_room_or_world_readable( |
|
|
|
|
room_id, |
|
|
|
|
user_id, |
|
|
|
|
is_guest |
|
|
|
|
room_id, user_id, |
|
|
|
|
) |
|
|
|
|
is_peeking = member_event_id is None |
|
|
|
|
|
|
|
|
|
if membership == Membership.JOIN: |
|
|
|
|
result = yield self._room_initial_sync_joined( |
|
|
|
|
user_id, room_id, pagin_config, membership, is_guest |
|
|
|
|
user_id, room_id, pagin_config, membership, is_peeking |
|
|
|
|
) |
|
|
|
|
elif membership == Membership.LEAVE: |
|
|
|
|
result = yield self._room_initial_sync_parted( |
|
|
|
|
user_id, room_id, pagin_config, membership, member_event_id, is_guest |
|
|
|
|
user_id, room_id, pagin_config, membership, member_event_id, is_peeking |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
account_data_events = [] |
|
|
|
@ -609,7 +608,7 @@ class MessageHandler(BaseHandler): |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def _room_initial_sync_parted(self, user_id, room_id, pagin_config, |
|
|
|
|
membership, member_event_id, is_guest): |
|
|
|
|
membership, member_event_id, is_peeking): |
|
|
|
|
room_state = yield self.store.get_state_for_events( |
|
|
|
|
[member_event_id], None |
|
|
|
|
) |
|
|
|
@ -631,7 +630,7 @@ class MessageHandler(BaseHandler): |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
messages = yield self._filter_events_for_client( |
|
|
|
|
user_id, messages, is_guest=is_guest |
|
|
|
|
user_id, messages, is_peeking=is_peeking |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
start_token = StreamToken(token[0], 0, 0, 0, 0) |
|
|
|
@ -654,7 +653,7 @@ class MessageHandler(BaseHandler): |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def _room_initial_sync_joined(self, user_id, room_id, pagin_config, |
|
|
|
|
membership, is_guest): |
|
|
|
|
membership, is_peeking): |
|
|
|
|
current_state = yield self.state.get_current_state( |
|
|
|
|
room_id=room_id, |
|
|
|
|
) |
|
|
|
@ -718,7 +717,7 @@ class MessageHandler(BaseHandler): |
|
|
|
|
).addErrback(unwrapFirstError) |
|
|
|
|
|
|
|
|
|
messages = yield self._filter_events_for_client( |
|
|
|
|
user_id, messages, is_guest=is_guest, |
|
|
|
|
user_id, messages, is_peeking=is_peeking, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
start_token = now_token.copy_and_replace("room_key", token[0]) |
|
|
|
@ -737,7 +736,7 @@ class MessageHandler(BaseHandler): |
|
|
|
|
"presence": presence, |
|
|
|
|
"receipts": receipts, |
|
|
|
|
} |
|
|
|
|
if not is_guest: |
|
|
|
|
if not is_peeking: |
|
|
|
|
ret["membership"] = membership |
|
|
|
|
|
|
|
|
|
defer.returnValue(ret) |
|
|
|
|