@@ -17,7 +17,6 @@
import itertools
import logging
from collections import OrderedDict , namedtuple
from functools import wraps
from typing import TYPE_CHECKING , Dict , Iterable , List , Tuple
import attr
@@ -69,27 +68,6 @@ def encode_json(json_object):
_EventCacheEntry = namedtuple ( " _EventCacheEntry " , ( " event " , " redacted_event " ) )
def _retry_on_integrity_error(func):
    """Wraps a database function so that it gets retried on IntegrityError,
    with `delete_existing=True` passed in.

    The wrapped function is first called with ``delete_existing=False``;
    if that raises the database engine's ``IntegrityError``, it is retried
    exactly once with ``delete_existing=True``.

    Args:
        func: function that returns a Deferred and accepts a
            `delete_existing` arg
    """

    @wraps(func)
    @defer.inlineCallbacks
    def wrapper(self, *args, **kwargs):
        try:
            result = yield func(self, *args, delete_existing=False, **kwargs)
        except self.database_engine.module.IntegrityError:
            # First attempt hit a constraint violation: log it and retry,
            # this time asking the function to delete any existing rows.
            logger.exception("IntegrityError, retrying.")
            result = yield func(self, *args, delete_existing=True, **kwargs)
        return result

    return wrapper
@attr . s ( slots = True )
class DeltaState :
""" Deltas to use to update the `current_state_events` table.
@@ -134,7 +112,6 @@ class PersistEventsStore:
hs . config . worker . writers . events == hs . get_instance_name ( )
) , " Can only instantiate EventsStore on master "
@_retry_on_integrity_error
@defer . inlineCallbacks
def _persist_events_and_state_updates (
self ,
@@ -143,7 +120,6 @@ class PersistEventsStore:
state_delta_for_room : Dict [ str , DeltaState ] ,
new_forward_extremeties : Dict [ str , List [ str ] ] ,
backfilled : bool = False ,
delete_existing : bool = False ,
) :
""" Persist a set of events alongside updates to the current state and
forward extremities tables .
@@ -157,7 +133,6 @@ class PersistEventsStore:
new_forward_extremities : Map from room_id to list of event IDs
that are the new forward extremities of the room .
backfilled
delete_existing
Returns :
Deferred : resolves when the events have been persisted
@@ -197,7 +172,6 @@ class PersistEventsStore:
self . _persist_events_txn ,
events_and_contexts = events_and_contexts ,
backfilled = backfilled ,
delete_existing = delete_existing ,
state_delta_for_room = state_delta_for_room ,
new_forward_extremeties = new_forward_extremeties ,
)
@@ -341,7 +315,6 @@ class PersistEventsStore:
txn : LoggingTransaction ,
events_and_contexts : List [ Tuple [ EventBase , EventContext ] ] ,
backfilled : bool ,
delete_existing : bool = False ,
state_delta_for_room : Dict [ str , DeltaState ] = { } ,
new_forward_extremeties : Dict [ str , List [ str ] ] = { } ,
) :
@@ -393,13 +366,6 @@ class PersistEventsStore:
# From this point onwards the events are only events that we haven't
# seen before.
if delete_existing :
# For paranoia reasons, we go and delete all the existing entries
# for these events so we can reinsert them.
# This gets around any problems with some tables already having
# entries.
self . _delete_existing_rows_txn ( txn , events_and_contexts = events_and_contexts )
self . _store_event_txn ( txn , events_and_contexts = events_and_contexts )
# Insert into event_to_state_groups.
@@ -797,39 +763,6 @@ class PersistEventsStore:
return [ ec for ec in events_and_contexts if ec [ 0 ] not in to_remove ]
@classmethod
def _delete_existing_rows_txn(cls, txn, events_and_contexts):
    """Purge any rows previously stored for the given events.

    Called when events are about to be (re)inserted, so that stale rows
    from a partially-failed earlier attempt do not trip unique
    constraints.

    Args:
        txn: database transaction to run the deletes in
        events_and_contexts: list of (event, context) pairs whose rows
            should be removed
    """
    if not events_and_contexts:
        # nothing to do here
        return

    logger.info("Deleting existing")

    # Build the parameter lists once, outside the table loops.
    by_event_id = [(ev.event_id,) for ev, _ in events_and_contexts]
    by_room_and_event = [
        (ev.room_id, ev.event_id) for ev, _ in events_and_contexts
    ]

    # Tables keyed on event_id alone.
    event_id_tables = (
        "events",
        "event_auth",
        "event_json",
        "event_edges",
        "event_forward_extremities",
        "event_reference_hashes",
        "event_search",
        "event_to_state_groups",
        "state_events",
        "rejections",
        "redactions",
        "room_memberships",
    )
    for table in event_id_tables:
        txn.executemany(
            "DELETE FROM %s WHERE event_id = ?" % (table,),
            by_event_id,
        )

    # Tables keyed on (room_id, event_id).
    for table in ("event_push_actions",):
        txn.executemany(
            "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
            by_room_and_event,
        )
def _store_event_txn ( self , txn , events_and_contexts ) :
""" Insert new events into the event and event_json tables