@@ -364,147 +364,161 @@ class EventsStore(
        if not events_and_contexts:
            return

        chunks = [
            events_and_contexts[x:x + 100]
            for x in range(0, len(events_and_contexts), 100)
        ]

        for chunk in chunks:
            # We can't easily parallelize these since different chunks
            # might contain the same event. :(

            # NB: Assumes that we are only persisting events for one room
            # at a time.

            # map room_id->list[event_ids] giving the new forward
            # extremities in each room
            new_forward_extremeties = {}

            # map room_id->(type,state_key)->event_id tracking the full
            # state in each room after adding these events.
            # This is simply used to prefill the get_current_state_ids
            # cache
            current_state_for_room = {}

            # map room_id->(to_delete, to_insert) where to_delete is a list
            # of type/state keys to remove from current state, and to_insert
            # is a map (type,key)->event_id giving the state delta in each
            # room
            state_delta_for_room = {}
            if not backfilled:
                with Measure(self._clock, "_calculate_state_and_extrem"):
                    # Work out the new "current state" for each room.
                    # We do this by working out what the new extremities are and then
                    # calculating the state from that.
                    events_by_room = {}
                    for event, context in chunk:
                        events_by_room.setdefault(event.room_id, []).append(
                            (event, context)
                        )

                    for room_id, ev_ctx_rm in iteritems(events_by_room):
                        latest_event_ids = yield self.get_latest_event_ids_in_room(
                            room_id
                        )
                        new_latest_event_ids = yield self._calculate_new_extremities(
                            room_id, ev_ctx_rm, latest_event_ids
                        )

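                        # latest_event_ids are the forward extremities before this
                        # chunk is applied; new_latest_event_ids are the extremities
                        # once the chunk's events are included, so comparing the two
                        # tells us whether the room's current state may have changed.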
                        latest_event_ids = set(latest_event_ids)
                        if new_latest_event_ids == latest_event_ids:
                            # No change in extremities, so no change in state
                            continue

                        # there should always be at least one forward extremity.
                        # (except during the initial persistence of the send_join
                        # results, in which case there will be no existing
                        # extremities, so we'll `continue` above and skip this bit.)
                        assert new_latest_event_ids, "No forward extremities left!"

                        new_forward_extremeties[room_id] = new_latest_event_ids

                        len_1 = (
                            len(latest_event_ids) == 1
                            and len(new_latest_event_ids) == 1
                        )
                        if len_1:
                            all_single_prev_not_state = all(
                                len(event.prev_event_ids()) == 1
                                and not event.is_state()
                                for event, ctx in ev_ctx_rm
                            )
                            # Don't bother calculating state if they're just
                            # a long chain of single ancestor non-state events.
                            if all_single_prev_not_state:
                                continue

                        state_delta_counter.inc()
                        if len(new_latest_event_ids) == 1:
                            state_delta_single_event_counter.inc()

                            # This is a fairly handwavey check to see if we could
                            # have guessed what the delta would have been when
                            # processing one of these events.
                            # What we're interested in is if the latest extremities
                            # were the same when we created the event as they are
                            # now. When this server creates a new event (as opposed
                            # to receiving it over federation) it will use the
                            # forward extremities as the prev_events, so we can
                            # guess this by looking at the prev_events and checking
                            # if they match the current forward extremities.
                            for ev, _ in ev_ctx_rm:
                                prev_event_ids = set(ev.prev_event_ids())
                                if latest_event_ids == prev_event_ids:
                                    state_delta_reuse_delta_counter.inc()
                                    break
                        logger.info("Calculating state delta for room %s", room_id)

                        with Measure(
                            self._clock, "persist_events.get_new_state_after_events"
                        ):
                            res = yield self._get_new_state_after_events(
                                room_id,
                                ev_ctx_rm,
                                latest_event_ids,
                                new_latest_event_ids,
                            )
                        current_state, delta_ids = res

                        # If either are not None then there has been a change,
                        # and we need to work out the delta (or use the one
                        # given).
                        if delta_ids is not None:
                            # If there is a delta we know that we've
                            # only added or replaced state, never
                            # removed keys entirely.
                            state_delta_for_room[room_id] = ([], delta_ids)
                        elif current_state is not None:
                            with Measure(
                                self._clock, "persist_events.calculate_state_delta"
                            ):
                                delta = yield self._calculate_state_delta(
                                    room_id, current_state
                                )
                            state_delta_for_room[room_id] = delta

                        # If we have the current_state then let's prefill
                        # the cache with it.
                        if current_state is not None:
                            current_state_for_room[room_id] = current_state
            # We want to calculate the stream orderings as late as possible, as
            # we only notify after all events with a lesser stream ordering have
            # been persisted. I.e. if we spend 10s inside the with block then
            # that will delay all subsequent events from being notified about.
            # Hence we do it down here rather than wrapping the entire
            # function.
            #
            # It's safe to do this after calculating the state deltas etc., as we
            # only need to protect the *persistence* of the events. This is to
            # ensure that queries of the form "fetch events since X" don't
            # return events and stream positions after events that are still in
            # flight, as otherwise subsequent requests "fetch events since Y"
            # will not return those events.
            #
            # Note: Multiple instances of this function cannot be in flight at
            # the same time for the same room.
            if backfilled:
                stream_ordering_manager = self._backfill_id_gen.get_next_mult(
                    len(chunk)
                )
            else:
                stream_ordering_manager = self._stream_id_gen.get_next_mult(len(chunk))

            with stream_ordering_manager as stream_orderings:
                for (event, context), stream in zip(chunk, stream_orderings):
                    event.internal_metadata.stream_ordering = stream

                yield self.runInteraction(
                    "persist_events",