|
|
|
@ -13,7 +13,7 @@ |
|
|
|
|
# See the License for the specific language governing permissions and |
|
|
|
|
# limitations under the License. |
|
|
|
|
import logging |
|
|
|
|
from typing import TYPE_CHECKING, Any, Dict, Optional, Set |
|
|
|
|
from typing import TYPE_CHECKING, Any, Collection, Dict, List, Optional, Set |
|
|
|
|
|
|
|
|
|
import attr |
|
|
|
|
|
|
|
|
@ -22,7 +22,7 @@ from twisted.python.failure import Failure |
|
|
|
|
from synapse.api.constants import EventTypes, Membership |
|
|
|
|
from synapse.api.errors import SynapseError |
|
|
|
|
from synapse.api.filtering import Filter |
|
|
|
|
from synapse.logging.context import run_in_background |
|
|
|
|
from synapse.handlers.room import ShutdownRoomResponse |
|
|
|
|
from synapse.metrics.background_process_metrics import run_as_background_process |
|
|
|
|
from synapse.storage.state import StateFilter |
|
|
|
|
from synapse.streams.config import PaginationConfig |
|
|
|
@ -56,11 +56,62 @@ class PurgeStatus: |
|
|
|
|
STATUS_FAILED: "failed", |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
# Save the error message if an error occurs |
|
|
|
|
error: str = "" |
|
|
|
|
|
|
|
|
|
# Tracks whether this request has completed. One of STATUS_{ACTIVE,COMPLETE,FAILED}. |
|
|
|
|
status: int = STATUS_ACTIVE |
|
|
|
|
|
|
|
|
|
def asdict(self) -> JsonDict: |
|
|
|
|
return {"status": PurgeStatus.STATUS_TEXT[self.status]} |
|
|
|
|
ret = {"status": PurgeStatus.STATUS_TEXT[self.status]} |
|
|
|
|
if self.error: |
|
|
|
|
ret["error"] = self.error |
|
|
|
|
return ret |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@attr.s(slots=True, auto_attribs=True) |
|
|
|
|
class DeleteStatus: |
|
|
|
|
"""Object tracking the status of a delete room request |
|
|
|
|
|
|
|
|
|
This class contains information on the progress of a delete room request, for |
|
|
|
|
return by get_delete_status. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
STATUS_PURGING = 0 |
|
|
|
|
STATUS_COMPLETE = 1 |
|
|
|
|
STATUS_FAILED = 2 |
|
|
|
|
STATUS_SHUTTING_DOWN = 3 |
|
|
|
|
|
|
|
|
|
STATUS_TEXT = { |
|
|
|
|
STATUS_PURGING: "purging", |
|
|
|
|
STATUS_COMPLETE: "complete", |
|
|
|
|
STATUS_FAILED: "failed", |
|
|
|
|
STATUS_SHUTTING_DOWN: "shutting_down", |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
# Tracks whether this request has completed. |
|
|
|
|
# One of STATUS_{PURGING,COMPLETE,FAILED,SHUTTING_DOWN}. |
|
|
|
|
status: int = STATUS_PURGING |
|
|
|
|
|
|
|
|
|
# Save the error message if an error occurs |
|
|
|
|
error: str = "" |
|
|
|
|
|
|
|
|
|
# Saves the result of an action to give it back to REST API |
|
|
|
|
shutdown_room: ShutdownRoomResponse = { |
|
|
|
|
"kicked_users": [], |
|
|
|
|
"failed_to_kick_users": [], |
|
|
|
|
"local_aliases": [], |
|
|
|
|
"new_room_id": None, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
def asdict(self) -> JsonDict: |
|
|
|
|
ret = { |
|
|
|
|
"status": DeleteStatus.STATUS_TEXT[self.status], |
|
|
|
|
"shutdown_room": self.shutdown_room, |
|
|
|
|
} |
|
|
|
|
if self.error: |
|
|
|
|
ret["error"] = self.error |
|
|
|
|
return ret |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class PaginationHandler: |
|
|
|
@ -70,6 +121,9 @@ class PaginationHandler: |
|
|
|
|
paginating during a purge. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
# when to remove a completed deletion/purge from the results map |
|
|
|
|
CLEAR_PURGE_AFTER_MS = 1000 * 3600 * 24 # 24 hours |
|
|
|
|
|
|
|
|
|
def __init__(self, hs: "HomeServer"): |
|
|
|
|
self.hs = hs |
|
|
|
|
self.auth = hs.get_auth() |
|
|
|
@ -78,11 +132,18 @@ class PaginationHandler: |
|
|
|
|
self.state_store = self.storage.state |
|
|
|
|
self.clock = hs.get_clock() |
|
|
|
|
self._server_name = hs.hostname |
|
|
|
|
self._room_shutdown_handler = hs.get_room_shutdown_handler() |
|
|
|
|
|
|
|
|
|
self.pagination_lock = ReadWriteLock() |
|
|
|
|
# IDs of rooms in which there currently an active purge *or delete* operation. |
|
|
|
|
self._purges_in_progress_by_room: Set[str] = set() |
|
|
|
|
# map from purge id to PurgeStatus |
|
|
|
|
self._purges_by_id: Dict[str, PurgeStatus] = {} |
|
|
|
|
# map from purge id to DeleteStatus |
|
|
|
|
self._delete_by_id: Dict[str, DeleteStatus] = {} |
|
|
|
|
# map from room id to delete ids |
|
|
|
|
# Dict[`room_id`, List[`delete_id`]] |
|
|
|
|
self._delete_by_room: Dict[str, List[str]] = {} |
|
|
|
|
self._event_serializer = hs.get_event_client_serializer() |
|
|
|
|
|
|
|
|
|
self._retention_default_max_lifetime = ( |
|
|
|
@ -265,8 +326,13 @@ class PaginationHandler: |
|
|
|
|
logger.info("[purge] starting purge_id %s", purge_id) |
|
|
|
|
|
|
|
|
|
self._purges_by_id[purge_id] = PurgeStatus() |
|
|
|
|
run_in_background( |
|
|
|
|
self._purge_history, purge_id, room_id, token, delete_local_events |
|
|
|
|
run_as_background_process( |
|
|
|
|
"purge_history", |
|
|
|
|
self._purge_history, |
|
|
|
|
purge_id, |
|
|
|
|
room_id, |
|
|
|
|
token, |
|
|
|
|
delete_local_events, |
|
|
|
|
) |
|
|
|
|
return purge_id |
|
|
|
|
|
|
|
|
@ -276,7 +342,7 @@ class PaginationHandler: |
|
|
|
|
"""Carry out a history purge on a room. |
|
|
|
|
|
|
|
|
|
Args: |
|
|
|
|
purge_id: The id for this purge |
|
|
|
|
purge_id: The ID for this purge. |
|
|
|
|
room_id: The room to purge from |
|
|
|
|
token: topological token to delete events before |
|
|
|
|
delete_local_events: True to delete local events as well as remote ones |
|
|
|
@ -295,6 +361,7 @@ class PaginationHandler: |
|
|
|
|
"[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject()) # type: ignore |
|
|
|
|
) |
|
|
|
|
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED |
|
|
|
|
self._purges_by_id[purge_id].error = f.getErrorMessage() |
|
|
|
|
finally: |
|
|
|
|
self._purges_in_progress_by_room.discard(room_id) |
|
|
|
|
|
|
|
|
@ -302,7 +369,9 @@ class PaginationHandler: |
|
|
|
|
def clear_purge() -> None: |
|
|
|
|
del self._purges_by_id[purge_id] |
|
|
|
|
|
|
|
|
|
self.hs.get_reactor().callLater(24 * 3600, clear_purge) |
|
|
|
|
self.hs.get_reactor().callLater( |
|
|
|
|
PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
def get_purge_status(self, purge_id: str) -> Optional[PurgeStatus]: |
|
|
|
|
"""Get the current status of an active purge |
|
|
|
@ -312,8 +381,25 @@ class PaginationHandler: |
|
|
|
|
""" |
|
|
|
|
return self._purges_by_id.get(purge_id) |
|
|
|
|
|
|
|
|
|
def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]: |
|
|
|
|
"""Get the current status of an active deleting |
|
|
|
|
|
|
|
|
|
Args: |
|
|
|
|
delete_id: delete_id returned by start_shutdown_and_purge_room |
|
|
|
|
""" |
|
|
|
|
return self._delete_by_id.get(delete_id) |
|
|
|
|
|
|
|
|
|
def get_delete_ids_by_room(self, room_id: str) -> Optional[Collection[str]]: |
|
|
|
|
"""Get all active delete ids by room |
|
|
|
|
|
|
|
|
|
Args: |
|
|
|
|
room_id: room_id that is deleted |
|
|
|
|
""" |
|
|
|
|
return self._delete_by_room.get(room_id) |
|
|
|
|
|
|
|
|
|
async def purge_room(self, room_id: str, force: bool = False) -> None: |
|
|
|
|
"""Purge the given room from the database. |
|
|
|
|
This function is part the delete room v1 API. |
|
|
|
|
|
|
|
|
|
Args: |
|
|
|
|
room_id: room to be purged |
|
|
|
@ -472,3 +558,192 @@ class PaginationHandler: |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
return chunk |
|
|
|
|
|
|
|
|
|
async def _shutdown_and_purge_room( |
|
|
|
|
self, |
|
|
|
|
delete_id: str, |
|
|
|
|
room_id: str, |
|
|
|
|
requester_user_id: str, |
|
|
|
|
new_room_user_id: Optional[str] = None, |
|
|
|
|
new_room_name: Optional[str] = None, |
|
|
|
|
message: Optional[str] = None, |
|
|
|
|
block: bool = False, |
|
|
|
|
purge: bool = True, |
|
|
|
|
force_purge: bool = False, |
|
|
|
|
) -> None: |
|
|
|
|
""" |
|
|
|
|
Shuts down and purges a room. |
|
|
|
|
|
|
|
|
|
See `RoomShutdownHandler.shutdown_room` for details of creation of the new room |
|
|
|
|
|
|
|
|
|
Args: |
|
|
|
|
delete_id: The ID for this delete. |
|
|
|
|
room_id: The ID of the room to shut down. |
|
|
|
|
requester_user_id: |
|
|
|
|
User who requested the action. Will be recorded as putting the room on the |
|
|
|
|
blocking list. |
|
|
|
|
new_room_user_id: |
|
|
|
|
If set, a new room will be created with this user ID |
|
|
|
|
as the creator and admin, and all users in the old room will be |
|
|
|
|
moved into that room. If not set, no new room will be created |
|
|
|
|
and the users will just be removed from the old room. |
|
|
|
|
new_room_name: |
|
|
|
|
A string representing the name of the room that new users will |
|
|
|
|
be invited to. Defaults to `Content Violation Notification` |
|
|
|
|
message: |
|
|
|
|
A string containing the first message that will be sent as |
|
|
|
|
`new_room_user_id` in the new room. Ideally this will clearly |
|
|
|
|
convey why the original room was shut down. |
|
|
|
|
Defaults to `Sharing illegal content on this server is not |
|
|
|
|
permitted and rooms in violation will be blocked.` |
|
|
|
|
block: |
|
|
|
|
If set to `true`, this room will be added to a blocking list, |
|
|
|
|
preventing future attempts to join the room. Defaults to `false`. |
|
|
|
|
purge: |
|
|
|
|
If set to `true`, purge the given room from the database. |
|
|
|
|
force_purge: |
|
|
|
|
If set to `true`, the room will be purged from database |
|
|
|
|
also if it fails to remove some users from room. |
|
|
|
|
|
|
|
|
|
Saves a `RoomShutdownHandler.ShutdownRoomResponse` in `DeleteStatus`: |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
self._purges_in_progress_by_room.add(room_id) |
|
|
|
|
try: |
|
|
|
|
with await self.pagination_lock.write(room_id): |
|
|
|
|
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN |
|
|
|
|
self._delete_by_id[ |
|
|
|
|
delete_id |
|
|
|
|
].shutdown_room = await self._room_shutdown_handler.shutdown_room( |
|
|
|
|
room_id=room_id, |
|
|
|
|
requester_user_id=requester_user_id, |
|
|
|
|
new_room_user_id=new_room_user_id, |
|
|
|
|
new_room_name=new_room_name, |
|
|
|
|
message=message, |
|
|
|
|
block=block, |
|
|
|
|
) |
|
|
|
|
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_PURGING |
|
|
|
|
|
|
|
|
|
if purge: |
|
|
|
|
logger.info("starting purge room_id %s", room_id) |
|
|
|
|
|
|
|
|
|
# first check that we have no users in this room |
|
|
|
|
if not force_purge: |
|
|
|
|
joined = await self.store.is_host_joined( |
|
|
|
|
room_id, self._server_name |
|
|
|
|
) |
|
|
|
|
if joined: |
|
|
|
|
raise SynapseError( |
|
|
|
|
400, "Users are still joined to this room" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
await self.storage.purge_events.purge_room(room_id) |
|
|
|
|
|
|
|
|
|
logger.info("complete") |
|
|
|
|
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE |
|
|
|
|
except Exception: |
|
|
|
|
f = Failure() |
|
|
|
|
logger.error( |
|
|
|
|
"failed", |
|
|
|
|
exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore |
|
|
|
|
) |
|
|
|
|
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_FAILED |
|
|
|
|
self._delete_by_id[delete_id].error = f.getErrorMessage() |
|
|
|
|
finally: |
|
|
|
|
self._purges_in_progress_by_room.discard(room_id) |
|
|
|
|
|
|
|
|
|
# remove the delete from the list 24 hours after it completes |
|
|
|
|
def clear_delete() -> None: |
|
|
|
|
del self._delete_by_id[delete_id] |
|
|
|
|
self._delete_by_room[room_id].remove(delete_id) |
|
|
|
|
if not self._delete_by_room[room_id]: |
|
|
|
|
del self._delete_by_room[room_id] |
|
|
|
|
|
|
|
|
|
self.hs.get_reactor().callLater( |
|
|
|
|
PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_delete |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
def start_shutdown_and_purge_room( |
|
|
|
|
self, |
|
|
|
|
room_id: str, |
|
|
|
|
requester_user_id: str, |
|
|
|
|
new_room_user_id: Optional[str] = None, |
|
|
|
|
new_room_name: Optional[str] = None, |
|
|
|
|
message: Optional[str] = None, |
|
|
|
|
block: bool = False, |
|
|
|
|
purge: bool = True, |
|
|
|
|
force_purge: bool = False, |
|
|
|
|
) -> str: |
|
|
|
|
"""Start off shut down and purge on a room. |
|
|
|
|
|
|
|
|
|
Args: |
|
|
|
|
room_id: The ID of the room to shut down. |
|
|
|
|
requester_user_id: |
|
|
|
|
User who requested the action and put the room on the |
|
|
|
|
blocking list. |
|
|
|
|
new_room_user_id: |
|
|
|
|
If set, a new room will be created with this user ID |
|
|
|
|
as the creator and admin, and all users in the old room will be |
|
|
|
|
moved into that room. If not set, no new room will be created |
|
|
|
|
and the users will just be removed from the old room. |
|
|
|
|
new_room_name: |
|
|
|
|
A string representing the name of the room that new users will |
|
|
|
|
be invited to. Defaults to `Content Violation Notification` |
|
|
|
|
message: |
|
|
|
|
A string containing the first message that will be sent as |
|
|
|
|
`new_room_user_id` in the new room. Ideally this will clearly |
|
|
|
|
convey why the original room was shut down. |
|
|
|
|
Defaults to `Sharing illegal content on this server is not |
|
|
|
|
permitted and rooms in violation will be blocked.` |
|
|
|
|
block: |
|
|
|
|
If set to `true`, this room will be added to a blocking list, |
|
|
|
|
preventing future attempts to join the room. Defaults to `false`. |
|
|
|
|
purge: |
|
|
|
|
If set to `true`, purge the given room from the database. |
|
|
|
|
force_purge: |
|
|
|
|
If set to `true`, the room will be purged from database |
|
|
|
|
also if it fails to remove some users from room. |
|
|
|
|
|
|
|
|
|
Returns: |
|
|
|
|
unique ID for this delete transaction. |
|
|
|
|
""" |
|
|
|
|
if room_id in self._purges_in_progress_by_room: |
|
|
|
|
raise SynapseError( |
|
|
|
|
400, "History purge already in progress for %s" % (room_id,) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
# This check is double to `RoomShutdownHandler.shutdown_room` |
|
|
|
|
# But here the requester get a direct response / error with HTTP request |
|
|
|
|
# and do not have to check the purge status |
|
|
|
|
if new_room_user_id is not None: |
|
|
|
|
if not self.hs.is_mine_id(new_room_user_id): |
|
|
|
|
raise SynapseError( |
|
|
|
|
400, "User must be our own: %s" % (new_room_user_id,) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
delete_id = random_string(16) |
|
|
|
|
|
|
|
|
|
# we log the delete_id here so that it can be tied back to the |
|
|
|
|
# request id in the log lines. |
|
|
|
|
logger.info( |
|
|
|
|
"starting shutdown room_id %s with delete_id %s", |
|
|
|
|
room_id, |
|
|
|
|
delete_id, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
self._delete_by_id[delete_id] = DeleteStatus() |
|
|
|
|
self._delete_by_room.setdefault(room_id, []).append(delete_id) |
|
|
|
|
run_as_background_process( |
|
|
|
|
"shutdown_and_purge_room", |
|
|
|
|
self._shutdown_and_purge_room, |
|
|
|
|
delete_id, |
|
|
|
|
room_id, |
|
|
|
|
requester_user_id, |
|
|
|
|
new_room_user_id, |
|
|
|
|
new_room_name, |
|
|
|
|
message, |
|
|
|
|
block, |
|
|
|
|
purge, |
|
|
|
|
force_purge, |
|
|
|
|
) |
|
|
|
|
return delete_id |
|
|
|
|