@@ -37,6 +37,7 @@ from synapse.logging.context import (
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import get_domain_from_id
+from synapse.util import batch_iter
 from synapse.util.metrics import Measure

 from ._base import SQLBaseStore
@@ -240,27 +241,8 @@ class EventsWorkerStore(SQLBaseStore):
             # instead.
             if not allow_rejected and entry.event.type == EventTypes.Redaction:
                 if entry.event.internal_metadata.need_to_check_redaction():
-                    # XXX: we need to avoid calling get_event here.
-                    #
-                    # The problem is that we end up at this point when an event
-                    # which has been redacted is pulled out of the database by
-                    # _enqueue_events, because _enqueue_events needs to check
-                    # the redaction before it can cache the redacted event. So
-                    # obviously, calling get_event to get the redacted event out
-                    # of the database gives us an infinite loop.
-                    #
-                    # For now (quick hack to fix during 0.99 release cycle), we
-                    # just go and fetch the relevant row from the db, but it
-                    # would be nice to think about how we can cache this rather
-                    # than hit the db every time we access a redaction event.
-                    #
-                    # One thought on how to do this:
-                    #  1. split get_events_as_list up so that it is divided into
-                    #     (a) get the rawish event from the db/cache, (b) do the
-                    #     redaction/rejection filtering
-                    #  2. have _get_event_from_row just call the first half of
-                    #     that
+                    # XXX: we should probably use _get_events_from_cache_or_db here,
+                    # so that we can benefit from caching.
                     orig_sender = yield self._simple_select_one_onecol(
                         table="events",
                         keyvalues={"event_id": entry.event.redacts},
@@ -410,19 +392,16 @@ class EventsWorkerStore(SQLBaseStore):
                 The fetch requests. Each entry consists of a list of event
                 ids to be fetched, and a deferred to be completed once the
                 events have been fetched.
         """
         with Measure(self._clock, "_fetch_event_list"):
             try:
                 event_id_lists = list(zip(*event_list))[0]
                 event_ids = [item for sublist in event_id_lists for item in sublist]

-                rows = self._new_transaction(
+                row_dict = self._new_transaction(
                     conn, "do_fetch", [], [], self._fetch_event_rows, event_ids
                 )

-                row_dict = {r["event_id"]: r for r in rows}
-
                 # We only want to resolve deferreds from the main thread
                 def fire(lst, res):
                     for ids, d in lst:
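
(Background, not part of the patch: `_fetch_event_list` runs on a database thread, which is why the `fire` helper exists at all — Twisted Deferreds must only be resolved on the reactor thread. A minimal sketch of the usual hand-off, assuming a standard Twisted reactor; the helper name is hypothetical:)

    from twisted.internet import reactor

    def resolve_on_reactor(deferred, result):
        # Firing deferred.callback() directly from a worker thread is unsafe;
        # callFromThread schedules the call back onto the reactor thread.
        reactor.callFromThread(deferred.callback, result)
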
@@ -480,7 +459,7 @@ class EventsWorkerStore(SQLBaseStore):
         logger.debug("Loaded %d events (%d rows)", len(events), len(rows))

         if not allow_rejected:
-            rows[:] = [r for r in rows if not r["rejects"]]
+            rows[:] = [r for r in rows if r["rejected_reason"] is None]

         res = yield make_deferred_yieldable(
             defer.gatherResults(
@@ -489,8 +468,8 @@ class EventsWorkerStore(SQLBaseStore):
                         self._get_event_from_row,
                         row["internal_metadata"],
                         row["json"],
-                        row["redacts"],
-                        rejected_reason=row["rejects"],
+                        row["redactions"],
+                        rejected_reason=row["rejected_reason"],
                         format_version=row["format_version"],
                     )
                     for row in rows
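
(Background: `run_in_background` and `make_deferred_yieldable` are Synapse's logcontext wrappers; the fan-out underneath is plain `defer.gatherResults`. A minimal sketch of that pattern without the logcontext machinery, using hypothetical names:)

    from twisted.internet import defer

    @defer.inlineCallbacks
    def fetch_all(fetch_one, items):
        # Start one fetch per item, then wait for all of them to complete.
        # consumeErrors=True stops a single failure producing unhandled-error
        # noise from the remaining Deferreds.
        results = yield defer.gatherResults(
            [fetch_one(item) for item in items], consumeErrors=True
        )
        defer.returnValue(results)
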
@@ -501,49 +480,98 @@ class EventsWorkerStore(SQLBaseStore):

         defer.returnValue({e.event.event_id: e for e in res if e})

-    def _fetch_event_rows(self, txn, events):
-        rows = []
-        N = 200
-        for i in range(1 + len(events) // N):
-            evs = events[i * N:(i + 1) * N]
-            if not evs:
-                break
+    def _fetch_event_rows(self, txn, event_ids):
+        """Fetch event rows from the database
+
+        Events which are not found are omitted from the result.
+
+        The returned per-event dicts contain the following keys:
+
+         * event_id (str)
+
+         * json (str): json-encoded event structure
+
+         * internal_metadata (str): json-encoded internal metadata dict
+
+         * format_version (int|None): The format of the event. Hopefully one
+           of EventFormatVersions. 'None' means the event predates
+           EventFormatVersions (so the event is format V1).
+
+         * rejected_reason (str|None): if the event was rejected, the reason
+           why.
+
+         * redactions (List[str]): a list of event-ids which (claim to) redact
+           this event.
+
+        Args:
+            txn (twisted.enterprise.adbapi.Connection):
+            event_ids (Iterable[str]): event IDs to fetch
+
+        Returns:
+            Dict[str, Dict]: a map from event id to event info.
+        """
+        event_dict = {}
+        for evs in batch_iter(event_ids, 200):
             sql = (
                 "SELECT "
-                " e.event_id as event_id, "
+                " e.event_id, "
                 " e.internal_metadata,"
                 " e.json,"
                 " e.format_version, "
-                " r.redacts as redacts,"
-                " rej.event_id as rejects "
+                " rej.reason "
                 " FROM event_json as e"
                 " LEFT JOIN rejections as rej USING (event_id)"
-                " LEFT JOIN redactions as r ON e.event_id = r.redacts"
                 " WHERE e.event_id IN (%s)"
             ) % (",".join(["?"] * len(evs)),)

             txn.execute(sql, evs)
-            rows.extend(self.cursor_to_dict(txn))
-
-        return rows
+
+            for row in txn:
+                event_id = row[0]
+                event_dict[event_id] = {
+                    "event_id": event_id,
+                    "internal_metadata": row[1],
+                    "json": row[2],
+                    "format_version": row[3],
+                    "rejected_reason": row[4],
+                    "redactions": [],
+                }
+
+            # check for redactions
+            redactions_sql = (
+                "SELECT event_id, redacts FROM redactions WHERE redacts IN (%s)"
+            ) % (",".join(["?"] * len(evs)),)
+
+            txn.execute(redactions_sql, evs)
+
+            for (redacter, redacted) in txn:
+                d = event_dict.get(redacted)
+                if d:
+                    d["redactions"].append(redacter)
+
+        return event_dict

     @defer.inlineCallbacks
     def _get_event_from_row(
-        self, internal_metadata, js, redacted, format_version, rejected_reason=None
+        self, internal_metadata, js, redactions, format_version, rejected_reason=None
     ):
+        """Parse an event row which has been read from the database
+
+        Args:
+            internal_metadata (str): json-encoded internal_metadata column
+            js (str): json-encoded event body from event_json
+            redactions (list[str]): a list of the events which claim to have redacted
+                this event, from the redactions table
+            format_version (str): the 'format_version' column
+            rejected_reason (str|None): the reason this event was rejected, if any
+
+        Returns:
+            _EventCacheEntry
+        """
         with Measure(self._clock, "_get_event_from_row"):
             d = json.loads(js)
             internal_metadata = json.loads(internal_metadata)

-            if rejected_reason:
-                rejected_reason = yield self._simple_select_one_onecol(
-                    table="rejections",
-                    keyvalues={"event_id": rejected_reason},
-                    retcol="reason",
-                    desc="_get_event_from_row_rejected_reason",
-                )
-
             if format_version is None:
                 # This means that we stored the event before we had the concept
                 # of a event format version, so it must be a V1 event.
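
(Background: the rewritten `_fetch_event_rows` leans on `batch_iter` — now imported at the top of the file — to cap each IN clause at 200 event ids. A self-contained sketch of such a helper; the real `synapse.util.batch_iter` may differ in detail:)

    from itertools import islice

    def batch_iter(iterable, size):
        # Yield tuples of up to `size` items until the iterable is exhausted;
        # iter(callable, sentinel) stops when the callable returns ().
        sourceiter = iter(iterable)
        return iter(lambda: tuple(islice(sourceiter, size)), ())
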
@@ -555,41 +583,7 @@ class EventsWorkerStore(SQLBaseStore):
                 rejected_reason=rejected_reason,
             )

-            redacted_event = None
-            if redacted:
-                redacted_event = prune_event(original_ev)
-
-                redaction_id = yield self._simple_select_one_onecol(
-                    table="redactions",
-                    keyvalues={"redacts": redacted_event.event_id},
-                    retcol="event_id",
-                    desc="_get_event_from_row_redactions",
-                )
-
-                redacted_event.unsigned["redacted_by"] = redaction_id
-
-                # Get the redaction event.
-                because = yield self.get_event(
-                    redaction_id, check_redacted=False, allow_none=True
-                )
-
-                if because:
-                    # It's fine to do add the event directly, since get_pdu_json
-                    # will serialise this field correctly
-                    redacted_event.unsigned["redacted_because"] = because
-
-                    # Starting in room version v3, some redactions need to be
-                    # rechecked if we didn't have the redacted event at the
-                    # time, so we recheck on read instead.
-                    if because.internal_metadata.need_to_check_redaction():
-                        expected_domain = get_domain_from_id(original_ev.sender)
-                        if get_domain_from_id(because.sender) == expected_domain:
-                            # This redaction event is allowed. Mark as not needing a
-                            # recheck.
-                            because.internal_metadata.recheck_redaction = False
-                        else:
-                            # Senders don't match, so the event isn't actually redacted
-                            redacted_event = None
+            redacted_event = yield self._maybe_redact_event_row(original_ev, redactions)

             cache_entry = _EventCacheEntry(
                 event=original_ev, redacted_event=redacted_event
@@ -599,6 +593,56 @@ class EventsWorkerStore(SQLBaseStore):

         defer.returnValue(cache_entry)

+    @defer.inlineCallbacks
+    def _maybe_redact_event_row(self, original_ev, redactions):
+        """Given an event object and a list of possible redacting event ids,
+        determine whether to honour any of those redactions and if so return a redacted
+        event.
+
+        Args:
+            original_ev (EventBase):
+            redactions (iterable[str]): list of event ids of potential redaction events
+
+        Returns:
+            Deferred[EventBase|None]: if the event should be redacted, a pruned
+                event object. Otherwise, None.
+        """
+        redaction_map = yield self._get_events_from_cache_or_db(redactions)
+
+        for redaction_id in redactions:
+            redaction_entry = redaction_map.get(redaction_id)
+            if not redaction_entry:
+                # we don't have the redaction event, or the redaction event was not
+                # authorized.
+                continue
+
+            redaction_event = redaction_entry.event
+
+            # Starting in room version v3, some redactions need to be
+            # rechecked if we didn't have the redacted event at the
+            # time, so we recheck on read instead.
+            if redaction_event.internal_metadata.need_to_check_redaction():
+                expected_domain = get_domain_from_id(original_ev.sender)
+                if get_domain_from_id(redaction_event.sender) == expected_domain:
+                    # This redaction event is allowed. Mark as not needing a recheck.
+                    redaction_event.internal_metadata.recheck_redaction = False
+                else:
+                    # Senders don't match, so the event isn't actually redacted
+                    continue
+
+            # we found a good redaction event. Redact!
+            redacted_event = prune_event(original_ev)
+            redacted_event.unsigned["redacted_by"] = redaction_id
+
+            # It's fine to add the event directly, since get_pdu_json
+            # will serialise this field correctly
+            redacted_event.unsigned["redacted_because"] = redaction_event
+
+            return redacted_event
+
+        # no valid redaction found for this event
+        return None
+
     @defer.inlineCallbacks
     def have_events_in_timeline(self, event_ids):
         """Given a list of event ids, check if we have already processed and