@ -13,9 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING , Dict , List , Optional , Set
import attr
from typing import TYPE_CHECKING , List , Optional , Set , Tuple , cast
from twisted . python . failure import Failure
@ -23,16 +21,22 @@ from synapse.api.constants import Direction, EventTypes, Membership
from synapse . api . errors import SynapseError
from synapse . api . filtering import Filter
from synapse . events . utils import SerializeEventConfig
from synapse . handlers . room import ShutdownRoomResponse
from synapse . handlers . room import ShutdownRoomParams , ShutdownRoom Response
from synapse . handlers . worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse . logging . opentracing import trace
from synapse . metrics . background_process_metrics import run_as_background_process
from synapse . rest . admin . _base import assert_user_is_admin
from synapse . streams . config import PaginationConfig
from synapse . types import JsonDict , Requester , StrCollection , StreamKeyType
from synapse . types import (
JsonDict ,
JsonMapping ,
Requester ,
ScheduledTask ,
StreamKeyType ,
TaskStatus ,
)
from synapse . types . state import StateFilter
from synapse . util . async_helpers import ReadWriteLock
from synapse . util . stringutils import random_string
from synapse . visibility import filter_events_for_client
if TYPE_CHECKING :
@ -53,80 +57,11 @@ BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3
PURGE_PAGINATION_LOCK_NAME = " purge_pagination_lock "
@attr.s(slots=True, auto_attribs=True)
class PurgeStatus:
    """Progress record for a single history purge request.

    Instances carry a numeric status code and an optional error message, and
    can be rendered as a JSON-style dict via `asdict` (for return by
    get_purge_status).
    """

    # Numeric status codes.
    STATUS_ACTIVE = 0
    STATUS_COMPLETE = 1
    STATUS_FAILED = 2

    # Text used for each status code in the dict built by `asdict`.
    STATUS_TEXT = {
        STATUS_ACTIVE: "active",
        STATUS_COMPLETE: "complete",
        STATUS_FAILED: "failed",
    }

    # Error message, if an error occurred; the empty string means "no error".
    error: str = ""

    # Current state of the request.
    # One of STATUS_{ACTIVE,COMPLETE,FAILED}.
    status: int = STATUS_ACTIVE

    def asdict(self) -> JsonDict:
        """Render this status as a dict.

        Returns:
            A dict with a "status" key holding the status text, plus an
            "error" key when an error message has been recorded.
        """
        status_text = PurgeStatus.STATUS_TEXT[self.status]
        if not self.error:
            return {"status": status_text}
        return {"status": status_text, "error": self.error}
def _empty_shutdown_room_response() -> ShutdownRoomResponse:
    """Build a fresh, empty shutdown result for a new `DeleteStatus`."""
    return {
        "kicked_users": [],
        "failed_to_kick_users": [],
        "local_aliases": [],
        "new_room_id": None,
    }


@attr.s(slots=True, auto_attribs=True)
class DeleteStatus:
    """Object tracking the status of a delete room request

    This class contains information on the progress of a delete room request, for
    return by get_delete_status.
    """

    # Numeric status codes.
    STATUS_PURGING = 0
    STATUS_COMPLETE = 1
    STATUS_FAILED = 2
    STATUS_SHUTTING_DOWN = 3

    # Text used for each status code in the dict built by `asdict`.
    STATUS_TEXT = {
        STATUS_PURGING: "purging",
        STATUS_COMPLETE: "complete",
        STATUS_FAILED: "failed",
        STATUS_SHUTTING_DOWN: "shutting_down",
    }

    # Tracks whether this request has completed.
    # One of STATUS_{PURGING,COMPLETE,FAILED,SHUTTING_DOWN}.
    status: int = STATUS_PURGING

    # Save the error message if an error occurs
    error: str = ""

    # Saves the result of an action to give it back to REST API.
    # Built by a factory so that every instance gets its own dict and lists:
    # a plain dict literal as the default would be one object shared by all
    # instances, so mutating it through one status would leak into the others.
    shutdown_room: ShutdownRoomResponse = attr.Factory(_empty_shutdown_room_response)

    def asdict(self) -> JsonDict:
        """Render this status as a dict.

        Returns:
            A dict with "status" (the status text) and "shutdown_room" keys,
            plus an "error" key when an error message has been recorded.
        """
        ret: JsonDict = {
            "status": DeleteStatus.STATUS_TEXT[self.status],
            "shutdown_room": self.shutdown_room,
        }
        if self.error:
            ret["error"] = self.error
        return ret


PURGE_HISTORY_ACTION_NAME = "purge_history"

PURGE_ROOM_ACTION_NAME = "purge_room"
SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME = " shutdown_and_purge_room "
class PaginationHandler :
@ -136,9 +71,6 @@ class PaginationHandler:
paginating during a purge .
"""
# when to remove a completed deletion/purge from the results map
CLEAR_PURGE_AFTER_MS = 1000 * 3600 * 24 # 24 hours
def __init__ ( self , hs : " HomeServer " ) :
self . hs = hs
self . auth = hs . get_auth ( )
@ -150,17 +82,11 @@ class PaginationHandler:
self . _room_shutdown_handler = hs . get_room_shutdown_handler ( )
self . _relations_handler = hs . get_relations_handler ( )
self . _worker_locks = hs . get_worker_locks_handler ( )
self . _task_scheduler = hs . get_task_scheduler ( )
self . pagination_lock = ReadWriteLock ( )
# IDs of rooms in which there is currently an active purge *or delete* operation.
self . _purges_in_progress_by_room : Set [ str ] = set ( )
# map from purge id to PurgeStatus
self . _purges_by_id : Dict [ str , PurgeStatus ] = { }
# map from delete id to DeleteStatus
self . _delete_by_id : Dict [ str , DeleteStatus ] = { }
# map from room id to delete ids
# Dict[`room_id`, List[`delete_id`]]
self . _delete_by_room : Dict [ str , List [ str ] ] = { }
self . _event_serializer = hs . get_event_client_serializer ( )
self . _retention_default_max_lifetime = (
@ -173,6 +99,9 @@ class PaginationHandler:
self . _retention_allowed_lifetime_max = (
hs . config . retention . retention_allowed_lifetime_max
)
self . _forgotten_room_retention_period = (
hs . config . server . forgotten_room_retention_period
)
self . _is_master = hs . config . worker . worker_app is None
if hs . config . retention . retention_enabled and self . _is_master :
@ -189,6 +118,14 @@ class PaginationHandler:
job . longest_max_lifetime ,
)
self . _task_scheduler . register_action (
self . _purge_history , PURGE_HISTORY_ACTION_NAME
)
self . _task_scheduler . register_action ( self . _purge_room , PURGE_ROOM_ACTION_NAME )
self . _task_scheduler . register_action (
self . _shutdown_and_purge_room , SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME
)
async def purge_history_for_rooms_in_range (
self , min_ms : Optional [ int ] , max_ms : Optional [ int ]
) - > None :
@ -224,7 +161,7 @@ class PaginationHandler:
include_null = False
logger . info (
" [purge] Running purge job for %s < max_lifetime <= %s (include NULLs = %s ) " ,
" [purge] Running retention purge job for %s < max_lifetime <= %s (include NULLs = %s ) " ,
min_ms ,
max_ms ,
include_null ,
@ -239,10 +176,10 @@ class PaginationHandler:
for room_id , retention_policy in rooms . items ( ) :
logger . info ( " [purge] Attempting to purge messages in room %s " , room_id )
if room_id in self . _purges_in_progress_by_room :
if len ( await self . get_delete_tasks_by_room ( room_id , only_active = True ) ) > 0 :
logger . warning (
" [purge] not purging room %s as there ' s an ongoing purge running "
" for this room " ,
" [purge] not purging room %s for retention as there ' s an ongoing purge "
" running for this room " ,
room_id ,
)
continue
@ -295,27 +232,20 @@ class PaginationHandler:
( stream , topo , _event_id ) = r
token = " t %d - %d " % ( topo , stream )
purge_id = random_string ( 16 )
self . _purges_by_id [ purge_id ] = PurgeStatus ( )
logger . info (
" Starting purging events in room %s (purge_id %s ) " % ( room_id , purge_id )
)
logger . info ( " Starting purging events in room %s " , room_id )
# We want to purge everything, including local events, and to run the purge in
# the background so that it's not blocking any other operation apart from
# other purges in the same room.
run_as_background_process (
" _purge_history " ,
self . _purge_history ,
purge_id ,
PURGE_HISTORY_ACTION_NAME ,
self . purge_history ,
room_id ,
token ,
True ,
)
def start_purge_history (
async def start_purge_history (
self , room_id : str , token : str , delete_local_events : bool = False
) - > str :
""" Start off a history purge on a room.
@ -329,40 +259,58 @@ class PaginationHandler:
Returns :
unique ID for this purge transaction .
"""
if room_id in self . _purges_in_progress_by_room :
raise SynapseError (
400 , " History purge already in progress for %s " % ( room_id , )
)
purge_id = random_string ( 16 )
purge_id = await self . _task_scheduler . schedule_task (
PURGE_HISTORY_ACTION_NAME ,
resource_id = room_id ,
params = { " token " : token , " delete_local_events " : delete_local_events } ,
)
# we log the purge_id here so that it can be tied back to the
# request id in the log lines.
logger . info ( " [purge] starting purge_id %s " , purge_id )
self . _purges_by_id [ purge_id ] = PurgeStatus ( )
run_as_background_process (
" purge_history " ,
self . _purge_history ,
purge_id ,
room_id ,
token ,
delete_local_events ,
)
return purge_id
async def _purge_history(
    self,
    task: ScheduledTask,
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
    """Scheduler action to purge some history of a room.

    Args:
        task: the scheduled task. Its `resource_id` is used as the room ID,
            and its `params` must contain the `token` and
            `delete_local_events` keys, which are passed on to
            `purge_history`.

    Returns:
        A `(status, result, error)` tuple. The status is `TaskStatus.FAILED`
        (with an error message) if a required parameter is missing or if
        `purge_history` returns an error message, and `TaskStatus.COMPLETE`
        otherwise. The result element is always None.
    """
    params = task.params
    if (
        task.resource_id is None
        or params is None
        or "token" not in params
        or "delete_local_events" not in params
    ):
        # Report the problem through the task status rather than raising.
        return (
            TaskStatus.FAILED,
            None,
            "Not enough parameters passed to _purge_history",
        )

    err = await self.purge_history(
        task.resource_id,
        params["token"],
        params["delete_local_events"],
    )
    if err is not None:
        return TaskStatus.FAILED, None, err

    return TaskStatus.COMPLETE, None, None
async def purge_history (
self ,
room_id : str ,
token : str ,
delete_local_events : bool ,
) - > Optional [ str ] :
""" Carry out a history purge on a room.
Args :
purge_id : The ID for this purge .
room_id : The room to purge from
token : topological token to delete events before
delete_local_events : True to delete local events as well as remote ones
"""
self . _purges_in_progress_by_room . add ( room_id )
try :
async with self . _worker_locks . acquire_read_write_lock (
PURGE_PAGINATION_LOCK_NAME , room_id , write = True
@ -371,57 +319,68 @@ class PaginationHandler:
room_id , token , delete_local_events
)
logger . info ( " [purge] complete " )
self . _purges_by_id [ purge_id ] . status = PurgeStatus . STATUS_COMPLETE
return None
except Exception :
f = Failure ( )
logger . error (
" [purge] failed " , exc_info = ( f . type , f . value , f . getTracebackObject ( ) )
)
self . _purges_by_id [ purge_id ] . status = PurgeStatus . STATUS_FAILED
self . _purges_by_id [ purge_id ] . error = f . getErrorMessage ( )
finally :
self . _purges_in_progress_by_room . discard ( room_id )
# remove the purge from the list 24 hours after it completes
def clear_purge ( ) - > None :
del self . _purges_by_id [ purge_id ]
self . hs . get_reactor ( ) . callLater (
PaginationHandler . CLEAR_PURGE_AFTER_MS / 1000 , clear_purge
)
def get_purge_status ( self , purge_id : str ) - > Optional [ PurgeStatus ] :
""" Get the current status of an active purge
return f . getErrorMessage ( )
Args :
purge_id : purge_id returned by start_purge_history
"""
return self . _purges_by_id . get ( purge_id )
def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]:
    """Get the current status of an active deleting

    Args:
        delete_id: delete_id returned by start_shutdown_and_purge_room
            or start_purge_history.

    Returns:
        The entry stored under `delete_id` in `self._delete_by_id`, or None
        if there is no such entry.
    """
    return self._delete_by_id.get(delete_id)

async def get_delete_task(self, delete_id: str) -> Optional[ScheduledTask]:
    """Get the scheduled task for an active deleting

    Args:
        delete_id: delete_id returned by start_shutdown_and_purge_room
            or start_purge_history.

    Returns:
        Whatever the task scheduler's `get_task` returns for `delete_id`.
    """
    return await self._task_scheduler.get_task(delete_id)
def get_delete_ids_by_room(self, room_id: str) -> Optional[StrCollection]:
    """Get all active delete ids by room

    Args:
        room_id: room_id that is deleted

    Returns:
        The entry stored under `room_id` in `self._delete_by_room`, or None
        if there is no such entry.
    """
    return self._delete_by_room.get(room_id)

async def get_delete_tasks_by_room(
    self, room_id: str, only_active: Optional[bool] = False
) -> List[ScheduledTask]:
    """Get complete, failed or active delete tasks by room

    Only tasks for the purge-room and shutdown-and-purge-room actions are
    looked up; history purges are not included.

    Args:
        room_id: room_id that is deleted
        only_active: if True, completed & failed tasks will be omitted
    """
    statuses = [TaskStatus.ACTIVE]
    if not only_active:
        statuses += [TaskStatus.COMPLETE, TaskStatus.FAILED]

    return await self._task_scheduler.get_tasks(
        actions=[PURGE_ROOM_ACTION_NAME, SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME],
        resource_id=room_id,
        statuses=statuses,
    )

async def _purge_room(
    self,
    task: ScheduledTask,
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
    """Scheduler action to purge a room.

    Args:
        task: the scheduled task. Its `resource_id` is used as the room ID;
            the optional `force` entry of its `params` (default False) is
            passed on to `purge_room`.

    Returns:
        `(TaskStatus.COMPLETE, None, None)` once `purge_room` has returned.

    Raises:
        Exception: if the task carries no room ID.
    """
    if not task.resource_id:
        raise Exception("No room id passed to purge_room task")

    # A task without params is treated as one with no options set.
    params = task.params if task.params else {}

    await self.purge_room(task.resource_id, params.get("force", False))

    return TaskStatus.COMPLETE, None, None
async def purge_room ( self , room_id : str , force : bool = False ) - > None :
async def purge_room (
self ,
room_id : str ,
force : bool ,
) - > None :
""" Purge the given room from the database.
This function is part the delete room v1 API .
Args :
room_id : room to be purged
force : set true to skip checking for joined users .
"""
logger . info ( " starting purge room_id= %s force= %s " , room_id , force )
async with self . _worker_locks . acquire_multi_read_write_lock (
[
( PURGE_PAGINATION_LOCK_NAME , room_id ) ,
@ -430,13 +389,20 @@ class PaginationHandler:
write = True ,
) :
# first check that we have no users in this room
if not force :
joined = await self . store . is_host_joined ( room_id , self . _server_name )
if joined :
joined = await self . store . is_host_joined ( room_id , self . _server_name )
if joined :
if force :
logger . info (
" force-purging room %s with some local users still joined " ,
room_id ,
)
else :
raise SynapseError ( 400 , " Users are still joined to this room " )
await self . _storage_controllers . purge_events . purge_room ( room_id )
logger . info ( " purge complete for room_id %s " , room_id )
@trace
async def get_messages (
self ,
@ -711,177 +677,72 @@ class PaginationHandler:
async def _shutdown_and_purge_room(
    self,
    task: ScheduledTask,
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
    """Scheduler action to shutdown and purge a room.

    See `RoomShutdownHandler.shutdown_room` for details of creation of the new room

    Args:
        task: the scheduled task. Its `resource_id` is the ID of the room to
            shut down and its `params` are handed to `shutdown_room` as the
            shutdown parameters. Two entries of `params` are also read here:
            `purge` (if true, purge the room from the database after the
            shutdown; treated as False when absent) and `force_purge`
            (passed to `purge_room` as its `force` argument; treated as
            False when absent).

    Returns:
        `(TaskStatus.COMPLETE, shutdown_result, None)`, where
        `shutdown_result` is the value returned by `shutdown_room`.

    Raises:
        Exception: if the task has no room ID or no parameters.
    """
    if task.resource_id is None or task.params is None:
        raise Exception(
            "No room id and/or no parameters passed to shutdown_and_purge_room task"
        )

    room_id = task.resource_id

    async def update_result(result: Optional[JsonMapping]) -> None:
        # Callback given to `shutdown_room` to store a result on this task.
        await self._task_scheduler.update_task(task.id, result=result)

    # Pass on any result already stored on the task.
    # NOTE(review): presumably this lets a restarted task resume a partially
    # completed shutdown -- confirm against `shutdown_room`.
    shutdown_result = (
        cast(ShutdownRoomResponse, task.result) if task.result else None
    )

    shutdown_result = await self._room_shutdown_handler.shutdown_room(
        room_id,
        cast(ShutdownRoomParams, task.params),
        shutdown_result,
        update_result,
    )

    if task.params.get("purge", False):
        await self.purge_room(
            room_id,
            task.params.get("force_purge", False),
        )

    return (TaskStatus.COMPLETE, shutdown_result, None)
async def start_shutdown_and_purge_room (
self ,
room_id : str ,
requester_user_id : Optional [ str ] ,
new_room_user_id : Optional [ str ] = None ,
new_room_name : Optional [ str ] = None ,
message : Optional [ str ] = None ,
block : bool = False ,
purge : bool = True ,
force_purge : bool = False ,
shutdown_params : ShutdownRoomParams ,
) - > str :
""" Start off shut down and purge on a room.
Args :
room_id : The ID of the room to shut down .
requester_user_id :
User who requested the action and put the room on the
blocking list .
If None , the action was not manually requested but instead
triggered automatically , e . g . through a Synapse module
or some other policy .
MUST NOT be None if block = True .
new_room_user_id :
If set , a new room will be created with this user ID
as the creator and admin , and all users in the old room will be
moved into that room . If not set , no new room will be created
and the users will just be removed from the old room .
new_room_name :
A string representing the name of the room that new users will
be invited to . Defaults to ` Content Violation Notification `
message :
A string containing the first message that will be sent as
` new_room_user_id ` in the new room . Ideally this will clearly
convey why the original room was shut down .
Defaults to ` Sharing illegal content on this server is not
permitted and rooms in violation will be blocked . `
block :
If set to ` true ` , this room will be added to a blocking list ,
preventing future attempts to join the room . Defaults to ` false ` .
purge :
If set to ` true ` , purge the given room from the database .
force_purge :
If set to ` true ` , the room will be purged from database
also if it fails to remove some users from room .
shutdown_params : parameters for the shutdown
Returns :
unique ID for this delete transaction .
"""
if room_id in self . _purges_in_progress_by_room :
raise SynapseError (
400 , " History purge already in progress for %s " % ( room_id , )
)
if len ( await self . get_delete_tasks_by_room ( room_id , only_active = True ) ) > 0 :
raise SynapseError ( 400 , " Purge already in progress for %s " % ( room_id , ) )
# This check duplicates the one in `RoomShutdownHandler.shutdown_room`,
# but doing it here means the requester gets a direct response / error to the
# HTTP request and does not have to check the purge status.
new_room_user_id = shutdown_params [ " new_room_user_id " ]
if new_room_user_id is not None :
if not self . hs . is_mine_id ( new_room_user_id ) :
raise SynapseError (
400 , " User must be our own: %s " % ( new_room_user_id , )
)
delete_id = random_string ( 16 )
delete_id = await self . _task_scheduler . schedule_task (
SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME ,
resource_id = room_id ,
params = shutdown_params ,
)
# we log the delete_id here so that it can be tied back to the
# request id in the log lines.
@ -891,19 +752,4 @@ class PaginationHandler:
delete_id ,
)
self . _delete_by_id [ delete_id ] = DeleteStatus ( )
self . _delete_by_room . setdefault ( room_id , [ ] ) . append ( delete_id )
run_as_background_process (
" shutdown_and_purge_room " ,
self . _shutdown_and_purge_room ,
delete_id ,
room_id ,
requester_user_id ,
new_room_user_id ,
new_room_name ,
message ,
block ,
purge ,
force_purge ,
)
return delete_id