|
|
|
@ -137,7 +137,6 @@ class StreamStore(SQLBaseStore): |
|
|
|
|
with_feedback=with_feedback, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
@log_function |
|
|
|
|
def get_room_events_stream(self, user_id, from_key, to_key, room_id, |
|
|
|
|
limit=0, with_feedback=False): |
|
|
|
@ -157,11 +156,6 @@ class StreamStore(SQLBaseStore): |
|
|
|
|
"WHERE m.user_id = ? " |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
del_sql = ( |
|
|
|
|
"SELECT event_id FROM redactions WHERE redacts = e.event_id " |
|
|
|
|
"LIMIT 1" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
if limit: |
|
|
|
|
limit = max(limit, MAX_STREAM_SIZE) |
|
|
|
|
else: |
|
|
|
@ -172,38 +166,42 @@ class StreamStore(SQLBaseStore): |
|
|
|
|
to_id = _parse_stream_token(to_key) |
|
|
|
|
|
|
|
|
|
if from_key == to_key: |
|
|
|
|
defer.returnValue(([], to_key)) |
|
|
|
|
return |
|
|
|
|
return defer.succeed(([], to_key)) |
|
|
|
|
|
|
|
|
|
sql = ( |
|
|
|
|
"SELECT *, (%(redacted)s) AS redacted FROM events AS e WHERE " |
|
|
|
|
"SELECT e.event_id, e.stream_ordering FROM events AS e WHERE " |
|
|
|
|
"(e.outlier = 0 AND (room_id IN (%(current)s)) OR " |
|
|
|
|
"(event_id IN (%(invites)s))) " |
|
|
|
|
"AND e.stream_ordering > ? AND e.stream_ordering <= ? " |
|
|
|
|
"ORDER BY stream_ordering ASC LIMIT %(limit)d " |
|
|
|
|
) % { |
|
|
|
|
"redacted": del_sql, |
|
|
|
|
"current": current_room_membership_sql, |
|
|
|
|
"invites": membership_sql, |
|
|
|
|
"limit": limit |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
rows = yield self._execute_and_decode( |
|
|
|
|
sql, |
|
|
|
|
user_id, user_id, from_id, to_id |
|
|
|
|
) |
|
|
|
|
def f(txn): |
|
|
|
|
txn.execute(sql, (user_id, user_id, from_id, to_id,)) |
|
|
|
|
|
|
|
|
|
ret = yield self._parse_events(rows) |
|
|
|
|
rows = self.cursor_to_dict(txn) |
|
|
|
|
|
|
|
|
|
if rows: |
|
|
|
|
key = "s%d" % max([r["stream_ordering"] for r in rows]) |
|
|
|
|
else: |
|
|
|
|
# Assume we didn't get anything because there was nothing to get. |
|
|
|
|
key = to_key |
|
|
|
|
ret = self._get_events_txn( |
|
|
|
|
txn, |
|
|
|
|
[r["event_id"] for r in rows], |
|
|
|
|
get_prev_content=True |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
if rows: |
|
|
|
|
key = "s%d" % max([r["stream_ordering"] for r in rows]) |
|
|
|
|
else: |
|
|
|
|
# Assume we didn't get anything because there was nothing to |
|
|
|
|
# get. |
|
|
|
|
key = to_key |
|
|
|
|
|
|
|
|
|
return ret, key |
|
|
|
|
|
|
|
|
|
defer.returnValue((ret, key)) |
|
|
|
|
return self.runInteraction("get_room_events_stream", f) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
@log_function |
|
|
|
|
def paginate_room_events(self, room_id, from_key, to_key=None, |
|
|
|
|
direction='b', limit=-1, |
|
|
|
@ -221,7 +219,9 @@ class StreamStore(SQLBaseStore): |
|
|
|
|
|
|
|
|
|
bounds = _get_token_bound(from_key, from_comp) |
|
|
|
|
if to_key: |
|
|
|
|
bounds = "%s AND %s" % (bounds, _get_token_bound(to_key, to_comp)) |
|
|
|
|
bounds = "%s AND %s" % ( |
|
|
|
|
bounds, _get_token_bound(to_key, to_comp) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
if int(limit) > 0: |
|
|
|
|
args.append(int(limit)) |
|
|
|
@ -229,87 +229,78 @@ class StreamStore(SQLBaseStore): |
|
|
|
|
else: |
|
|
|
|
limit_str = "" |
|
|
|
|
|
|
|
|
|
del_sql = ( |
|
|
|
|
"SELECT event_id FROM redactions WHERE redacts = events.event_id " |
|
|
|
|
"LIMIT 1" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
sql = ( |
|
|
|
|
"SELECT *, (%(redacted)s) AS redacted FROM events" |
|
|
|
|
"SELECT * FROM events" |
|
|
|
|
" WHERE outlier = 0 AND room_id = ? AND %(bounds)s" |
|
|
|
|
" ORDER BY topological_ordering %(order)s," |
|
|
|
|
" stream_ordering %(order)s %(limit)s" |
|
|
|
|
) % { |
|
|
|
|
"redacted": del_sql, |
|
|
|
|
"bounds": bounds, |
|
|
|
|
"order": order, |
|
|
|
|
"limit": limit_str |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
rows = yield self._execute_and_decode( |
|
|
|
|
sql, |
|
|
|
|
*args |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
if rows: |
|
|
|
|
topo = rows[-1]["topological_ordering"] |
|
|
|
|
toke = rows[-1]["stream_ordering"] |
|
|
|
|
if direction == 'b': |
|
|
|
|
topo -= 1 |
|
|
|
|
toke -= 1 |
|
|
|
|
next_token = "t%s-%s" % (topo, toke) |
|
|
|
|
else: |
|
|
|
|
# TODO (erikj): We should work out what to do here instead. |
|
|
|
|
next_token = to_key if to_key else from_key |
|
|
|
|
def f(txn): |
|
|
|
|
txn.execute(sql, args) |
|
|
|
|
|
|
|
|
|
rows = self.cursor_to_dict(txn) |
|
|
|
|
|
|
|
|
|
if rows: |
|
|
|
|
topo = rows[-1]["topological_ordering"] |
|
|
|
|
toke = rows[-1]["stream_ordering"] |
|
|
|
|
if direction == 'b': |
|
|
|
|
topo -= 1 |
|
|
|
|
toke -= 1 |
|
|
|
|
next_token = "t%s-%s" % (topo, toke) |
|
|
|
|
else: |
|
|
|
|
# TODO (erikj): We should work out what to do here instead. |
|
|
|
|
next_token = to_key if to_key else from_key |
|
|
|
|
|
|
|
|
|
events = self._get_events_txn( |
|
|
|
|
txn, |
|
|
|
|
[r["event_id"] for r in rows], |
|
|
|
|
get_prev_content=True |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
events = yield self._parse_events(rows) |
|
|
|
|
return events, next_token, |
|
|
|
|
|
|
|
|
|
defer.returnValue( |
|
|
|
|
( |
|
|
|
|
events, |
|
|
|
|
next_token |
|
|
|
|
) |
|
|
|
|
) |
|
|
|
|
return self.runInteraction("paginate_room_events", f) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def get_recent_events_for_room(self, room_id, limit, end_token, |
|
|
|
|
with_feedback=False): |
|
|
|
|
# TODO (erikj): Handle compressed feedback |
|
|
|
|
|
|
|
|
|
del_sql = ( |
|
|
|
|
"SELECT event_id FROM redactions WHERE redacts = events.event_id " |
|
|
|
|
"LIMIT 1" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
sql = ( |
|
|
|
|
"SELECT *, (%(redacted)s) AS redacted FROM events " |
|
|
|
|
"SELECT * FROM events " |
|
|
|
|
"WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 " |
|
|
|
|
"ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? " |
|
|
|
|
) % { |
|
|
|
|
"redacted": del_sql, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
rows = yield self._execute_and_decode( |
|
|
|
|
sql, |
|
|
|
|
room_id, end_token, limit |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
rows.reverse() # As we selected with reverse ordering |
|
|
|
|
def f(txn): |
|
|
|
|
txn.execute(sql, (room_id, end_token, limit,)) |
|
|
|
|
|
|
|
|
|
if rows: |
|
|
|
|
topo = rows[0]["topological_ordering"] |
|
|
|
|
toke = rows[0]["stream_ordering"] |
|
|
|
|
start_token = "t%s-%s" % (topo, toke) |
|
|
|
|
rows = self.cursor_to_dict(txn) |
|
|
|
|
|
|
|
|
|
token = (start_token, end_token) |
|
|
|
|
else: |
|
|
|
|
token = (end_token, end_token) |
|
|
|
|
rows.reverse() # As we selected with reverse ordering |
|
|
|
|
|
|
|
|
|
events = yield self._parse_events(rows) |
|
|
|
|
if rows: |
|
|
|
|
topo = rows[0]["topological_ordering"] |
|
|
|
|
toke = rows[0]["stream_ordering"] |
|
|
|
|
start_token = "t%s-%s" % (topo, toke) |
|
|
|
|
|
|
|
|
|
token = (start_token, end_token) |
|
|
|
|
else: |
|
|
|
|
token = (end_token, end_token) |
|
|
|
|
|
|
|
|
|
events = self._get_events_txn( |
|
|
|
|
txn, |
|
|
|
|
[r["event_id"] for r in rows], |
|
|
|
|
get_prev_content=True |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
ret = (events, token) |
|
|
|
|
return events, token |
|
|
|
|
|
|
|
|
|
defer.returnValue(ret) |
|
|
|
|
return self.runInteraction("get_recent_events_for_room", f) |
|
|
|
|
|
|
|
|
|
def get_room_events_max_id(self): |
|
|
|
|
return self.runInteraction( |
|
|
|
|