|
|
from contextlib import closing
|
|
|
from logging import getLogger, StreamHandler, FileHandler, Formatter, INFO, DEBUG
|
|
|
from argparse import ArgumentParser
|
|
|
from subprocess import check_output
|
|
|
from json import loads
|
|
|
from pathlib import Path
|
|
|
from csv import writer, reader
|
|
|
|
|
|
from typing import Union
|
|
|
|
|
|
from botocore.config import Config
|
|
|
from mariadb import connect, connection
|
|
|
from boto3 import resource
|
|
|
|
|
|
|
|
|
class NextcloudUserNotFound(Exception):
    """
    Error signalling that a Nextcloud user lookup gave no result

    The text handed to the constructor is forwarded to ``Exception`` and also kept
    on the ``message`` attribute so callers can read it back directly.
    """

    def __init__(self, message: str = None):
        """
        :param message: Human readable description of the missing user
        """
        self.message = message
        Exception.__init__(self, message)
|
|
|
|
|
|
|
|
|
class CleanS3User:
    """
    Remove Nextcloud users together with the objects they own in the S3 primary storage

    The Nextcloud system configuration is read through ``occ`` and then used to open
    the database connection and the S3 bucket handle.
    """

    # Extra options inserted between ``php`` and the occ script path
    occ_php_opts: [str] = []
    # Options appended to every occ invocation
    occ_opts = ["--no-ansi", "--no-interaction", "--no-warnings"]
    # Prefix put in front of a filecache ``fileid`` to build the S3 object key
    nxt_s3_key_prefix = "urn:oid:"
    # Number of keys sent per S3 delete request
    s3_delete_chunk = 1000

    def __init__(self, nxc_path: Path = Path("/var/www/nextcloud"), log_level: int = INFO, log_path: Path = None):
        """
        Init CleanS3User, fetching Nextcloud configuration and init database and S3 connection

        :param nxc_path: Path to Nextcloud
        :param log_level: The level for the logger
        :param log_path: The log output path
        """
        self.logger = self.init_logger(log_level, log_path)
        self.nxc_path = nxc_path
        self.nxc_conf = self.get_nxc_conf()
        self.db = self.connect_db()
        self.s3 = self.connect_s3()

    def __del__(self):
        """
        Close database connection and rollback any non-committed changes
        """
        db = getattr(self, "db", None)
        if db is None:
            return

        # ping() reports a dead connection by raising, not through its return value,
        # so its result must not be used as a condition.
        # Best effort only: nothing useful can be done about a failure in a finalizer.
        try:
            db.ping()
            db.rollback()
            db.close()
        except Exception:
            pass

    @staticmethod
    def init_logger(log_level: int = INFO, log_path: Path = None):
        """
        Setup logger

        The logger is shared by name, so handlers are only attached when an equivalent
        one is not already present; calling this twice does not duplicate the output.

        :param log_level: The level for the logger
        :param log_path: The log output path
        :return: The configured ``clean_s3_user`` logger
        """
        logger = getLogger("clean_s3_user")

        # FileHandler is a StreamHandler subclass, so it has to be excluded explicitly
        has_console = any(isinstance(h, StreamHandler) and not isinstance(h, FileHandler) for h in logger.handlers)
        if not has_console:
            ch = StreamHandler()
            ch.setFormatter(Formatter("%(message)s"))
            logger.addHandler(ch)

        if log_path:
            target = Path(log_path).absolute()
            has_file = any(isinstance(h, FileHandler) and Path(h.baseFilename) == target for h in logger.handlers)
            if not has_file:
                fh = FileHandler(log_path)
                fh.setFormatter(Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
                logger.addHandler(fh)

        logger.setLevel(log_level)
        return logger

    def occ(self, args: [str], json: bool = True) -> Union[dict, str]:
        """
        Send a Nextcloud OCC command

        :param args: The command args
        :param json: Parse command output as JSON
        :raises:
            subprocess.CalledProcessError: When the command exits with a non-zero status
        :return: Parsed JSON when ``json`` is set, otherwise the raw command output
        """
        command = ["sudo", "-u", "www-data", "php", *self.occ_php_opts, str(self.nxc_path / "occ"), *self.occ_opts, *args]
        self.logger.debug("⚙ Executing occ command: " + " ".join(command))
        out = check_output(command)

        return loads(out) if json else out

    def get_nxc_conf(self) -> dict:
        """
        Get the Nextcloud configuration

        :raises:
            ValueError: When the system configuration is empty
        :return: Nextcloud system configuration
        """
        try:
            config: dict = self.occ(["config:list", "--private"])["system"]
            if not config:
                raise ValueError("Nextcloud system configuration empty")
            return config
        except Exception:
            self.logger.error("🛑 Fail to get Nextcloud configuration")
            self.logger.debug("Execution info:", exc_info=True)
            raise

    # The return annotation is quoted so it is not evaluated when the class is defined
    def connect_db(self) -> "connection":
        """
        Establish a connection to the database

        :raises:
            ValueError: When the database configuration is incomplete or not MySQL
            mariadb.Error: When an error occur on the first connection with the database
        :return: The MariaDB connection
        """
        required = {"dbtype", "dbhost", "dbport", "dbuser", "dbpassword", "dbname", "dbtableprefix"}
        if not required.issubset(self.nxc_conf.keys()):
            self.logger.error("🛑 Missing Nextcloud database configuration")
            raise ValueError("Missing Nextcloud database configuration")
        if self.nxc_conf["dbtype"] != "mysql":
            self.logger.error("🛑 Non MySQL database are not supported yet")
            raise ValueError("Non MySQL database are not supported")

        params = {}
        host = self.nxc_conf["dbhost"].split(":")
        if len(host) > 1:
            # "host:port" or "host:/path/to/socket"
            params["host"] = host[0]
            try:
                params["port"] = int(host[1])
            except ValueError:
                params["unix_socket"] = host[1]
        else:
            params["host"] = self.nxc_conf["dbhost"]
            # An empty dbport is left out so the driver falls back to its default port
            if self.nxc_conf["dbport"] not in ("", None):
                params["port"] = int(self.nxc_conf["dbport"])

        params["user"] = self.nxc_conf["dbuser"]
        params["password"] = self.nxc_conf["dbpassword"]
        params["database"] = self.nxc_conf["dbname"]
        params["autocommit"] = False

        self.logger.info("🗄️ Connecting to database")
        # Never write the password to the log output
        self.logger.debug({**params, "password": "***"})
        try:
            return connect(**params)
        except Exception:
            self.logger.error("🛑 Fail to connect to Nextcloud database")
            self.logger.debug("Execution info:", exc_info=True)
            raise

    def connect_s3(self):
        """
        Establish a connection to the S3 bucket

        :raises:
            ValueError: When the S3 configuration or one of its arguments is missing
        :return: The S3 Bucket
        """
        if "objectstore" not in self.nxc_conf or "arguments" not in self.nxc_conf["objectstore"]:
            self.logger.error("🛑 Missing Nextcloud S3 configuration")
            raise ValueError("Missing Nextcloud S3 configuration")

        config: dict = self.nxc_conf["objectstore"]["arguments"]

        # "bucket" is read below, so it is validated with the other arguments
        required = {"bucket", "region", "use_ssl", "hostname", "port", "key", "secret"}
        if not required.issubset(config.keys()):
            self.logger.error("🛑 Missing Nextcloud S3 configuration arguments")
            raise ValueError("Missing Nextcloud S3 configuration arguments")

        self.logger.info("🗂️ Connection to S3")
        # Never write the secret key to the log output
        self.logger.debug({**config, "secret": "***"})
        scheme = "https" if config["use_ssl"] else "http"
        try:
            c = resource(service_name="s3", region_name=config["region"], use_ssl=config["use_ssl"],
                         endpoint_url=f"{scheme}://{config['hostname']}:{config['port']}",
                         aws_access_key_id=config["key"], aws_secret_access_key=config["secret"],
                         config=Config(signature_version="s3v4"))
        except Exception:
            self.logger.error("🛑 Fail to connect to Nextcloud S3")
            self.logger.debug("Execution info:", exc_info=True)
            raise

        try:
            return c.Bucket(config["bucket"])
        except Exception:
            self.logger.error("🛑 Fail to get Nextcloud S3 bucket")
            self.logger.debug("Execution info:", exc_info=True)
            raise

    def find_user(self, cursor, email: str) -> Union[str, None]:
        """
        Find Nextcloud user by email

        :param cursor: The database cursor
        :param email: The user email
        :return: The user Nextcloud UID, or None when no account matches
        """
        # The value is bound as a parameter, so only the LIKE metacharacters need escaping.
        # Without it "_" and "%" in an address act as wildcards and may match another account.
        escaped = email.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        cursor.execute(f"SELECT uid FROM {self.nxc_conf['dbtableprefix']}accounts WHERE data LIKE ?",
                       ('%"email":{"value":"' + escaped + '","%',))
        res = cursor.fetchone()
        return res[0] if res else None

    def all_users(self, cursor, exclude: [str] = None) -> [str]:
        """
        Return all Nextcloud users UID

        :param cursor: The database cursor
        :param exclude: List of user UID to exclude
        :return: UID list of all Nextcloud user
        """
        if exclude is None:
            exclude = []

        cursor.execute(f"SELECT uid FROM {self.nxc_conf['dbtableprefix']}accounts")
        return [row[0] for row in cursor if row[0] not in exclude]

    def transfer_data(self, cursor, user: str, dest_user: str):
        """
        Transfer user files and shares to another

        :param cursor: The database cursor
        :param user: The source user UID
        :param dest_user: The destination user UID
        """
        prefix = self.nxc_conf['dbtableprefix']

        # Workaround Nextcloud incomplete S3 integration: https://github.com/nextcloud/server/pull/32781
        self.logger.info(f"🩹 Saving {user}'s shares")
        cursor.execute("CREATE OR REPLACE TEMPORARY TABLE save_shares AS "
                       # Create or replace a temporary table to store transferring shares information
                       "SELECT sh.id, CONCAT('files/transferred from ', CONCAT(REPLACE(sh.uid_owner, '%', '\\%'), ' on%/', REPLACE(SUBSTR(fc.path, 7), '%', '\\%'))) as new_path "
                       # Store share id and determine the new path of the share with % escape to avoid interfering LIKE
                       f"FROM {prefix}share sh "
                       # Get all share
                       f"INNER JOIN {prefix}filecache fc ON fc.fileid = sh.file_source "
                       # Join the filecache of each share to get the current path
                       "WHERE sh.uid_owner = ?",
                       # Filter share owner for source user
                       (user,))

        self.logger.info(f"↪️ Transferring {user}'s data to {dest_user}")
        self.occ(["files:transfer-ownership", user, dest_user], json=False)

        # Continuation of the workaround
        self.logger.info(f"🩹 Transferring {user}'s shares to {dest_user}")
        self.db.commit()  # Get database modification from the ownership transfer

        cursor.execute(f"UPDATE {prefix}share sh "
                       # Get all shares
                       "INNER JOIN save_shares save ON save.id = sh.id "
                       # Join to have only saved shares in transfer
                       f"INNER JOIN {prefix}filecache fc ON fc.path LIKE save.new_path "
                       # Get filecache of each share from new path
                       f"INNER JOIN {prefix}storages s ON s.numeric_id = fc.storage AND s.id = CONCAT('object::user:', ?) "
                       # Get storage of filecache to filter only destination user files
                       "SET sh.uid_owner = SUBSTR(s.id, 14), sh.uid_initiator = SUBSTR(s.id, 14), "
                       "sh.item_source = fc.fileid, sh.file_source = fc.fileid "
                       # Update uid_owner and uid_initiator to dest user, item_source and file_source to new file id
                       "WHERE sh.share_with IS NULL OR sh.share_with != ?",
                       # Avoid self sharing
                       (dest_user, dest_user))

        self.logger.info(f"🩹 Removing {user} shares to {dest_user}")
        cursor.execute("DELETE sh.* "
                       # Remove share
                       f"FROM {prefix}share sh "
                       # Get all shares
                       "INNER JOIN save_shares save ON save.id = sh.id "
                       # Join to have only saved shares in transfer
                       "WHERE sh.share_with = ?",
                       # Get only self sharing
                       (dest_user,))
        self.db.commit()

    def get_user_cache(self, cursor, user: str):
        """
        Get the user cache

        :param cursor: The database cursor
        :param user: The user UID
        :return: The database cursor with an executed statement
        """
        cursor.execute("SELECT fc.* "
                       f"FROM {self.nxc_conf['dbtableprefix']}filecache fc "
                       f"INNER JOIN {self.nxc_conf['dbtableprefix']}storages s ON s.numeric_id = fc.storage "
                       "WHERE s.id = ?",
                       (f"object::user:{user}",))
        return cursor

    def save_user_cache(self, cursor, user: str, bck_file: Path):
        """
        Save the user cache to a CSV file

        The first row holds the column names, the following rows the filecache entries.

        :param cursor: The database cursor
        :param user: The user UID
        :param bck_file: The backup file path
        """
        self.logger.info("💾 Saving user cache")
        # Run the query first: the cursor description only describes the last executed statement
        rows = self.get_user_cache(cursor, user)
        # newline="" lets the csv module control the line endings itself
        with open(bck_file, "w", newline="") as file:
            w = writer(file)
            w.writerow(column[0] for column in rows.description)
            w.writerows(rows)

    @staticmethod
    def restore_user_cache(bck_file: Path) -> [[str]]:
        """
        Restore user cache from a CSV file

        :param bck_file: The backup file path
        :return: The CSV file content, header row included
        """
        with open(bck_file, "r", newline="") as file:
            return list(reader(file))

    def get_user_key(self, cursor, user: str) -> [str]:
        """
        Get the user cache formatted as S3 keys

        :param cursor: The database cursor
        :param user: The user UID
        :return: List of S3 keys
        """
        cursor.execute(f"SELECT CONCAT('{self.nxt_s3_key_prefix}', fc.fileid) "
                       f"FROM {self.nxc_conf['dbtableprefix']}filecache fc "
                       f"INNER JOIN {self.nxc_conf['dbtableprefix']}storages s ON s.numeric_id = fc.storage "
                       "WHERE s.id = ?",
                       (f"object::user:{user}",))
        return [row[0] for row in cursor]

    def restore_user_keys(self, bck_file: Path) -> [str]:
        """
        Build the S3 keys from a CSV backup file

        The header row is skipped and the first column of each row is used as file id.

        :param bck_file: The backup file path
        :return: List of S3 keys
        """
        return [self.nxt_s3_key_prefix + row[0] for row in self.restore_user_cache(bck_file)[1:]]

    def clean_s3(self, keys: [str], user):
        """
        Clean a Nextcloud user cache from the S3 storage

        :param keys: List of S3 keys to remove
        :param user: The user UID (for log only)
        :raises:
            ValueError: When the S3 response reports errors for a chunk
        """
        self.logger.info(f"♻ Deleting S3 {user}'s keys")
        for pos in range(0, len(keys), self.s3_delete_chunk):
            chunk = keys[pos:pos + self.s3_delete_chunk]
            res = self.s3.delete_objects(Delete={"Objects": [{"Key": key} for key in chunk]})
            self.logger.debug(res)

            if res.get("Errors"):
                self.logger.error("🛑 Error on S3 key deletion")
                self.logger.debug(res["Errors"])
                raise ValueError("Error on S3 key deletion")

    def delete_user(self, user: str):
        """
        Remove a Nextcloud user

        :param user: The user UID
        """
        self.logger.info(f"🗑️ Deleting user {user}")
        self.occ(["user:delete", user], json=False)

    def clean_user(self, user_email: str, dest_user_email: str = None, bck_file: Path = None):
        """
        Delete properly a Nextcloud S3 user

        With a destination user the data is transferred before the deletion. Otherwise the
        user cache is either exported to ``bck_file`` or its keys are removed from S3.

        :param user_email: User email
        :param dest_user_email: Dest user email for transfer
        :param bck_file: Path to backup file for CSV export
        :raises:
            NextcloudUserNotFound: When the user or the destination user does not exist
        """
        with closing(self.db.cursor()) as cursor:
            user = self.find_user(cursor, user_email)
            if not user:
                raise NextcloudUserNotFound(f"User {user_email} not found")

            if dest_user_email:
                dest_user = self.find_user(cursor, dest_user_email)
                if not dest_user:
                    # Report the requested email: the UID is empty at this point
                    raise NextcloudUserNotFound(f"Transfer destination user {dest_user_email} not found")

                self.transfer_data(cursor, user, dest_user)
            elif bck_file:
                self.save_user_cache(cursor, user, bck_file)
            else:
                self.clean_s3(self.get_user_key(cursor, user), user)

            self.delete_user(user)

    def clean_all(self, exclude: [str] = None):
        """
        Delete properly all Nextcloud user

        :param exclude: List of user UID to exclude
        """
        with closing(self.db.cursor()) as cursor:
            for user in self.all_users(cursor, exclude):
                self.clean_s3(self.get_user_key(cursor, user), user)
                self.delete_user(user)

    def clean_s3_csv(self, bck_file: Path):
        """
        Remove S3 keys from CSV backup file

        :param bck_file: The backup file path
        """
        self.clean_s3(self.restore_user_keys(bck_file), "Unknown")
|
|
|
|
|
|
|
|
|
def main():
    """
    Command line entry point

    Parses the arguments, validates them before anything is contacted, then runs
    the requested clean-up. Exits with status 2 on a usage error (argparse) and
    with status 1 when the clean-up itself fails.
    """
    parser = ArgumentParser(description="Clean S3 user delete")
    parser.add_argument("-p", "--path", metavar="/var/www/nextcloud", type=Path, help="The Nextcloud path",
                        default=Path("/var/www/nextcloud"))
    parser.add_argument("-u", "--user", metavar="email", type=str, help="User email", default=None)
    parser.add_argument("-d", "--dest-user", metavar="email", type=str, help="Destination user email", default=None)
    parser.add_argument("-b", "--backup-file", metavar="/tmp/oc_filecache.sql", type=Path,
                        help="Path to user backup file", default=None)
    parser.add_argument("--all", action="store_true", help="Clean all user")
    parser.add_argument("-e", "--exclude", action="append", help="User to exclude during mass clean")
    parser.add_argument("-v", "--verbose", action="store_true", help="Enable debug output")
    parser.add_argument("-l", "--log-path", type=Path, help="Path to logfile", default=None)

    args = parser.parse_args()

    # parser.error() exits, so each check below is a guard
    if not args.path.exists():
        parser.error("Nextcloud folder doesn't exists")
    if args.backup_file and not args.backup_file.parent.exists():
        parser.error("Backup path folder doesn't exist")
    if args.log_path and not args.log_path.parent.exists():
        parser.error("Log path folder doesn't exist")
    # Checked before connecting so a usage error is not reported as a runtime failure
    if not (args.user or args.all or args.backup_file):
        parser.error("You need to specify at least a user email or a CSV to clean")

    # "except Exception" (not a bare except) lets SystemExit and KeyboardInterrupt through
    try:
        cof = CleanS3User(nxc_path=args.path, log_level=DEBUG if args.verbose else INFO, log_path=args.log_path)
    except Exception:
        getLogger("clean_s3_user").debug("Execution info:", exc_info=True)
        raise SystemExit(1) from None

    try:
        if args.user:
            cof.clean_user(user_email=args.user, dest_user_email=args.dest_user, bck_file=args.backup_file)
        elif args.all:
            cof.clean_all(exclude=args.exclude)
        else:
            cof.clean_s3_csv(bck_file=args.backup_file)
    except NextcloudUserNotFound as n:
        parser.error(n.message)
    except Exception:
        getLogger("clean_s3_user").debug("Execution info:", exc_info=True)
        raise SystemExit(1) from None
|
|
|
|
|
|
|
|
|
# Run the command line entry point only when executed as a script, not on import
if __name__ == '__main__':
    main()
|
|
|
|