Merge pull request #1836 from matrix-org/erikj/current_state_fix

Derive current_state_events from state groups
pull/4/merge
Erik Johnston 8 years ago committed by GitHub
commit 3d9d48fffb
  1. 1
      synapse/handlers/federation.py
  2. 3
      synapse/state.py
  3. 76
      synapse/storage/event_federation.py
  4. 293
      synapse/storage/events.py
  5. 45
      tests/replication/slave/storage/test_events.py

@ -1319,7 +1319,6 @@ class FederationHandler(BaseHandler):
event_stream_id, max_stream_id = yield self.store.persist_event(
event, new_event_context,
current_state=state,
)
defer.returnValue((event_stream_id, max_stream_id))

@ -429,6 +429,9 @@ def resolve_events(state_sets, state_map_factory):
dict[(str, str), synapse.events.FrozenEvent] is a map from
(type, state_key) to event.
"""
if len(state_sets) == 1:
return state_sets[0]
unconflicted_state, conflicted_state = _seperate(
state_sets,
)

@ -235,80 +235,21 @@ class EventFederationStore(SQLBaseStore):
],
)
self._update_extremeties(txn, events)
self._update_backward_extremeties(txn, events)
def _update_extremeties(self, txn, events):
"""Updates the event_*_extremities tables based on the new/updated
def _update_backward_extremeties(self, txn, events):
"""Updates the event_backward_extremities tables based on the new/updated
events being persisted.
This is called for new events *and* for events that were outliers, but
are are now being persisted as non-outliers.
are now being persisted as non-outliers.
Forward extremities are handled when we first start persisting the events.
"""
events_by_room = {}
for ev in events:
events_by_room.setdefault(ev.room_id, []).append(ev)
for room_id, room_events in events_by_room.items():
prevs = [
e_id for ev in room_events for e_id, _ in ev.prev_events
if not ev.internal_metadata.is_outlier()
]
if prevs:
txn.execute(
"DELETE FROM event_forward_extremities"
" WHERE room_id = ?"
" AND event_id in (%s)" % (
",".join(["?"] * len(prevs)),
),
[room_id] + prevs,
)
query = (
"INSERT INTO event_forward_extremities (event_id, room_id)"
" SELECT ?, ? WHERE NOT EXISTS ("
" SELECT 1 FROM event_edges WHERE prev_event_id = ?"
" )"
)
txn.executemany(
query,
[
(ev.event_id, ev.room_id, ev.event_id) for ev in events
if not ev.internal_metadata.is_outlier()
]
)
# We now insert into stream_ordering_to_exterm a mapping from room_id,
# new stream_ordering to new forward extremeties in the room.
# This allows us to later efficiently look up the forward extremeties
# for a room before a given stream_ordering
max_stream_ord = max(
ev.internal_metadata.stream_ordering for ev in events
)
new_extrem = {}
for room_id in events_by_room:
event_ids = self._simple_select_onecol_txn(
txn,
table="event_forward_extremities",
keyvalues={"room_id": room_id},
retcol="event_id",
)
new_extrem[room_id] = event_ids
self._simple_insert_many_txn(
txn,
table="stream_ordering_to_exterm",
values=[
{
"room_id": room_id,
"event_id": event_id,
"stream_ordering": max_stream_ord,
}
for room_id, extrem_evs in new_extrem.items()
for event_id in extrem_evs
]
)
query = (
"INSERT INTO event_backward_extremities (event_id, room_id)"
" SELECT ?, ? WHERE NOT EXISTS ("
@ -339,11 +280,6 @@ class EventFederationStore(SQLBaseStore):
]
)
for room_id in events_by_room:
txn.call_after(
self.get_latest_event_ids_in_room.invalidate, (room_id,)
)
def get_forward_extremeties_for_room(self, room_id, stream_ordering):
# We want to make the cache more effective, so we clamp to the last
# change before the given ordering.

@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore, _RollbackButIsFineException
from ._base import SQLBaseStore
from twisted.internet import defer, reactor
@ -27,6 +27,7 @@ from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
from synapse.state import resolve_events
from canonicaljson import encode_canonical_json
from collections import deque, namedtuple, OrderedDict
@ -71,22 +72,19 @@ class _EventPeristenceQueue(object):
"""
_EventPersistQueueItem = namedtuple("_EventPersistQueueItem", (
"events_and_contexts", "current_state", "backfilled", "deferred",
"events_and_contexts", "backfilled", "deferred",
))
def __init__(self):
self._event_persist_queues = {}
self._currently_persisting_rooms = set()
def add_to_queue(self, room_id, events_and_contexts, backfilled, current_state):
def add_to_queue(self, room_id, events_and_contexts, backfilled):
"""Add events to the queue, with the given persist_event options.
"""
queue = self._event_persist_queues.setdefault(room_id, deque())
if queue:
end_item = queue[-1]
if end_item.current_state or current_state:
# We perist events with current_state set to True one at a time
pass
if end_item.backfilled == backfilled:
end_item.events_and_contexts.extend(events_and_contexts)
return end_item.deferred.observe()
@ -96,7 +94,6 @@ class _EventPeristenceQueue(object):
queue.append(self._EventPersistQueueItem(
events_and_contexts=events_and_contexts,
backfilled=backfilled,
current_state=current_state,
deferred=deferred,
))
@ -216,7 +213,6 @@ class EventsStore(SQLBaseStore):
d = preserve_fn(self._event_persist_queue.add_to_queue)(
room_id, evs_ctxs,
backfilled=backfilled,
current_state=None,
)
deferreds.append(d)
@ -229,11 +225,10 @@ class EventsStore(SQLBaseStore):
@defer.inlineCallbacks
@log_function
def persist_event(self, event, context, current_state=None, backfilled=False):
def persist_event(self, event, context, backfilled=False):
deferred = self._event_persist_queue.add_to_queue(
event.room_id, [(event, context)],
backfilled=backfilled,
current_state=current_state,
)
self._maybe_start_persisting(event.room_id)
@ -246,21 +241,10 @@ class EventsStore(SQLBaseStore):
def _maybe_start_persisting(self, room_id):
@defer.inlineCallbacks
def persisting_queue(item):
if item.current_state:
for event, context in item.events_and_contexts:
# There should only ever be one item in
# events_and_contexts when current_state is
# not None
yield self._persist_event(
event, context,
current_state=item.current_state,
backfilled=item.backfilled,
)
else:
yield self._persist_events(
item.events_and_contexts,
backfilled=item.backfilled,
)
yield self._persist_events(
item.events_and_contexts,
backfilled=item.backfilled,
)
self._event_persist_queue.handle_queue(room_id, persisting_queue)
@ -294,35 +278,132 @@ class EventsStore(SQLBaseStore):
for chunk in chunks:
# We can't easily parallelize these since different chunks
# might contain the same event. :(
# NB: Assumes that we are only persisting events for one room
# at a time.
new_forward_extremeties = {}
current_state_for_room = {}
if not backfilled:
# Work out the new "current state" for each room.
# We do this by working out what the new extremities are and then
# calculating the state from that.
events_by_room = {}
for event, context in chunk:
events_by_room.setdefault(event.room_id, []).append(
(event, context)
)
for room_id, ev_ctx_rm in events_by_room.items():
# Work out new extremities by recursively adding and removing
# the new events.
latest_event_ids = yield self.get_latest_event_ids_in_room(
room_id
)
new_latest_event_ids = yield self._calculate_new_extremeties(
room_id, [ev for ev, _ in ev_ctx_rm]
)
if new_latest_event_ids == set(latest_event_ids):
# No change in extremities, so no change in state
continue
new_forward_extremeties[room_id] = new_latest_event_ids
# Now we need to work out the different state sets for
# each state extremities
state_sets = []
missing_event_ids = []
was_updated = False
for event_id in new_latest_event_ids:
# First search in the list of new events we're adding,
# and then use the current state from that
for ev, ctx in ev_ctx_rm:
if event_id == ev.event_id:
if ctx.current_state_ids is None:
raise Exception("Unknown current state")
state_sets.append(ctx.current_state_ids)
if ctx.delta_ids or hasattr(ev, "state_key"):
was_updated = True
break
else:
# If we couldn't find it, then we'll need to pull
# the state from the database
was_updated = True
missing_event_ids.append(event_id)
if missing_event_ids:
# Now pull out the state for any missing events from DB
event_to_groups = yield self._get_state_group_for_events(
missing_event_ids,
)
groups = set(event_to_groups.values())
group_to_state = yield self._get_state_for_groups(groups)
state_sets.extend(group_to_state.values())
if not new_latest_event_ids or was_updated:
current_state_for_room[room_id] = yield resolve_events(
state_sets,
state_map_factory=lambda ev_ids: self.get_events(
ev_ids, get_prev_content=False, check_redacted=False,
),
)
yield self.runInteraction(
"persist_events",
self._persist_events_txn,
events_and_contexts=chunk,
backfilled=backfilled,
delete_existing=delete_existing,
current_state_for_room=current_state_for_room,
new_forward_extremeties=new_forward_extremeties,
)
persist_event_counter.inc_by(len(chunk))
@_retry_on_integrity_error
@defer.inlineCallbacks
@log_function
def _persist_event(self, event, context, current_state=None, backfilled=False,
delete_existing=False):
try:
with self._stream_id_gen.get_next() as stream_ordering:
event.internal_metadata.stream_ordering = stream_ordering
yield self.runInteraction(
"persist_event",
self._persist_event_txn,
event=event,
context=context,
current_state=current_state,
backfilled=backfilled,
delete_existing=delete_existing,
)
persist_event_counter.inc()
except _RollbackButIsFineException:
pass
def _calculate_new_extremeties(self, room_id, events):
"""Calculates the new forward extremeties for a room given events to
persist.
Assumes that we are only persisting events for one room at a time.
"""
latest_event_ids = yield self.get_latest_event_ids_in_room(
room_id
)
new_latest_event_ids = set(latest_event_ids)
# First, add all the new events to the list
new_latest_event_ids.update(
event.event_id for event in events
if not event.internal_metadata.is_outlier()
)
# Now remove all events that are referenced by the to-be-added events
new_latest_event_ids.difference_update(
e_id
for event in events
for e_id, _ in event.prev_events
if not event.internal_metadata.is_outlier()
)
# And finally remove any events that are referenced by previously added
# events.
rows = yield self._simple_select_many_batch(
table="event_edges",
column="prev_event_id",
iterable=list(new_latest_event_ids),
retcols=["prev_event_id"],
keyvalues={
"room_id": room_id,
"is_state": False,
},
desc="_calculate_new_extremeties",
)
new_latest_event_ids.difference_update(
row["prev_event_id"] for row in rows
)
defer.returnValue(new_latest_event_ids)
@defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,
@ -381,61 +462,92 @@ class EventsStore(SQLBaseStore):
defer.returnValue({e.event_id: e for e in events})
@log_function
def _persist_event_txn(self, txn, event, context, current_state, backfilled=False,
delete_existing=False):
# We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table
if current_state:
def _persist_events_txn(self, txn, events_and_contexts, backfilled,
delete_existing=False, current_state_for_room={},
new_forward_extremeties={}):
"""Insert some number of room events into the necessary database tables.
Rejected events are only inserted into the events table, the events_json table,
and the rejections table. Things reading from those table will need to check
whether the event was rejected.
If delete_existing is True then existing events will be purged from the
database before insertion. This is useful when retrying due to IntegrityError.
"""
max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
for room_id, current_state in current_state_for_room.iteritems():
txn.call_after(self._get_current_state_for_key.invalidate_all)
txn.call_after(self.get_rooms_for_user.invalidate_all)
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
txn.call_after(self.get_users_in_room.invalidate, (room_id,))
# Add an entry to the current_state_resets table to record the point
# where we clobbered the current state
stream_order = event.internal_metadata.stream_ordering
self._simple_insert_txn(
txn,
table="current_state_resets",
values={"event_stream_ordering": stream_order}
values={"event_stream_ordering": max_stream_order}
)
self._simple_delete_txn(
txn,
table="current_state_events",
keyvalues={"room_id": event.room_id},
keyvalues={"room_id": room_id},
)
for s in current_state:
self._simple_insert_txn(
txn,
"current_state_events",
self._simple_insert_many_txn(
txn,
table="current_state_events",
values=[
{
"event_id": s.event_id,
"room_id": s.room_id,
"type": s.type,
"state_key": s.state_key,
"event_id": ev_id,
"room_id": room_id,
"type": key[0],
"state_key": key[1],
}
)
for key, ev_id in current_state.iteritems()
],
)
for room_id, new_extrem in new_forward_extremeties.items():
self._simple_delete_txn(
txn,
table="event_forward_extremities",
keyvalues={"room_id": room_id},
)
txn.call_after(
self.get_latest_event_ids_in_room.invalidate, (room_id,)
)
return self._persist_events_txn(
self._simple_insert_many_txn(
txn,
[(event, context)],
backfilled=backfilled,
delete_existing=delete_existing,
table="event_forward_extremities",
values=[
{
"event_id": ev_id,
"room_id": room_id,
}
for room_id, new_extrem in new_forward_extremeties.items()
for ev_id in new_extrem
],
)
# We now insert into stream_ordering_to_exterm a mapping from room_id,
# new stream_ordering to new forward extremeties in the room.
# This allows us to later efficiently look up the forward extremeties
# for a room before a given stream_ordering
self._simple_insert_many_txn(
txn,
table="stream_ordering_to_exterm",
values=[
{
"room_id": room_id,
"event_id": event_id,
"stream_ordering": max_stream_order,
}
for room_id, new_extrem in new_forward_extremeties.items()
for event_id in new_extrem
]
)
@log_function
def _persist_events_txn(self, txn, events_and_contexts, backfilled,
delete_existing=False):
"""Insert some number of room events into the necessary database tables.
Rejected events are only inserted into the events table, the events_json table,
and the rejections table. Things reading from those table will need to check
whether the event was rejected.
If delete_existing is True then existing events will be purged from the
database before insertion. This is useful when retrying due to IntegrityError.
"""
# Ensure that we don't have the same event twice.
# Pick the earliest non-outlier if there is one, else the earliest one.
new_events_and_contexts = OrderedDict()
@ -550,7 +662,7 @@ class EventsStore(SQLBaseStore):
# Update the event_backward_extremities table now that this
# event isn't an outlier any more.
self._update_extremeties(txn, [event])
self._update_backward_extremeties(txn, [event])
events_and_contexts = [
ec for ec in events_and_contexts if ec[0] not in to_remove
@ -798,29 +910,6 @@ class EventsStore(SQLBaseStore):
# to update the current state table
return
for event, _ in state_events_and_contexts:
if event.internal_metadata.is_outlier():
# Outlier events shouldn't clobber the current state.
continue
txn.call_after(
self._get_current_state_for_key.invalidate,
(event.room_id, event.type, event.state_key,)
)
self._simple_upsert_txn(
txn,
"current_state_events",
keyvalues={
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
},
values={
"event_id": event.event_id,
}
)
return
def _add_to_cache(self, txn, events_and_contexts):

@ -60,7 +60,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
@defer.inlineCallbacks
def test_room_members(self):
create = yield self.persist(type="m.room.create", key="", creator=USER_ID)
yield self.persist(type="m.room.create", key="", creator=USER_ID)
yield self.replicate()
yield self.check("get_rooms_for_user", (USER_ID,), [])
yield self.check("get_users_in_room", (ROOM_ID,), [])
@ -95,15 +95,11 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
)])
yield self.check("get_users_in_room", (ROOM_ID,), [USER_ID_2])
# Join the room clobbering the state.
# This should remove any evidence of the other user being in the room.
yield self.persist(
type="m.room.member", key=USER_ID, membership="join",
reset_state=[create]
)
yield self.replicate()
yield self.check("get_users_in_room", (ROOM_ID,), [USER_ID])
yield self.check("get_rooms_for_user", (USER_ID_2,), [])
yield self.check("get_users_in_room", (ROOM_ID,), [USER_ID_2, USER_ID])
@defer.inlineCallbacks
def test_get_latest_event_ids_in_room(self):
@ -125,7 +121,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
@defer.inlineCallbacks
def test_get_current_state(self):
# Create the room.
create = yield self.persist(type="m.room.create", key="", creator=USER_ID)
yield self.persist(type="m.room.create", key="", creator=USER_ID)
yield self.replicate()
yield self.check(
"get_current_state_for_key", (ROOM_ID, "m.room.member", USER_ID), []
@ -151,22 +147,6 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
[join2]
)
# Leave the room, then rejoin the room clobbering state.
yield self.persist(type="m.room.member", key=USER_ID, membership="leave")
join3 = yield self.persist(
type="m.room.member", key=USER_ID, membership="join",
reset_state=[create]
)
yield self.replicate()
yield self.check(
"get_current_state_for_key", (ROOM_ID, "m.room.member", USER_ID_2),
[]
)
yield self.check(
"get_current_state_for_key", (ROOM_ID, "m.room.member", USER_ID),
[join3]
)
@defer.inlineCallbacks
def test_redactions(self):
yield self.persist(type="m.room.create", key="", creator=USER_ID)
@ -283,6 +263,12 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
if depth is None:
depth = self.event_id
if not prev_events:
latest_event_ids = yield self.master_store.get_latest_event_ids_in_room(
room_id
)
prev_events = [(ev_id, {}) for ev_id in latest_event_ids]
event_dict = {
"sender": sender,
"type": type,
@ -309,12 +295,15 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
state_ids = {
key: e.event_id for key, e in state.items()
}
context = EventContext()
context.current_state_ids = state_ids
context.prev_state_ids = state_ids
elif not backfill:
state_handler = self.hs.get_state_handler()
context = yield state_handler.compute_event_context(event)
else:
state_ids = None
context = EventContext()
context = EventContext()
context.current_state_ids = state_ids
context.prev_state_ids = state_ids
context.push_actions = push_actions
ordering = None
@ -324,7 +313,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
)
else:
ordering, _ = yield self.master_store.persist_event(
event, context, current_state=reset_state
event, context,
)
if ordering:

Loading…
Cancel
Save