Add a consistency check on events read from the database (#12620)

I've seen a few errors which can only plausibly be explained by the calculated
event id for an event being different from the ID of the event in the
database. It should be cheap to check this, so let's do so and raise an
exception.
code_spécifique_watcha
Richard van der Hoff 3 years ago committed by GitHub
parent 9ce51a47f6
commit 96e0cdbc5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      changelog.d/12620.misc
  2. 12
      synapse/storage/databases/main/events_worker.py
  3. 59
      tests/storage/databases/main/test_events_worker.py

@ -0,0 +1 @@
Add a consistency check on events which we read from the database.

@ -1094,6 +1094,18 @@ class EventsWorkerStore(SQLBaseStore):
original_ev.internal_metadata.stream_ordering = row.stream_ordering
original_ev.internal_metadata.outlier = row.outlier
# Consistency check: if the content of the event has been modified in the
# database, then the calculated event ID will not match the event id in the
# database.
if original_ev.event_id != event_id:
# it's difficult to see what to do here. Pretty much all bets are off
# if Synapse cannot rely on the consistency of its database.
raise RuntimeError(
f"Database corruption: Event {event_id} in room {d['room_id']} "
f"from the database appears to have been modified (calculated "
f"event id {original_ev.event_id})"
)
event_map[event_id] = original_ev
# finally, we can decide whether each one needs redacting, and build

@ -13,7 +13,7 @@
# limitations under the License.
import json
from contextlib import contextmanager
from typing import Generator, Tuple
from typing import Generator, List, Tuple
from unittest import mock
from twisted.enterprise.adbapi import ConnectionPool
@ -21,6 +21,7 @@ from twisted.internet.defer import CancelledError, Deferred, ensureDeferred
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.room_versions import EventFormatVersions, RoomVersions
from synapse.events import make_event_from_dict
from synapse.logging.context import LoggingContext
from synapse.rest import admin
from synapse.rest.client import login, room
@ -49,23 +50,28 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
)
)
for idx, (rid, eid) in enumerate(
self.event_ids: List[str] = []
for idx, rid in enumerate(
(
("room1", "event10"),
("room1", "event11"),
("room1", "event12"),
("room2", "event20"),
"room1",
"room1",
"room1",
"room2",
)
):
event_json = {"type": f"test {idx}", "room_id": rid}
event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
event_id = event.event_id
self.get_success(
self.store.db_pool.simple_insert(
"events",
{
"event_id": eid,
"event_id": event_id,
"room_id": rid,
"topological_ordering": idx,
"stream_ordering": idx,
"type": "test",
"type": event.type,
"processed": True,
"outlier": False,
},
@ -75,21 +81,22 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
self.store.db_pool.simple_insert(
"event_json",
{
"event_id": eid,
"event_id": event_id,
"room_id": rid,
"json": json.dumps({"type": "test", "room_id": rid}),
"json": json.dumps(event_json),
"internal_metadata": "{}",
"format_version": 3,
},
)
)
self.event_ids.append(event_id)
def test_simple(self):
with LoggingContext(name="test") as ctx:
res = self.get_success(
self.store.have_seen_events("room1", ["event10", "event19"])
self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
)
self.assertEqual(res, {"event10"})
self.assertEqual(res, {self.event_ids[0]})
# that should result in a single db query
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
@ -97,19 +104,21 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
# a second lookup of the same events should cause no queries
with LoggingContext(name="test") as ctx:
res = self.get_success(
self.store.have_seen_events("room1", ["event10", "event19"])
self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
)
self.assertEqual(res, {"event10"})
self.assertEqual(res, {self.event_ids[0]})
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
def test_query_via_event_cache(self):
# fetch an event into the event cache
self.get_success(self.store.get_event("event10"))
self.get_success(self.store.get_event(self.event_ids[0]))
# looking it up should now cause no db hits
with LoggingContext(name="test") as ctx:
res = self.get_success(self.store.have_seen_events("room1", ["event10"]))
self.assertEqual(res, {"event10"})
res = self.get_success(
self.store.have_seen_events("room1", [self.event_ids[0]])
)
self.assertEqual(res, {self.event_ids[0]})
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
@ -167,7 +176,6 @@ class DatabaseOutageTestCase(unittest.HomeserverTestCase):
self.store: EventsWorkerStore = hs.get_datastores().main
self.room_id = f"!room:{hs.hostname}"
self.event_ids = [f"event{i}" for i in range(20)]
self._populate_events()
@ -190,8 +198,14 @@ class DatabaseOutageTestCase(unittest.HomeserverTestCase):
)
)
self.event_ids = [f"event{i}" for i in range(20)]
for idx, event_id in enumerate(self.event_ids):
self.event_ids: List[str] = []
for idx in range(20):
event_json = {
"type": f"test {idx}",
"room_id": self.room_id,
}
event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
event_id = event.event_id
self.get_success(
self.store.db_pool.simple_upsert(
"events",
@ -201,7 +215,7 @@ class DatabaseOutageTestCase(unittest.HomeserverTestCase):
"room_id": self.room_id,
"topological_ordering": idx,
"stream_ordering": idx,
"type": "test",
"type": event.type,
"processed": True,
"outlier": False,
},
@ -213,12 +227,13 @@ class DatabaseOutageTestCase(unittest.HomeserverTestCase):
{"event_id": event_id},
{
"room_id": self.room_id,
"json": json.dumps({"type": "test", "room_id": self.room_id}),
"json": json.dumps(event_json),
"internal_metadata": "{}",
"format_version": EventFormatVersions.V3,
},
)
)
self.event_ids.append(event_id)
@contextmanager
def _outage(self) -> Generator[None, None, None]:

Loading…
Cancel
Save