Remove redundant `get_current_events_token` (#11643)

* Push `get_room_{min,max_stream_ordering}` into StreamStore

Both implementations of this are identical, so we may as well push it down and
get rid of the abstract base class nonsense.

* Remove redundant `StreamStore` class

This is empty now

* Remove redundant `get_current_events_token`

This was an exact duplicate of `get_room_max_stream_ordering`, so let's get rid
of it.

* newsfile
code_spécifique_watcha
Richard van der Hoff 3 years ago committed by GitHub
parent bd9821f7f1
commit 2359ee3864
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      changelog.d/11643.misc
  2. 2
      synapse/handlers/federation_event.py
  3. 2
      synapse/handlers/presence.py
  4. 9
      synapse/replication/slave/storage/events.py
  5. 4
      synapse/storage/databases/main/__init__.py
  6. 4
      synapse/storage/databases/main/events_worker.py
  7. 34
      synapse/storage/databases/main/stream.py

@ -0,0 +1 @@
Remove redundant `get_current_events_token` method.

@ -1838,7 +1838,7 @@ class FederationEventHandler:
The stream ID after which all events have been persisted.
"""
if not event_and_contexts:
return self._store.get_current_events_token()
return self._store.get_room_max_stream_ordering()
instance = self._config.worker.events_shard_config.get_instance(room_id)
if instance != self._instance_name:

@ -729,7 +729,7 @@ class PresenceHandler(BasePresenceHandler):
# Presence is best effort and quickly heals itself, so lets just always
# stream from the current state when we restart.
self._event_pos = self.store.get_current_events_token()
self._event_pos = self.store.get_room_max_stream_ordering()
self._event_processing = False
async def _on_shutdown(self) -> None:

@ -80,12 +80,3 @@ class SlavedEventStore(
min_curr_state_delta_id,
prefilled_cache=curr_state_delta_prefill,
)
# Cached functions can't be accessed through a class instance so we need
# to reach inside the __dict__ to extract them.
def get_room_max_stream_ordering(self):
return self._stream_id_gen.get_current_token()
def get_room_min_stream_ordering(self):
return self._backfill_id_gen.get_current_token()

@ -68,7 +68,7 @@ from .session import SessionStore
from .signatures import SignatureStore
from .state import StateStore
from .stats import StatsStore
from .stream import StreamStore
from .stream import StreamWorkerStore
from .tags import TagsStore
from .transactions import TransactionWorkerStore
from .ui_auth import UIAuthStore
@ -87,7 +87,7 @@ class DataStore(
RoomStore,
RoomBatchStore,
RegistrationStore,
StreamStore,
StreamWorkerStore,
ProfileStore,
PresenceStore,
TransactionWorkerStore,

@ -1383,10 +1383,6 @@ class EventsWorkerStore(SQLBaseStore):
return {"v1": complexity_v1}
def get_current_events_token(self) -> int:
"""The current maximum token that events have reached"""
return self._stream_id_gen.get_current_token()
async def get_all_new_forward_event_rows(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:

@ -34,7 +34,7 @@ what sort order was used:
- topological tokems: "t%d-%d", where the integers map to the topological
and stream ordering columns respectively.
"""
import abc
import logging
from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Set, Tuple
@ -336,12 +336,7 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
return " AND ".join(clauses), args
class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
"""This is an abstract base class where subclasses must implement
`get_room_max_stream_ordering` and `get_room_min_stream_ordering`
which can be called in the initializer.
"""
class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
def __init__(
self,
database: DatabasePool,
@ -379,13 +374,22 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
self._stream_order_on_start = self.get_room_max_stream_ordering()
@abc.abstractmethod
def get_room_max_stream_ordering(self) -> int:
raise NotImplementedError()
"""Get the stream_ordering of regular events that we have committed up to
Returns the maximum stream id such that all stream ids less than or
equal to it have been successfully persisted.
"""
return self._stream_id_gen.get_current_token()
@abc.abstractmethod
def get_room_min_stream_ordering(self) -> int:
raise NotImplementedError()
"""Get the stream_ordering of backfilled events that we have committed up to
Backfilled events use *negative* stream orderings, so this returns the
minimum negative stream id such that all stream ids greater than or
equal to it have been successfully persisted.
"""
return self._backfill_id_gen.get_current_token()
def get_room_max_token(self) -> RoomStreamToken:
"""Get a `RoomStreamToken` that marks the current maximum persisted
@ -1351,11 +1355,3 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
retcol="instance_name",
desc="get_name_from_instance_id",
)
class StreamStore(StreamWorkerStore):
def get_room_max_stream_ordering(self) -> int:
return self._stream_id_gen.get_current_token()
def get_room_min_stream_ordering(self) -> int:
return self._backfill_id_gen.get_current_token()

Loading…
Cancel
Save