|
|
|
@ -128,25 +128,73 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")): |
|
|
|
|
|
|
|
|
|
class StreamStore(SQLBaseStore): |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def get_appservice_room_stream(self, service, from_key, to_key, limit=0): |
|
|
|
|
# NB this lives here instead of appservice.py so we can reuse the |
|
|
|
|
# 'private' StreamToken class in this file. |
|
|
|
|
logger.info("get_appservice_room_stream -> %s", service) |
|
|
|
|
|
|
|
|
|
if limit: |
|
|
|
|
limit = max(limit, MAX_STREAM_SIZE) |
|
|
|
|
else: |
|
|
|
|
limit = MAX_STREAM_SIZE |
|
|
|
|
|
|
|
|
|
# From and to keys should be integers from ordering. |
|
|
|
|
# from_id = _StreamToken.parse_stream_token(from_key) |
|
|
|
|
# to_id = _StreamToken.parse_stream_token(to_key) |
|
|
|
|
from_id = _StreamToken.parse_stream_token(from_key) |
|
|
|
|
to_id = _StreamToken.parse_stream_token(to_key) |
|
|
|
|
|
|
|
|
|
if from_key == to_key: |
|
|
|
|
return defer.succeed(([], to_key)) |
|
|
|
|
defer.returnValue(([], to_key)) |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
# Logic: |
|
|
|
|
# - We want ALL events which match the AS room_id regex |
|
|
|
|
# - We want ALL events which match the rooms represented by the AS |
|
|
|
|
# room_alias regex |
|
|
|
|
# - We want ALL events for rooms that AS users have joined. |
|
|
|
|
# This is currently supported via get_app_service_rooms (which is used |
|
|
|
|
# for the Notifier listener rooms). We can't reasonably make a SQL |
|
|
|
|
# query for these room IDs, so we'll pull all the events between from/to |
|
|
|
|
# and filter in python. |
|
|
|
|
rooms_for_as = yield self.get_app_service_rooms(service) |
|
|
|
|
room_ids_for_as = [r.room_id for r in rooms_for_as] |
|
|
|
|
|
|
|
|
|
# select all the events between from/to with a sensible limit |
|
|
|
|
sql = ( |
|
|
|
|
"SELECT e.event_id, e.room_id, e.stream_ordering FROM events AS e " |
|
|
|
|
"WHERE e.stream_ordering > ? AND e.stream_ordering <= ? " |
|
|
|
|
"ORDER BY stream_ordering ASC LIMIT %(limit)d " |
|
|
|
|
) % { |
|
|
|
|
"limit": limit |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
def f(txn): |
|
|
|
|
txn.execute(sql, (from_id.stream, to_id.stream,)) |
|
|
|
|
|
|
|
|
|
rows = self.cursor_to_dict(txn) |
|
|
|
|
|
|
|
|
|
ret = self._get_events_txn( |
|
|
|
|
txn, |
|
|
|
|
# apply the filter on the room id list |
|
|
|
|
[ |
|
|
|
|
r["event_id"] for r in rows |
|
|
|
|
if r["room_id"] in room_ids_for_as |
|
|
|
|
], |
|
|
|
|
get_prev_content=True |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
self._set_before_and_after(ret, rows) |
|
|
|
|
|
|
|
|
|
if rows: |
|
|
|
|
key = "s%d" % max([r["stream_ordering"] for r in rows]) |
|
|
|
|
|
|
|
|
|
else: |
|
|
|
|
# Assume we didn't get anything because there was nothing to |
|
|
|
|
# get. |
|
|
|
|
key = to_key |
|
|
|
|
|
|
|
|
|
return ret, key |
|
|
|
|
|
|
|
|
|
# TODO stub |
|
|
|
|
return defer.succeed(([], to_key)) |
|
|
|
|
results = yield self.runInteraction("get_appservice_room_stream", f) |
|
|
|
|
defer.returnValue(results) |
|
|
|
|
|
|
|
|
|
@log_function |
|
|
|
|
def get_room_events_stream(self, user_id, from_key, to_key, room_id, |
|
|
|
|