|
|
|
@ -397,6 +397,12 @@ class EventsStore(SQLBaseStore): |
|
|
|
|
|
|
|
|
|
@log_function |
|
|
|
|
def _persist_events_txn(self, txn, events_and_contexts, backfilled): |
|
|
|
|
"""Insert some number of room events into the necessary database tables. |
|
|
|
|
|
|
|
|
|
Rejected events are only inserted into the events table, the events_json table, |
|
|
|
|
and the rejections table. Things reading from those table will need to check |
|
|
|
|
whether the event was rejected. |
|
|
|
|
""" |
|
|
|
|
depth_updates = {} |
|
|
|
|
for event, context in events_and_contexts: |
|
|
|
|
# Remove the any existing cache entries for the event_ids |
|
|
|
@ -407,21 +413,11 @@ class EventsStore(SQLBaseStore): |
|
|
|
|
event.room_id, event.internal_metadata.stream_ordering, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
if not event.internal_metadata.is_outlier(): |
|
|
|
|
if not event.internal_metadata.is_outlier() and not context.rejected: |
|
|
|
|
depth_updates[event.room_id] = max( |
|
|
|
|
event.depth, depth_updates.get(event.room_id, event.depth) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
if context.push_actions: |
|
|
|
|
self._set_push_actions_for_event_and_users_txn( |
|
|
|
|
txn, event, context.push_actions |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
if event.type == EventTypes.Redaction and event.redacts is not None: |
|
|
|
|
self._remove_push_actions_for_event_id_txn( |
|
|
|
|
txn, event.room_id, event.redacts |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
for room_id, depth in depth_updates.items(): |
|
|
|
|
self._update_min_depth_for_room_txn(txn, room_id, depth) |
|
|
|
|
|
|
|
|
@ -431,14 +427,24 @@ class EventsStore(SQLBaseStore): |
|
|
|
|
), |
|
|
|
|
[event.event_id for event, _ in events_and_contexts] |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
have_persisted = { |
|
|
|
|
event_id: outlier |
|
|
|
|
for event_id, outlier in txn.fetchall() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
# Remove the events that we've seen before. |
|
|
|
|
event_map = {} |
|
|
|
|
to_remove = set() |
|
|
|
|
for event, context in events_and_contexts: |
|
|
|
|
if context.rejected: |
|
|
|
|
# If the event is rejected then we don't care if the event |
|
|
|
|
# was an outlier or not. |
|
|
|
|
if event.event_id in have_persisted: |
|
|
|
|
# If we have already seen the event then ignore it. |
|
|
|
|
to_remove.add(event) |
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
# Handle the case of the list including the same event multiple |
|
|
|
|
# times. The tricky thing here is when they differ by whether |
|
|
|
|
# they are an outlier. |
|
|
|
@ -463,6 +469,12 @@ class EventsStore(SQLBaseStore): |
|
|
|
|
|
|
|
|
|
outlier_persisted = have_persisted[event.event_id] |
|
|
|
|
if not event.internal_metadata.is_outlier() and outlier_persisted: |
|
|
|
|
# We received a copy of an event that we had already stored as |
|
|
|
|
# an outlier in the database. We now have some state at that |
|
|
|
|
# so we need to update the state_groups table with that state. |
|
|
|
|
|
|
|
|
|
# insert into the state_group, state_groups_state and |
|
|
|
|
# event_to_state_groups tables. |
|
|
|
|
self._store_mult_state_groups_txn(txn, ((event, context),)) |
|
|
|
|
|
|
|
|
|
metadata_json = encode_json( |
|
|
|
@ -478,6 +490,8 @@ class EventsStore(SQLBaseStore): |
|
|
|
|
(metadata_json, event.event_id,) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
# Add an entry to the ex_outlier_stream table to replicate the |
|
|
|
|
# change in outlier status to our workers. |
|
|
|
|
stream_order = event.internal_metadata.stream_ordering |
|
|
|
|
state_group_id = context.state_group or context.new_state_group_id |
|
|
|
|
self._simple_insert_txn( |
|
|
|
@ -499,6 +513,8 @@ class EventsStore(SQLBaseStore): |
|
|
|
|
(False, event.event_id,) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
# Update the event_backward_extremities table now that this |
|
|
|
|
# event isn't an outlier any more. |
|
|
|
|
self._update_extremeties(txn, [event]) |
|
|
|
|
|
|
|
|
|
events_and_contexts = [ |
|
|
|
@ -506,38 +522,12 @@ class EventsStore(SQLBaseStore): |
|
|
|
|
] |
|
|
|
|
|
|
|
|
|
if not events_and_contexts: |
|
|
|
|
# Make sure we don't pass an empty list to functions that expect to |
|
|
|
|
# be storing at least one element. |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
self._store_mult_state_groups_txn(txn, events_and_contexts) |
|
|
|
|
|
|
|
|
|
self._handle_mult_prev_events( |
|
|
|
|
txn, |
|
|
|
|
events=[event for event, _ in events_and_contexts], |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
for event, _ in events_and_contexts: |
|
|
|
|
if event.type == EventTypes.Name: |
|
|
|
|
self._store_room_name_txn(txn, event) |
|
|
|
|
elif event.type == EventTypes.Topic: |
|
|
|
|
self._store_room_topic_txn(txn, event) |
|
|
|
|
elif event.type == EventTypes.Message: |
|
|
|
|
self._store_room_message_txn(txn, event) |
|
|
|
|
elif event.type == EventTypes.Redaction: |
|
|
|
|
self._store_redaction(txn, event) |
|
|
|
|
elif event.type == EventTypes.RoomHistoryVisibility: |
|
|
|
|
self._store_history_visibility_txn(txn, event) |
|
|
|
|
elif event.type == EventTypes.GuestAccess: |
|
|
|
|
self._store_guest_access_txn(txn, event) |
|
|
|
|
|
|
|
|
|
self._store_room_members_txn( |
|
|
|
|
txn, |
|
|
|
|
[ |
|
|
|
|
event |
|
|
|
|
for event, _ in events_and_contexts |
|
|
|
|
if event.type == EventTypes.Member |
|
|
|
|
], |
|
|
|
|
backfilled=backfilled, |
|
|
|
|
) |
|
|
|
|
# From this point onwards the events are only events that we haven't |
|
|
|
|
# seen before. |
|
|
|
|
|
|
|
|
|
def event_dict(event): |
|
|
|
|
return { |
|
|
|
@ -591,10 +581,41 @@ class EventsStore(SQLBaseStore): |
|
|
|
|
], |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
# Remove the rejected events from the list now that we've added them |
|
|
|
|
# to the events table and the events_json table. |
|
|
|
|
to_remove = set() |
|
|
|
|
for event, context in events_and_contexts: |
|
|
|
|
if context.rejected: |
|
|
|
|
# Insert the event_id into the rejections table |
|
|
|
|
self._store_rejections_txn( |
|
|
|
|
txn, event.event_id, context.rejected |
|
|
|
|
) |
|
|
|
|
to_remove.add(event) |
|
|
|
|
|
|
|
|
|
events_and_contexts = [ |
|
|
|
|
ec for ec in events_and_contexts if ec[0] not in to_remove |
|
|
|
|
] |
|
|
|
|
|
|
|
|
|
if not events_and_contexts: |
|
|
|
|
# Make sure we don't pass an empty list to functions that expect to |
|
|
|
|
# be storing at least one element. |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
# From this point onwards the events are only ones that weren't rejected. |
|
|
|
|
|
|
|
|
|
for event, context in events_and_contexts: |
|
|
|
|
# Insert all the push actions into the event_push_actions table. |
|
|
|
|
if context.push_actions: |
|
|
|
|
self._set_push_actions_for_event_and_users_txn( |
|
|
|
|
txn, event, context.push_actions |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
if event.type == EventTypes.Redaction and event.redacts is not None: |
|
|
|
|
# Remove the entries in the event_push_actions table for the |
|
|
|
|
# redacted event. |
|
|
|
|
self._remove_push_actions_for_event_id_txn( |
|
|
|
|
txn, event.room_id, event.redacts |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
self._simple_insert_many_txn( |
|
|
|
|
txn, |
|
|
|
@ -610,6 +631,49 @@ class EventsStore(SQLBaseStore): |
|
|
|
|
], |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
# Insert into the state_groups, state_groups_state, and |
|
|
|
|
# event_to_state_groups tables. |
|
|
|
|
self._store_mult_state_groups_txn(txn, events_and_contexts) |
|
|
|
|
|
|
|
|
|
# Update the event_forward_extremities, event_backward_extremities and |
|
|
|
|
# event_edges tables. |
|
|
|
|
self._handle_mult_prev_events( |
|
|
|
|
txn, |
|
|
|
|
events=[event for event, _ in events_and_contexts], |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
for event, _ in events_and_contexts: |
|
|
|
|
if event.type == EventTypes.Name: |
|
|
|
|
# Insert into the room_names and event_search tables. |
|
|
|
|
self._store_room_name_txn(txn, event) |
|
|
|
|
elif event.type == EventTypes.Topic: |
|
|
|
|
# Insert into the topics table and event_search table. |
|
|
|
|
self._store_room_topic_txn(txn, event) |
|
|
|
|
elif event.type == EventTypes.Message: |
|
|
|
|
# Insert into the event_search table. |
|
|
|
|
self._store_room_message_txn(txn, event) |
|
|
|
|
elif event.type == EventTypes.Redaction: |
|
|
|
|
# Insert into the redactions table. |
|
|
|
|
self._store_redaction(txn, event) |
|
|
|
|
elif event.type == EventTypes.RoomHistoryVisibility: |
|
|
|
|
# Insert into the event_search table. |
|
|
|
|
self._store_history_visibility_txn(txn, event) |
|
|
|
|
elif event.type == EventTypes.GuestAccess: |
|
|
|
|
# Insert into the event_search table. |
|
|
|
|
self._store_guest_access_txn(txn, event) |
|
|
|
|
|
|
|
|
|
# Insert into the room_memberships table. |
|
|
|
|
self._store_room_members_txn( |
|
|
|
|
txn, |
|
|
|
|
[ |
|
|
|
|
event |
|
|
|
|
for event, _ in events_and_contexts |
|
|
|
|
if event.type == EventTypes.Member |
|
|
|
|
], |
|
|
|
|
backfilled=backfilled, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
# Insert event_reference_hashes table. |
|
|
|
|
self._store_event_reference_hashes_txn( |
|
|
|
|
txn, [event for event, _ in events_and_contexts] |
|
|
|
|
) |
|
|
|
@ -654,6 +718,7 @@ class EventsStore(SQLBaseStore): |
|
|
|
|
], |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
# Prefill the event cache |
|
|
|
|
self._add_to_cache(txn, events_and_contexts) |
|
|
|
|
|
|
|
|
|
if backfilled: |
|
|
|
@ -666,11 +731,6 @@ class EventsStore(SQLBaseStore): |
|
|
|
|
# Outlier events shouldn't clobber the current state. |
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
if context.rejected: |
|
|
|
|
# If the event failed it's auth checks then it shouldn't |
|
|
|
|
# clobbler the current state. |
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
txn.call_after( |
|
|
|
|
self._get_current_state_for_key.invalidate, |
|
|
|
|
(event.room_id, event.type, event.state_key,) |
|
|
|
|