Properly clean a Nextcloud user with S3 storage
clean_s3_user/clean_s3_user.py

from contextlib import closing
from logging import getLogger, StreamHandler, FileHandler, Formatter, INFO, DEBUG
from argparse import ArgumentParser, ArgumentTypeError
from subprocess import check_output
from json import loads
from pathlib import Path
from csv import writer, reader
from typing import Union
from botocore.config import Config
from mariadb import connect, connection
from boto3 import resource


class CleanS3User:
    occ_php_opts: [str] = []  # Extra options passed to the PHP interpreter when running occ
    occ_opts = ["--no-ansi", "--no-interaction", "--no-warnings"]  # Default occ flags
    nxt_s3_key_prefix = "urn:oid:"  # Prefix Nextcloud uses for object keys in the S3 bucket
    s3_delete_chunk = 1000  # S3 DeleteObjects accepts at most 1000 keys per request

    def __init__(self, nxc_path: Path = Path("/var/www/nextcloud"), log_level: int = INFO, log_path: Path = None):
        """
        Init CleanS3User: fetch the Nextcloud configuration and set up the database and S3 connections
        :param nxc_path: Path to Nextcloud
        :param log_level: The level for the logger
        :param log_path: The log output path
        """
        self.logger = self.init_logger(log_level, log_path)
        try:
            self.nxc_path = nxc_path
            self.nxc_conf = self.get_nxc_conf()
            self.db = self.connect_db()
            self.s3 = self.connect_s3()
        except Exception as e:
            self.logger.exception(e)
            raise e

    def __del__(self):
        """
        Close the database connection and roll back any uncommitted changes
        """
        if getattr(self, "db", None):
            try:
                self.db.ping()  # Raises an exception if the connection is no longer alive
                self.db.rollback()
                self.db.close()
            except Exception:
                pass

    def init_logger(self, log_level: int = INFO, log_path: Path = None):
        """
        Setup logger
        :param log_level: The level for the logger
        :param log_path: The log output path
        """
        logger = getLogger("clean_s3_user")
        ch = StreamHandler()
        ch.setFormatter(Formatter("%(message)s"))
        logger.addHandler(ch)
        if log_path:
            fh = FileHandler(log_path)
            fh.setFormatter(Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
            logger.addHandler(fh)
        logger.setLevel(log_level)
        return logger

    def occ(self, args: [str], json: bool = True) -> Union[dict, str]:
        """
        Run a Nextcloud occ command
        :param args: The command args
        :param json: Parse command output as JSON
        :return: Dictionary if JSON, raw output otherwise
        """
        args = ["sudo", "-u", "www-data", "php", *self.occ_php_opts, str(self.nxc_path / "occ"), *self.occ_opts, *args]
        self.logger.debug("⚙ Executing occ command: " + " ".join(args))
        out = check_output(args)
        if json:
            return loads(out)
        else:
            return out

    def get_nxc_conf(self) -> dict:
        """
        Get the Nextcloud configuration
        :return: Nextcloud system configuration
        """
        return self.occ(["config:list", "--private"])["system"]

    def connect_db(self) -> connection:
        """
        Establish a connection to the database
        :raises:
            mariadb.Error: When an error occurs on the first connection to the database
        :return: The MariaDB connection
        """
        conn = {}
        host = self.nxc_conf["dbhost"].split(":")
        if len(host) > 1:
            conn["host"] = host[0]
            port = host[1]
            try:
                conn["port"] = int(port)
            except ValueError:
                # A non-numeric "port" is actually a Unix socket path (dbhost like "localhost:/run/mysqld/mysqld.sock")
                conn["unix_socket"] = port
        else:
            conn["host"] = self.nxc_conf["dbhost"]
            conn["port"] = int(self.nxc_conf["dbport"])
        conn["user"] = self.nxc_conf["dbuser"]
        conn["password"] = self.nxc_conf["dbpassword"]
        conn["database"] = self.nxc_conf["dbname"]
        conn["autocommit"] = False
        self.logger.info("🗄 Connecting to database")
        self.logger.debug(conn)
        return connect(**conn)

    def connect_s3(self):
        """
        Establish a connection to the S3 bucket
        :return: The S3 Bucket
        """
        config = self.nxc_conf["objectstore"]["arguments"]
        self.logger.info("🗂 Connecting to S3")
        self.logger.debug(config)
        c = resource(service_name="s3", region_name=config["region"], use_ssl=config["use_ssl"],
                     endpoint_url=f"{'https' if config['use_ssl'] else 'http'}://{config['hostname']}:{config['port']}",
                     aws_access_key_id=config["key"], aws_secret_access_key=config["secret"],
                     config=Config(signature_version="s3v4"))
        return c.Bucket(config["bucket"])

    def find_user(self, cursor, email: str) -> str:
        """
        Find a Nextcloud user by email
        :param cursor: The database cursor
        :param email: The user email
        :return: The user's Nextcloud UID, or None if not found
        """
        email = self.db.escape_string(email)
        cursor.execute('SELECT uid FROM oc_accounts WHERE data LIKE \'%"email":{"value":"' + email + '","%\'')
        res = cursor.fetchone()
        return res[0] if res else None

    def all_users(self, cursor, exclude: [str] = None) -> [str]:
        """
        Return all Nextcloud user UIDs
        :param cursor: The database cursor
        :param exclude: List of user UIDs to exclude
        :return: UID list of all Nextcloud users
        """
        if exclude is None:
            exclude = []
        cursor.execute("SELECT uid FROM oc_accounts")
        return list(filter(lambda u: u not in exclude, map(lambda u: u[0], cursor)))

    def transfer_data(self, cursor, user: str, dest_user: str):
        """
        Transfer a user's files and shares to another user
        :param cursor: The database cursor
        :param user: The source user UID
        :param dest_user: The destination user UID
        """
        # Workaround for Nextcloud's incomplete S3 integration: https://github.com/nextcloud/server/pull/32781
        self.logger.info(f"🩹 Saving {user}'s shares")
        cursor.execute("CREATE OR REPLACE TEMPORARY TABLE save_shares AS "
                       # Create or replace a temporary table to store the transferred shares' information
                       "SELECT sh.id, CONCAT('files/transferred from ', CONCAT(REPLACE(sh.uid_owner, '%', '\\%'), ' on%/', REPLACE(SUBSTR(fc.path, 7), '%', '\\%'))) as new_path "
                       # Store the share id and compute the share's new path, escaping % so it doesn't interfere with LIKE
                       "FROM oc_share sh "
                       # Get all shares
                       "INNER JOIN oc_filecache fc ON fc.fileid = sh.file_source "
                       # Join the filecache of each share to get its current path
                       "WHERE sh.uid_owner = ?",
                       # Keep only shares owned by the source user
                       (user,))
        self.logger.info(f" Transferring {user}'s data to {dest_user}")
        self.occ(["files:transfer-ownership", user, dest_user], json=False)
        # Continuation of the workaround
        self.logger.info(f"🩹 Transferring {user}'s shares to {dest_user}")
        self.db.commit()  # Pick up the database modifications made by the ownership transfer
        cursor.execute("UPDATE oc_share sh "
                       # Update the shares...
                       "INNER JOIN save_shares save ON save.id = sh.id "
                       # ...restricted to the saved shares being transferred
                       "INNER JOIN oc_filecache fc ON fc.path LIKE save.new_path "
                       # Find the new filecache row of each share via its new path
                       "INNER JOIN oc_storages s ON s.numeric_id = fc.storage AND s.id = CONCAT('object::user:', ?) "
                       # Join the storage to keep only the destination user's files
                       "SET sh.uid_owner = SUBSTR(s.id, 14), sh.uid_initiator = SUBSTR(s.id, 14), "
                       "sh.item_source = fc.fileid, sh.file_source = fc.fileid "
                       # Set uid_owner and uid_initiator to the destination user, item_source and file_source to the new file id
                       "WHERE sh.share_with IS NULL OR sh.share_with != ?",
                       # Skip shares made with the destination user (they would become self-shares)
                       (dest_user, dest_user))
        self.logger.info(f"🩹 Removing {user}'s shares to {dest_user}")
        cursor.execute("DELETE sh.* "
                       # Remove the share rows...
                       "FROM oc_share sh "
                       # ...from the shares table
                       "INNER JOIN save_shares save ON save.id = sh.id "
                       # ...restricted to the saved shares being transferred
                       "WHERE sh.share_with = ?",
                       # Keep only shares made with the destination user (now self-shares)
                       (dest_user,))
        self.db.commit()

    def get_user_cache(self, cursor, user: str):
        """
        Get the user cache
        :param cursor: The database cursor
        :param user: The user UID
        :return: The database cursor with an executed statement
        """
        cursor.execute("SELECT fc.* "
                       "FROM oc_filecache fc "
                       "INNER JOIN oc_storages s ON s.numeric_id = fc.storage "
                       "WHERE s.id = ?",
                       (f"object::user:{user}",))
        return cursor

    def save_user_cache(self, cursor, user: str, bck_file: Path):
        """
        Save the user cache to a CSV file
        :param cursor: The database cursor
        :param user: The user UID
        :param bck_file: The backup file path
        """
        self.logger.info("💾 Saving user cache")
        cursor = self.get_user_cache(cursor, user)  # Execute the query first so cursor.description matches it
        with open(bck_file, "w") as file:
            w = writer(file)
            w.writerow(map(lambda c: c[0], cursor.description))  # Header row with the column names
            for data in cursor:
                w.writerow(data)

    def restore_user_cache(self, bck_file: Path) -> [[str]]:
        """
        Restore the user cache from a CSV file
        :param bck_file: The backup file path
        :return: The CSV file content
        """
        with open(bck_file, "r") as file:
            return list(reader(file))

    def get_user_key(self, cursor, user: str) -> [str]:
        """
        Get the user cache formatted as S3 keys
        :param cursor: The database cursor
        :param user: The user UID
        :return: List of S3 keys
        """
        cursor.execute(f"SELECT CONCAT('{self.nxt_s3_key_prefix}', fc.fileid) "
                       "FROM oc_filecache fc "
                       "INNER JOIN oc_storages s ON s.numeric_id = fc.storage "
                       "WHERE s.id = ?",
                       (f"object::user:{user}",))
        return list(map(lambda k: k[0], cursor))

    def restore_user_keys(self, bck_file: Path) -> [str]:
        """
        Rebuild the S3 keys from a CSV backup file (skipping the header row)
        :param bck_file: The backup file path
        :return: List of S3 keys
        """
        return list(map(lambda c: self.nxt_s3_key_prefix + c[0], self.restore_user_cache(bck_file)[1:]))

    def clean_s3(self, keys: [str], user):
        """
        Clean a Nextcloud user's cache from the S3 storage
        :param keys: List of S3 keys to remove
        :param user: The user UID (for log only)
        """
        self.logger.info(f"♻ Deleting S3 {user}'s keys")
        # Delete in chunks, since DeleteObjects is limited to 1000 keys per request
        for chunk in (keys[pos:pos + self.s3_delete_chunk] for pos in range(0, len(keys), self.s3_delete_chunk)):
            res = self.s3.delete_objects(
                Delete={
                    "Objects": [
                        {
                            "Key": key
                        } for key in chunk
                    ]
                }
            )
            self.logger.debug(res)
            if "Errors" in res and res["Errors"]:
                raise ValueError(res["Errors"])

    def delete_user(self, user: str):
        """
        Remove a Nextcloud user
        :param user: The user UID
        """
        self.logger.info(f"🗑 Deleting user {user}")
        self.occ(["user:delete", user], json=False)

    def clean_user(self, user_email: str, dest_user_email: str = None, bck_file: Path = None):
        """
        Properly delete a Nextcloud S3 user
        :param user_email: User email
        :param dest_user_email: Destination user email for the data transfer
        :param bck_file: Path to the backup file for the CSV export
        """
        with closing(self.db.cursor()) as cursor:
            user = self.find_user(cursor, user_email)
            if not user:
                raise ValueError("User not found")
            if dest_user_email:
                dest_user = self.find_user(cursor, dest_user_email)
                if not dest_user:
                    raise ValueError("Transfer destination user not found")
                self.transfer_data(cursor, user, dest_user)
            else:
                if bck_file:
                    # Only export the cache: the S3 objects can be purged later with clean_s3_csv
                    self.save_user_cache(cursor, user, bck_file)
                else:
                    self.clean_s3(self.get_user_key(cursor, user), user)
            self.delete_user(user)

    def clean_all(self, exclude: [str] = None):
        """
        Properly delete all Nextcloud users
        :param exclude: List of user UIDs to exclude
        """
        with closing(self.db.cursor()) as cursor:
            for user in self.all_users(cursor, exclude):
                self.clean_s3(self.get_user_key(cursor, user), user)
                self.delete_user(user)

    def clean_s3_csv(self, bck_file: Path):
        """
        Remove from S3 the keys listed in a CSV backup file
        :param bck_file: The backup file path
        """
        self.clean_s3(self.restore_user_keys(bck_file), "Unknown")
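

# A minimal sketch of programmatic use (the e-mail addresses below are placeholders),
# equivalent to what main() does for the "-u/-d" case:
#
#     cleaner = CleanS3User(nxc_path=Path("/var/www/nextcloud"), log_level=DEBUG)
#     cleaner.clean_user(user_email="leaver@example.org", dest_user_email="manager@example.org")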


def main():
    parser = ArgumentParser(description="Properly clean a Nextcloud user with S3 storage")
    parser.add_argument("-p", "--path", metavar="/var/www/nextcloud", type=Path, help="The Nextcloud path",
                        default=Path("/var/www/nextcloud"))
    parser.add_argument("-u", "--user", metavar="email", type=str, help="User email", default=None)
    parser.add_argument("-d", "--dest-user", metavar="email", type=str, help="Destination user email", default=None)
    parser.add_argument("-b", "--backup-file", metavar="/tmp/oc_filecache.csv", type=Path,
                        help="Path to the user backup file", default=None)
    parser.add_argument("--all", action="store_true", help="Clean all users")
    parser.add_argument("-e", "--exclude", action="append", help="User to exclude during mass clean")
    parser.add_argument("-v", "--verbose", action="store_true", help="Enable debug output")
    parser.add_argument("-l", "--log-path", type=Path, help="Path to logfile", default=None)
    args = parser.parse_args()
    if not args.path.exists():
        raise ArgumentTypeError("Nextcloud folder doesn't exist")
    elif args.backup_file and not args.backup_file.parent.exists():
        raise ArgumentTypeError("Backup path folder doesn't exist")
    elif args.log_path and not args.log_path.parent.exists():
        raise ArgumentTypeError("Log path folder doesn't exist")
    cof = None
    # noinspection PyBroadException
    try:
        cof = CleanS3User(nxc_path=args.path, log_level=DEBUG if args.verbose else INFO, log_path=args.log_path)
    except Exception:
        exit(1)  # The constructor already logged the exception
    try:
        if args.user:
            cof.clean_user(user_email=args.user, dest_user_email=args.dest_user, bck_file=args.backup_file)
        elif args.all:
            cof.clean_all(exclude=args.exclude)
        elif args.backup_file:
            cof.clean_s3_csv(bck_file=args.backup_file)
        else:
            raise ArgumentTypeError("You need to specify at least a user email or a CSV to clean")
    except Exception as e:
        cof.logger.exception(e)
        exit(1)


if __name__ == '__main__':
    main()
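
# Example invocations (run as root or a user able to sudo to www-data; the e-mail
# addresses and paths below are placeholders):
#
#   python3 clean_s3_user.py -u leaver@example.org -d manager@example.org    # transfer files and shares, then delete the account
#   python3 clean_s3_user.py -u leaver@example.org                           # delete the account and its S3 objects
#   python3 clean_s3_user.py -u leaver@example.org -b /tmp/oc_filecache.csv  # export the file cache to CSV, then delete the account
#   python3 clean_s3_user.py -b /tmp/oc_filecache.csv                        # purge the S3 objects listed in a previously exported CSV
#   python3 clean_s3_user.py --all -e admin                                  # clean every account except the UID "admin"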