|
|
|
@ -37,11 +37,9 @@ from twisted.internet import defer |
|
|
|
|
|
|
|
|
|
from ._base import SQLBaseStore |
|
|
|
|
from synapse.api.constants import EventTypes |
|
|
|
|
from synapse.api.errors import SynapseError |
|
|
|
|
from synapse.types import RoomStreamToken |
|
|
|
|
from synapse.util.logutils import log_function |
|
|
|
|
|
|
|
|
|
from collections import namedtuple |
|
|
|
|
|
|
|
|
|
import logging |
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -55,76 +53,26 @@ _STREAM_TOKEN = "stream" |
|
|
|
|
_TOPOLOGICAL_TOKEN = "topological" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class _StreamToken(namedtuple("_StreamToken", "topological stream")): |
|
|
|
|
"""Tokens are positions between events. The token "s1" comes after event 1. |
|
|
|
|
|
|
|
|
|
s0 s1 |
|
|
|
|
| | |
|
|
|
|
[0] V [1] V [2] |
|
|
|
|
|
|
|
|
|
Tokens can either be a point in the live event stream or a cursor going |
|
|
|
|
through historic events. |
|
|
|
|
|
|
|
|
|
When traversing the live event stream events are ordered by when they |
|
|
|
|
arrived at the homeserver. |
|
|
|
|
|
|
|
|
|
When traversing historic events the events are ordered by their depth in |
|
|
|
|
the event graph "topological_ordering" and then by when they arrived at the |
|
|
|
|
homeserver "stream_ordering". |
|
|
|
|
|
|
|
|
|
Live tokens start with an "s" followed by the "stream_ordering" id of the |
|
|
|
|
event it comes after. Historic tokens start with a "t" followed by the |
|
|
|
|
"topological_ordering" id of the event it comes after, follewed by "-", |
|
|
|
|
followed by the "stream_ordering" id of the event it comes after. |
|
|
|
|
""" |
|
|
|
|
__slots__ = [] |
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
def parse(cls, string): |
|
|
|
|
try: |
|
|
|
|
if string[0] == 's': |
|
|
|
|
return cls(topological=None, stream=int(string[1:])) |
|
|
|
|
if string[0] == 't': |
|
|
|
|
parts = string[1:].split('-', 1) |
|
|
|
|
return cls(topological=int(parts[0]), stream=int(parts[1])) |
|
|
|
|
except: |
|
|
|
|
pass |
|
|
|
|
raise SynapseError(400, "Invalid token %r" % (string,)) |
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
def parse_stream_token(cls, string): |
|
|
|
|
try: |
|
|
|
|
if string[0] == 's': |
|
|
|
|
return cls(topological=None, stream=int(string[1:])) |
|
|
|
|
except: |
|
|
|
|
pass |
|
|
|
|
raise SynapseError(400, "Invalid token %r" % (string,)) |
|
|
|
|
|
|
|
|
|
def __str__(self): |
|
|
|
|
if self.topological is not None: |
|
|
|
|
return "t%d-%d" % (self.topological, self.stream) |
|
|
|
|
else: |
|
|
|
|
return "s%d" % (self.stream,) |
|
|
|
|
def lower_bound(token): |
|
|
|
|
if token.topological is None: |
|
|
|
|
return "(%d < %s)" % (token.stream, "stream_ordering") |
|
|
|
|
else: |
|
|
|
|
return "(%d < %s OR (%d = %s AND %d < %s))" % ( |
|
|
|
|
token.topological, "topological_ordering", |
|
|
|
|
token.topological, "topological_ordering", |
|
|
|
|
token.stream, "stream_ordering", |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
def lower_bound(self): |
|
|
|
|
if self.topological is None: |
|
|
|
|
return "(%d < %s)" % (self.stream, "stream_ordering") |
|
|
|
|
else: |
|
|
|
|
return "(%d < %s OR (%d = %s AND %d < %s))" % ( |
|
|
|
|
self.topological, "topological_ordering", |
|
|
|
|
self.topological, "topological_ordering", |
|
|
|
|
self.stream, "stream_ordering", |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
def upper_bound(self): |
|
|
|
|
if self.topological is None: |
|
|
|
|
return "(%d >= %s)" % (self.stream, "stream_ordering") |
|
|
|
|
else: |
|
|
|
|
return "(%d > %s OR (%d = %s AND %d >= %s))" % ( |
|
|
|
|
self.topological, "topological_ordering", |
|
|
|
|
self.topological, "topological_ordering", |
|
|
|
|
self.stream, "stream_ordering", |
|
|
|
|
) |
|
|
|
|
def upper_bound(token): |
|
|
|
|
if token.topological is None: |
|
|
|
|
return "(%d >= %s)" % (token.stream, "stream_ordering") |
|
|
|
|
else: |
|
|
|
|
return "(%d > %s OR (%d = %s AND %d >= %s))" % ( |
|
|
|
|
token.topological, "topological_ordering", |
|
|
|
|
token.topological, "topological_ordering", |
|
|
|
|
token.stream, "stream_ordering", |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class StreamStore(SQLBaseStore): |
|
|
|
@ -139,8 +87,8 @@ class StreamStore(SQLBaseStore): |
|
|
|
|
limit = MAX_STREAM_SIZE |
|
|
|
|
|
|
|
|
|
# From and to keys should be integers from ordering. |
|
|
|
|
from_id = _StreamToken.parse_stream_token(from_key) |
|
|
|
|
to_id = _StreamToken.parse_stream_token(to_key) |
|
|
|
|
from_id = RoomStreamToken.parse_stream_token(from_key) |
|
|
|
|
to_id = RoomStreamToken.parse_stream_token(to_key) |
|
|
|
|
|
|
|
|
|
if from_key == to_key: |
|
|
|
|
defer.returnValue(([], to_key)) |
|
|
|
@ -234,8 +182,8 @@ class StreamStore(SQLBaseStore): |
|
|
|
|
limit = MAX_STREAM_SIZE |
|
|
|
|
|
|
|
|
|
# From and to keys should be integers from ordering. |
|
|
|
|
from_id = _StreamToken.parse_stream_token(from_key) |
|
|
|
|
to_id = _StreamToken.parse_stream_token(to_key) |
|
|
|
|
from_id = RoomStreamToken.parse_stream_token(from_key) |
|
|
|
|
to_id = RoomStreamToken.parse_stream_token(to_key) |
|
|
|
|
|
|
|
|
|
if from_key == to_key: |
|
|
|
|
return defer.succeed(([], to_key)) |
|
|
|
@ -288,17 +236,17 @@ class StreamStore(SQLBaseStore): |
|
|
|
|
args = [False, room_id] |
|
|
|
|
if direction == 'b': |
|
|
|
|
order = "DESC" |
|
|
|
|
bounds = _StreamToken.parse(from_key).upper_bound() |
|
|
|
|
bounds = upper_bound(RoomStreamToken.parse(from_key)) |
|
|
|
|
if to_key: |
|
|
|
|
bounds = "%s AND %s" % ( |
|
|
|
|
bounds, _StreamToken.parse(to_key).lower_bound() |
|
|
|
|
bounds, lower_bound(RoomStreamToken.parse(to_key)) |
|
|
|
|
) |
|
|
|
|
else: |
|
|
|
|
order = "ASC" |
|
|
|
|
bounds = _StreamToken.parse(from_key).lower_bound() |
|
|
|
|
bounds = lower_bound(RoomStreamToken.parse(from_key)) |
|
|
|
|
if to_key: |
|
|
|
|
bounds = "%s AND %s" % ( |
|
|
|
|
bounds, _StreamToken.parse(to_key).upper_bound() |
|
|
|
|
bounds, upper_bound(RoomStreamToken.parse(to_key)) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
if int(limit) > 0: |
|
|
|
@ -333,7 +281,7 @@ class StreamStore(SQLBaseStore): |
|
|
|
|
# when we are going backwards so we subtract one from the |
|
|
|
|
# stream part. |
|
|
|
|
toke -= 1 |
|
|
|
|
next_token = str(_StreamToken(topo, toke)) |
|
|
|
|
next_token = str(RoomStreamToken(topo, toke)) |
|
|
|
|
else: |
|
|
|
|
# TODO (erikj): We should work out what to do here instead. |
|
|
|
|
next_token = to_key if to_key else from_key |
|
|
|
@ -354,7 +302,7 @@ class StreamStore(SQLBaseStore): |
|
|
|
|
with_feedback=False, from_token=None): |
|
|
|
|
# TODO (erikj): Handle compressed feedback |
|
|
|
|
|
|
|
|
|
end_token = _StreamToken.parse_stream_token(end_token) |
|
|
|
|
end_token = RoomStreamToken.parse_stream_token(end_token) |
|
|
|
|
|
|
|
|
|
if from_token is None: |
|
|
|
|
sql = ( |
|
|
|
@ -365,7 +313,7 @@ class StreamStore(SQLBaseStore): |
|
|
|
|
" LIMIT ?" |
|
|
|
|
) |
|
|
|
|
else: |
|
|
|
|
from_token = _StreamToken.parse_stream_token(from_token) |
|
|
|
|
from_token = RoomStreamToken.parse_stream_token(from_token) |
|
|
|
|
sql = ( |
|
|
|
|
"SELECT stream_ordering, topological_ordering, event_id" |
|
|
|
|
" FROM events" |
|
|
|
@ -395,7 +343,7 @@ class StreamStore(SQLBaseStore): |
|
|
|
|
# stream part. |
|
|
|
|
topo = rows[0]["topological_ordering"] |
|
|
|
|
toke = rows[0]["stream_ordering"] - 1 |
|
|
|
|
start_token = str(_StreamToken(topo, toke)) |
|
|
|
|
start_token = str(RoomStreamToken(topo, toke)) |
|
|
|
|
|
|
|
|
|
token = (start_token, str(end_token)) |
|
|
|
|
else: |
|
|
|
@ -439,5 +387,5 @@ class StreamStore(SQLBaseStore): |
|
|
|
|
stream = row["stream_ordering"] |
|
|
|
|
topo = event.depth |
|
|
|
|
internal = event.internal_metadata |
|
|
|
|
internal.before = str(_StreamToken(topo, stream - 1)) |
|
|
|
|
internal.after = str(_StreamToken(topo, stream)) |
|
|
|
|
internal.before = str(RoomStreamToken(topo, stream - 1)) |
|
|
|
|
internal.after = str(RoomStreamToken(topo, stream)) |
|
|
|
|