|
|
@ -45,6 +45,9 @@ class DataStore(RoomMemberStore, RoomStore, |
|
|
|
self.event_factory = hs.get_event_factory() |
|
|
|
self.event_factory = hs.get_event_factory() |
|
|
|
self.hs = hs |
|
|
|
self.hs = hs |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.min_token_deferred = self._get_min_token() |
|
|
|
|
|
|
|
self.min_token = None |
|
|
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
@defer.inlineCallbacks |
|
|
|
def persist_event(self, event, backfilled=False): |
|
|
|
def persist_event(self, event, backfilled=False): |
|
|
|
if event.type == RoomMemberEvent.TYPE: |
|
|
|
if event.type == RoomMemberEvent.TYPE: |
|
|
@ -82,7 +85,7 @@ class DataStore(RoomMemberStore, RoomStore, |
|
|
|
@defer.inlineCallbacks |
|
|
|
@defer.inlineCallbacks |
|
|
|
def _store_event(self, event, backfilled): |
|
|
|
def _store_event(self, event, backfilled): |
|
|
|
# FIXME (erikj): This should be removed when we start amalgamating |
|
|
|
# FIXME (erikj): This should be removed when we start amalgamating |
|
|
|
# event and pdu storage. |
|
|
|
# event and pdu storage |
|
|
|
yield self.hs.get_federation().fill_out_prev_events(event) |
|
|
|
yield self.hs.get_federation().fill_out_prev_events(event) |
|
|
|
|
|
|
|
|
|
|
|
vals = { |
|
|
|
vals = { |
|
|
@ -95,7 +98,10 @@ class DataStore(RoomMemberStore, RoomStore, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if backfilled: |
|
|
|
if backfilled: |
|
|
|
vals["token_ordering"] = "-1" |
|
|
|
if not self.min_token_deferred.called: |
|
|
|
|
|
|
|
yield self.min_token_deferred |
|
|
|
|
|
|
|
self.min_token -= 1 |
|
|
|
|
|
|
|
vals["token_ordering"] = self.min_token |
|
|
|
|
|
|
|
|
|
|
|
unrec = { |
|
|
|
unrec = { |
|
|
|
k: v |
|
|
|
k: v |
|
|
@ -151,6 +157,17 @@ class DataStore(RoomMemberStore, RoomStore, |
|
|
|
|
|
|
|
|
|
|
|
defer.returnValue([self._parse_event_from_row(r) for r in results]) |
|
|
|
defer.returnValue([self._parse_event_from_row(r) for r in results]) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
|
|
|
def _get_min_token(self): |
|
|
|
|
|
|
|
row = yield self._execute( |
|
|
|
|
|
|
|
None, |
|
|
|
|
|
|
|
"SELECT MIN(token_ordering) FROM events" |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.min_token = rows[0][0] if rows and rows[0] else 0 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
defer.returnValue(self.min_token) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def schema_path(schema): |
|
|
|
def schema_path(schema): |
|
|
|
""" Get a filesystem path for the named database schema |
|
|
|
""" Get a filesystem path for the named database schema |
|
|
|