Properly clean a Nextcloud user with S3 storage
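The script below drives Nextcloud's occ tool, the Nextcloud MariaDB database, and the S3 bucket directly. Besides the command-line interface in main(), the CleanS3User class can be used programmatically; the following is a minimal sketch, not part of the script, where the Nextcloud path and e-mail addresses are placeholders and the import assumes the file is importable as clean_s3_user:

from logging import DEBUG
from pathlib import Path

from clean_s3_user import CleanS3User  # assumed module name, adjust to your layout

# Transfer one account's files and shares to another account, then delete it.
cleaner = CleanS3User(nxc_path=Path("/var/www/nextcloud"), log_level=DEBUG)
cleaner.clean_user(user_email="alice@example.com", dest_user_email="bob@example.com")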
clean_s3_user/clean_s3_user.py

from contextlib import closing
from logging import getLogger, StreamHandler, FileHandler, Formatter, INFO, DEBUG
from argparse import ArgumentParser
from subprocess import check_output
from json import loads
from pathlib import Path
from csv import writer, reader
from typing import List, Union

from botocore.config import Config
from mariadb import connect, connection
from boto3 import resource


class NextcloudUserNotFound(Exception):
    def __init__(self, message: str = None):
        super().__init__(message)
        self.message = message


class CleanS3User:
    occ_php_opts: List[str] = []
    occ_opts = ["--no-ansi", "--no-interaction", "--no-warnings"]
    nxt_s3_key_prefix = "urn:oid:"
    s3_delete_chunk = 1000

    def __init__(self, nxc_path: Path = Path("/var/www/nextcloud"), log_level: int = INFO, log_path: Path = None):
        """
        Init CleanS3User: fetch the Nextcloud configuration and initialize the database and S3 connections
        :param nxc_path: Path to Nextcloud
        :param log_level: The level for the logger
        :param log_path: The log output path
        """
        self.logger = self.init_logger(log_level, log_path)
        self.nxc_path = nxc_path
        self.nxc_conf = self.get_nxc_conf()
        self.db = self.connect_db()
        self.s3 = self.connect_s3()

    def __del__(self):
        """
        Close the database connection and roll back any uncommitted changes
        """
        if getattr(self, "db", None):
            try:
                self.db.ping()  # raises if the connection is already gone
            except Exception:
                return
            self.db.rollback()
            self.db.close()

    @staticmethod
    def init_logger(log_level: int = INFO, log_path: Path = None):
        """
        Setup logger
        :param log_level: The level for the logger
        :param log_path: The log output path
        """
        logger = getLogger("clean_s3_user")
        ch = StreamHandler()
        ch.setFormatter(Formatter("%(message)s"))
        logger.addHandler(ch)
        if log_path:
            fh = FileHandler(log_path)
            fh.setFormatter(Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
            logger.addHandler(fh)
        logger.setLevel(log_level)
        return logger

    def occ(self, args: List[str], json: bool = True) -> Union[dict, str]:
        """
        Run a Nextcloud OCC command
        :param args: The command args
        :param json: Parse command output as JSON
        :return: Dictionary if JSON or string
        """
        args = ["sudo", "-u", "www-data", "php", *self.occ_php_opts, str(self.nxc_path / "occ"), *self.occ_opts, *args]
        self.logger.debug("⚙ Executing occ command: " + " ".join(args))
        out = check_output(args)
        if json:
            return loads(out)
        else:
            return out

    def get_nxc_conf(self) -> dict:
        """
        Get the Nextcloud configuration
        :return: Nextcloud system configuration
        """
        try:
            config: dict = self.occ(["config:list", "--private"])["system"]
            if not config:
                raise ValueError("Nextcloud system configuration empty")
            return config
        except Exception as e:
            self.logger.error("🛑 Failed to get the Nextcloud configuration")
            self.logger.debug("Execution info:", exc_info=True)
            raise e

    def connect_db(self) -> connection:
        """
        Establish a connection to the database
        :raises:
            mariadb.Error: When an error occurs on the first connection to the database
        :return: The MariaDB connection
        """
        conn = {}
        if not {"dbtype", "dbhost", "dbport", "dbuser", "dbpassword", "dbname", "dbtableprefix"}\
                .issubset(self.nxc_conf.keys()):
            self.logger.error("🛑 Missing Nextcloud database configuration")
            raise ValueError("Missing Nextcloud database configuration")
        elif self.nxc_conf["dbtype"] != "mysql":
            self.logger.error("🛑 Non-MySQL databases are not supported yet")
            raise ValueError("Non-MySQL databases are not supported")
        host = self.nxc_conf["dbhost"].split(":")
        if len(host) > 1:
            conn["host"] = host[0]
            port = host[1]
            try:
                conn["port"] = int(port)
            except ValueError:
                conn["unix_socket"] = port
        else:
            conn["host"] = self.nxc_conf["dbhost"]
            conn["port"] = int(self.nxc_conf["dbport"])
        conn["user"] = self.nxc_conf["dbuser"]
        conn["password"] = self.nxc_conf["dbpassword"]
        conn["database"] = self.nxc_conf["dbname"]
        conn["autocommit"] = False
        self.logger.info("🗄 Connecting to database")
        self.logger.debug(conn)
        try:
            conn = connect(**conn)
        except Exception as e:
            self.logger.error("🛑 Failed to connect to the Nextcloud database")
            self.logger.debug("Execution info:", exc_info=True)
            raise e
        return conn

    def connect_s3(self):
        """
        Establish a connection to the S3 bucket
        :return: The S3 Bucket
        """
        if "objectstore" not in self.nxc_conf or "arguments" not in self.nxc_conf["objectstore"]:
            self.logger.error("🛑 Missing Nextcloud S3 configuration")
            raise ValueError("Missing Nextcloud S3 configuration")
        config: dict = self.nxc_conf["objectstore"]["arguments"]
        if not {"bucket", "region", "use_ssl", "hostname", "port", "key", "secret"}.issubset(config.keys()):
            self.logger.error("🛑 Missing Nextcloud S3 configuration arguments")
            raise ValueError("Missing Nextcloud S3 configuration arguments")
        self.logger.info("🗂 Connecting to S3")
        self.logger.debug(config)
        try:
            c = resource(service_name="s3", region_name=config["region"], use_ssl=config["use_ssl"],
                         endpoint_url=f"{'https' if config['use_ssl'] else 'http'}://{config['hostname']}:{config['port']}",
                         aws_access_key_id=config["key"], aws_secret_access_key=config["secret"],
                         config=Config(signature_version="s3v4"))
        except Exception as e:
            self.logger.error("🛑 Failed to connect to Nextcloud S3")
            self.logger.debug("Execution info:", exc_info=True)
            raise e
        try:
            b = c.Bucket(config["bucket"])
        except Exception as e:
            self.logger.error("🛑 Failed to get the Nextcloud S3 bucket")
            self.logger.debug("Execution info:", exc_info=True)
            raise e
        return b

    def find_user(self, cursor, email: str) -> str:
        """
        Find a Nextcloud user by email
        :param cursor: The database cursor
        :param email: The user email
        :return: The user Nextcloud UID
        """
        email = self.db.escape_string(email)
        cursor.execute(f"SELECT uid FROM {self.nxc_conf['dbtableprefix']}accounts WHERE data LIKE ?",
                       ('%"email":{"value":"' + email + '","%',))
        res = cursor.fetchone()  # fetchone() returns None when no row matches
        return res[0] if res else None

    def all_users(self, cursor, exclude: List[str] = None) -> List[str]:
        """
        Return all Nextcloud user UIDs
        :param cursor: The database cursor
        :param exclude: List of user UIDs to exclude
        :return: UID list of all Nextcloud users
        """
        if exclude is None:
            exclude = []
        cursor.execute(f"SELECT uid FROM {self.nxc_conf['dbtableprefix']}accounts")
        return list(filter(lambda u: u not in exclude, map(lambda u: u[0], cursor)))

    def transfer_data(self, cursor, user: str, dest_user: str):
        """
        Transfer a user's files and shares to another user
        :param cursor: The database cursor
        :param user: The source user UID
        :param dest_user: The destination user UID
        """
        # Workaround for Nextcloud's incomplete S3 integration: https://github.com/nextcloud/server/pull/32781
        self.logger.info(f"🩹 Saving {user}'s shares")
        cursor.execute("CREATE OR REPLACE TEMPORARY TABLE save_shares AS "
                       # Create or replace a temporary table to store the transferred shares' information
                       "SELECT sh.id, CONCAT('files/transferred from ', CONCAT(REPLACE(sh.uid_owner, '%', '\\%'), ' on%/', REPLACE(SUBSTR(fc.path, 7), '%', '\\%'))) as new_path "
                       # Store the share id and determine the new path of the share, escaping % so it cannot interfere with the later LIKE
                       f"FROM {self.nxc_conf['dbtableprefix']}share sh "
                       # Get all shares
                       f"INNER JOIN {self.nxc_conf['dbtableprefix']}filecache fc ON fc.fileid = sh.file_source "
                       # Join the filecache of each share to get the current path
                       "WHERE sh.uid_owner = ?",
                       # Filter the share owner for the source user
                       (user,))
        self.logger.info(f" Transferring {user}'s data to {dest_user}")
        self.occ(["files:transfer-ownership", user, dest_user], json=False)
        # Continuation of the workaround
        self.logger.info(f"🩹 Transferring {user}'s shares to {dest_user}")
        self.db.commit()  # Get the database modifications from the ownership transfer
        cursor.execute(f"UPDATE {self.nxc_conf['dbtableprefix']}share sh "
                       # Get all shares
                       "INNER JOIN save_shares save ON save.id = sh.id "
                       # Join to keep only the saved shares in the transfer
                       f"INNER JOIN {self.nxc_conf['dbtableprefix']}filecache fc ON fc.path LIKE save.new_path "
                       # Get the filecache of each share from its new path
                       f"INNER JOIN {self.nxc_conf['dbtableprefix']}storages s ON s.numeric_id = fc.storage AND s.id = CONCAT('object::user:', ?) "
                       # Get the storage of the filecache to keep only the destination user's files
                       "SET sh.uid_owner = SUBSTR(s.id, 14), sh.uid_initiator = SUBSTR(s.id, 14), "
                       "sh.item_source = fc.fileid, sh.file_source = fc.fileid "
                       # Update uid_owner and uid_initiator to the destination user, item_source and file_source to the new file id
                       "WHERE sh.share_with IS NULL OR sh.share_with != ?",
                       # Avoid self-sharing
                       (dest_user, dest_user))
        self.logger.info(f"🩹 Removing {user}'s shares to {dest_user}")
        cursor.execute("DELETE sh.* "
                       # Remove shares
                       f"FROM {self.nxc_conf['dbtableprefix']}share sh "
                       # Get all shares
                       "INNER JOIN save_shares save ON save.id = sh.id "
                       # Join to keep only the saved shares in the transfer
                       "WHERE sh.share_with = ?",
                       # Keep only self-shares
                       (dest_user,))
        self.db.commit()

    def get_user_cache(self, cursor, user: str):
        """
        Get the user cache
        :param cursor: The database cursor
        :param user: The user UID
        :return: The database cursor with an executed statement
        """
        cursor.execute("SELECT fc.* "
                       f"FROM {self.nxc_conf['dbtableprefix']}filecache fc "
                       f"INNER JOIN {self.nxc_conf['dbtableprefix']}storages s ON s.numeric_id = fc.storage "
                       "WHERE s.id = ?",
                       (f"object::user:{user}",))
        return cursor

    def save_user_cache(self, cursor, user: str, bck_file: Path):
        """
        Save the user cache to a CSV file
        :param cursor: The database cursor
        :param user: The user UID
        :param bck_file: The backup file path
        """
        self.logger.info("💾 Saving user cache")
        with open(bck_file, "w", newline="") as file:
            w = writer(file)
            self.get_user_cache(cursor, user)  # execute the query first so cursor.description matches the filecache columns
            w.writerow(map(lambda c: c[0], cursor.description))
            for data in cursor:
                w.writerow(data)

    @staticmethod
    def restore_user_cache(bck_file: Path) -> List[List[str]]:
        """
        Restore the user cache from a CSV file
        :param bck_file: The backup file path
        :return: The CSV file content
        """
        with open(bck_file, "r") as file:
            return list(reader(file))

    def get_user_key(self, cursor, user: str) -> List[str]:
        """
        Get the user cache formatted as S3 keys
        :param cursor: The database cursor
        :param user: The user UID
        :return: List of S3 keys
        """
        cursor.execute(f"SELECT CONCAT('{self.nxt_s3_key_prefix}', fc.fileid) "
                       f"FROM {self.nxc_conf['dbtableprefix']}filecache fc "
                       f"INNER JOIN {self.nxc_conf['dbtableprefix']}storages s ON s.numeric_id = fc.storage "
                       "WHERE s.id = ?",
                       (f"object::user:{user}",))
        return list(map(lambda k: k[0], cursor))

    def restore_user_keys(self, bck_file: Path) -> List[str]:
        """
        Restore the user cache from a CSV file, formatted as S3 keys
        :param bck_file: The backup file path
        :return: List of S3 keys
        """
        return list(map(lambda c: self.nxt_s3_key_prefix + c[0], self.restore_user_cache(bck_file)[1:]))

    def clean_s3(self, keys: List[str], user: str):
        """
        Clean a Nextcloud user's cache from the S3 storage
        :param keys: List of S3 keys to remove
        :param user: The user UID (for log only)
        """
        self.logger.info(f"♻ Deleting {user}'s S3 keys")
        for chunk in (keys[pos:pos + self.s3_delete_chunk] for pos in range(0, len(keys), self.s3_delete_chunk)):
            res = self.s3.delete_objects(
                Delete={
                    "Objects": [
                        {
                            "Key": key
                        } for key in chunk
                    ]
                }
            )
            self.logger.debug(res)
            if "Errors" in res and res["Errors"]:
                self.logger.error("🛑 Error on S3 key deletion")
                self.logger.debug(res["Errors"], exc_info=True)
                raise ValueError("Error on S3 key deletion")

    def delete_user(self, user: str):
        """
        Remove a Nextcloud user
        :param user: The user UID
        """
        self.logger.info(f"🗑 Deleting user {user}")
        self.occ(["user:delete", user], json=False)

    def clean_user(self, user_email: str, dest_user_email: str = None, bck_file: Path = None):
        """
        Properly delete a Nextcloud S3 user
        :param user_email: User email
        :param dest_user_email: Destination user email for the transfer
        :param bck_file: Path to the backup file for the CSV export
        """
        with closing(self.db.cursor()) as cursor:
            user = self.find_user(cursor, user_email)
            if not user:
                raise NextcloudUserNotFound(f"User {user_email} not found")
            if dest_user_email:
                dest_user = self.find_user(cursor, dest_user_email)
                if not dest_user:
                    raise NextcloudUserNotFound(f"Transfer destination user {dest_user_email} not found")
                self.transfer_data(cursor, user, dest_user)
            else:
                if bck_file:
                    self.save_user_cache(cursor, user, bck_file)
                else:
                    self.clean_s3(self.get_user_key(cursor, user), user)
            self.delete_user(user)

    def clean_all(self, exclude: List[str] = None):
        """
        Properly delete all Nextcloud users
        :param exclude: List of user UIDs to exclude
        """
        with closing(self.db.cursor()) as cursor:
            for user in self.all_users(cursor, exclude):
                self.clean_s3(self.get_user_key(cursor, user), user)
                self.delete_user(user)

    def clean_s3_csv(self, bck_file: Path):
        """
        Remove S3 keys from a CSV backup file
        :param bck_file: The backup file path
        """
        self.clean_s3(self.restore_user_keys(bck_file), "Unknown")


def main():
    parser = ArgumentParser(description="Properly clean a Nextcloud user with S3 storage")
    parser.add_argument("-p", "--path", metavar="/var/www/nextcloud", type=Path, help="The Nextcloud path",
                        default=Path("/var/www/nextcloud"))
    parser.add_argument("-u", "--user", metavar="email", type=str, help="User email", default=None)
    parser.add_argument("-d", "--dest-user", metavar="email", type=str, help="Destination user email", default=None)
    parser.add_argument("-b", "--backup-file", metavar="/tmp/oc_filecache.csv", type=Path,
                        help="Path to the user backup file", default=None)
    parser.add_argument("--all", action="store_true", help="Clean all users")
    parser.add_argument("-e", "--exclude", action="append", help="User to exclude during a mass clean")
    parser.add_argument("-v", "--verbose", action="store_true", help="Enable debug output")
    parser.add_argument("-l", "--log-path", type=Path, help="Path to logfile", default=None)
    args = parser.parse_args()
    if not args.path.exists():
        parser.error("Nextcloud folder doesn't exist")
    elif args.backup_file and not args.backup_file.parent.exists():
        parser.error("Backup path folder doesn't exist")
    elif args.log_path and not args.log_path.parent.exists():
        parser.error("Log path folder doesn't exist")
    cof = None
    # noinspection PyBroadException
    # Exceptions are already logged in the class
    try:
        cof = CleanS3User(nxc_path=args.path, log_level=DEBUG if args.verbose else INFO, log_path=args.log_path)
    except Exception:
        exit(1)
    # noinspection PyBroadException
    # Exceptions are already logged in the class
    try:
        if args.user:
            cof.clean_user(user_email=args.user, dest_user_email=args.dest_user, bck_file=args.backup_file)
        elif args.all:
            cof.clean_all(exclude=args.exclude)
        elif args.backup_file:
            cof.clean_s3_csv(bck_file=args.backup_file)
        else:
            parser.error("You need to specify at least a user email or a CSV file to clean")
    except NextcloudUserNotFound as n:
        parser.error(n.message)
    except Exception:
        exit(1)


if __name__ == '__main__':
    main()