Initial commit

master
Florian Charlaix 3 years ago
commit d85e14c17e
  1. 27
      README.md
  2. 344
      clean_s3_user.py
  3. 2
      requirements.txt

@ -0,0 +1,27 @@
# Clean S3 user
Properly clean a Nextcloud user with S3 storage
## Installation
```bash
apt install libmariadb3 libmariadb-dev
pip3 install -r requirements.txt
```
## Usage
* **On the Nextcloud host**
```bash
# One user immediate deletion
python3 clean_s3_user.py -p /opt/nextcloud -u test@sitiv.fr
# One user with delayed S3 deletion
python3 clean_s3_user.py -p /opt/nextcloud -u test@sitiv.fr -b cache.csv
# One user transfer
python3 clean_s3_user.py -p /opt/nextcloud -u test@sitiv.fr -d admin@sitiv.fr
# Mass user deletion with exception
python3 clean_s3_user.py -p /opt/nextcloud --all -e odsiadmin -e supervision -e watcha
# Enable debug
python3 clean_s3_user.py -p /opt/nextcloud [...] -v
# Save log to file
python3 clean_s3_user.py -p /opt/nextcloud [...] -l clean.log
# Full example
python3 clean_s3_user.py -p /opt/nextcloud --all -e odsiadmin -e supervision -e watcha -l clean.log -v
```

@ -0,0 +1,344 @@
from contextlib import closing
from logging import getLogger, StreamHandler, FileHandler, Formatter, INFO, DEBUG
from argparse import ArgumentParser, ArgumentTypeError
from subprocess import check_output
from json import loads
from pathlib import Path
from csv import writer, reader
from typing import Union
from botocore.config import Config
from mariadb import connect, connection
from boto3 import resource
class CleanS3User:
occ_php_opts: [str] = []
occ_opts = ["--no-ansi", "--no-interaction", "--no-warnings"]
nxt_s3_key_prefix = "urn:oid:"
s3_delete_chunk = 1000
def __init__(self, nxc_path: Path = Path("/var/www/nextcloud"), log_level: int = INFO, log_path: Path = None):
"""
Init CleanS3User, fetching Nextcloud configuration and init database and S3 connection
:param nxc_path: Path to Nextcloud
:param log_level: The level for the logger
:param log_path: The log output path
"""
self.logger = self.init_logger(log_level, log_path)
try:
self.nxc_path = nxc_path
self.nxc_conf = self.get_nxc_conf()
self.db = self.connect_db()
self.s3 = self.connect_s3()
except Exception as e:
self.logger.exception(e)
raise e
def __del__(self):
"""
Close database connection and rollback any non committed changes
"""
if getattr(self, "db", None) and self.db and self.db.ping():
self.db.rollback()
self.db.close()
def init_logger(self, log_level: int = INFO, log_path: Path = None):
"""
Setup logger
:param log_level: The level for the logger
:param log_path: The log output path
"""
logger = getLogger("clean_s3_user")
ch = StreamHandler()
ch.setFormatter(
Formatter("%(message)s"))
logger.addHandler(ch)
if log_path:
fh = FileHandler(log_path)
fh.setFormatter(Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
logger.addHandler(fh)
logger.setLevel(log_level)
return logger
def occ(self, args: [str], json: bool = True) -> Union[dict, str]:
"""
Send a Nextcloud OCC command
:param args: The command args
:param json: Parse command output as JSON
:return: Dictionary if JSON or string
"""
args = ["sudo", "-u", "www-data", "php", *self.occ_php_opts, str(self.nxc_path / "occ"), *self.occ_opts, *args]
self.logger.debug("⚙ Executing occ command: " + " ".join(args))
out = check_output(args)
if json:
return loads(out)
else:
return out
def get_nxc_conf(self) -> dict:
"""
Get the Nextcloud configuration
:return: Nextcloud system configuration
"""
return self.occ(["config:list", "--private"])["system"]
def connect_db(self) -> connection:
"""
Establish a connection to the database
:raises:
mariadb.Error: When an error occur on the first connection with teh database
:return: The MariaDB connection
"""
conn = {}
host = self.nxc_conf["dbhost"].split(":")
if len(host) > 1:
conn["host"] = host[0]
port = host[1]
try:
conn["port"] = int(port)
except ValueError:
conn["unix_socket"] = port
else:
conn["host"] = self.nxc_conf["dbhost"]
conn["port"] = int(self.nxc_conf["dbport"])
conn["user"] = self.nxc_conf["dbuser"]
conn["password"] = self.nxc_conf["dbpassword"]
conn["database"] = self.nxc_conf["dbname"]
conn["autocommit"] = False
self.logger.info("🗄 Connecting to database")
self.logger.debug(conn)
return connect(**conn)
def connect_s3(self):
"""
Establish a connection to the S3 bucket
:return: The S3 Bucket
"""
config = self.nxc_conf["objectstore"]["arguments"]
self.logger.info("🗂 Connection to S3")
self.logger.debug(config)
c = resource(service_name="s3", region_name=config["region"], use_ssl=config['use_ssl'],
endpoint_url=f"{'https' if config['use_ssl'] else 'http'}://{config['hostname']}:{config['port']}",
aws_access_key_id=config["key"], aws_secret_access_key=config["secret"],
config=Config(signature_version="s3v4"))
return c.Bucket(config["bucket"])
def find_user(self, cursor, email: str) -> str:
"""
Find Nextcloud user by email
:param cursor: The database cursor
:param email: The user email
:return: The user Nextcloud UID
"""
email = self.db.escape_string(email)
cursor.execute('SELECT uid FROM oc_accounts WHERE data LIKE \'%"email":{"value":"'+email+'","%\'')
res = cursor.next()
return res[0] if res else None
def all_users(self, cursor, exclude: [str] = None) -> [str]:
"""
Return all Nextcloud users UID
:param cursor: The database cursor
:param exclude: List of user UID to exclude
:return: UID list of all Nextcloud user
"""
if exclude is None:
exclude = []
cursor.execute("SELECT uid FROM oc_accounts")
return list(filter(lambda u: u not in exclude, map(lambda u: u[0], cursor)))
def transfer_data(self, user: str, dest_user: str):
"""
Transfer user files and shares to another
:param user: The source user UID
:param dest_user: The destination user UID
"""
self.logger.info(f" Transferring {user}'s data to {dest_user}")
self.occ(["files:transfer-ownership", user, dest_user])
def get_user_cache(self, cursor, user: str):
"""
Get the user cache
:param cursor: The database cursor
:param user: The user UID
:return: The database cursor with an executed statement
"""
cursor.execute("SELECT fc.* "
"FROM oc_filecache fc "
"INNER JOIN oc_storages s ON s.numeric_id = fc.storage "
"WHERE s.id = ?",
(f"object::user:{user}",))
return cursor
def save_user_cache(self, cursor, user: str, bck_file: Path):
"""
Save the user cache to a CSV file
:param cursor: The database cursor
:param user: The user UID
:param bck_file: The backup file path
"""
self.logger.info("💾 Saving user cache")
with open(bck_file, "w") as file:
w = writer(file)
w.writerow(map(lambda c: c[0], cursor.description))
for data in self.get_user_cache(cursor, user):
w.writerow(data)
def restore_user_cache(self, bck_file: Path) -> [[str]]:
"""
Rester user cache from a CSV file
:param bck_file: The backup file path
:return: The CSV file content
"""
with open(bck_file, "r") as file:
return list(reader(file))
def get_user_key(self, cursor, user: str) -> [str]:
"""
Get the user cache formatted as S3 keys
:param cursor: The database cursor
:param user: The user UID
:return: List of S3 keys
"""
cursor.execute(f"SELECT CONCAT('{self.nxt_s3_key_prefix}', fc.fileid) "
"FROM oc_filecache fc "
"INNER JOIN oc_storages s ON s.numeric_id = fc.storage "
"WHERE s.id = ?",
(f"object::user:{user}",))
return list(map(lambda k: k[0], cursor))
def restore_user_keys(self, bck_file: Path) -> [str]:
return list(map(lambda c: self.nxt_s3_key_prefix + c[0], self.restore_user_cache(bck_file)[1:]))
def clean_s3(self, keys: [str], user):
"""
Clean a Nextcloud user cache from the S3 storage
:param keys: List of S3 keys to remove
:param user: The user UID (for log only)
"""
self.logger.info(f"♻ Deleting S3 {user}'s keys")
for chunk in (keys[pos:pos + self.s3_delete_chunk] for pos in range(0, len(keys), self.s3_delete_chunk)):
res = self.s3.delete_objects(
Delete={
"Objects": [
{
"Key": key
} for key in chunk
]
}
)
self.logger.debug(res)
if "Errors" in res and res["Errors"]:
raise ValueError(res["Errors"])
def delete_user(self, user: str):
"""
Remove a Nextcloud user
:param user: The user UID
"""
self.logger.info(f"🗑 Deleting user {user}")
self.occ(["user:delete", user], json=False)
def clean_user(self, user_email: str, dest_user_email: str = None, bck_file: Path = None):
"""
Delete properly a Nextcloud S3 user
:param user_email: User email
:param dest_user_email: Dest user email for transfer
:param bck_file: Path to backup fle for CSV export
"""
with closing(self.db.cursor()) as cursor:
user = self.find_user(cursor, user_email)
if not user:
raise ValueError("User not found")
if dest_user_email:
dest_user = self.find_user(cursor, dest_user_email)
if not dest_user:
raise ValueError("Transfer destination user not found")
self.transfer_data(user, dest_user)
else:
if bck_file:
self.save_user_cache(cursor, user, bck_file)
else:
self.clean_s3(self.get_user_key(cursor, user), user)
self.delete_user(user)
def clean_all(self, exclude: [str] = None):
"""
Delete properly all Nextcloud user
:param exclude: List of user UID to exclude
"""
with closing(self.db.cursor()) as cursor:
for user in self.all_users(cursor, exclude):
self.clean_s3(self.get_user_key(cursor, user), user)
self.delete_user(user)
def clean_s3_csv(self, bck_file: Path):
"""
Remove S3 keys from CSV backup file
:param bck_file: The backup file path
"""
self.clean_s3(self.restore_user_keys(bck_file), "Unknown")
def main():
parser = ArgumentParser(description="Clean S3 user delete")
parser.add_argument("-p", "--path", metavar="/var/www/nextcloud", type=Path, help="The Nextcloud path",
default=Path("/var/www/nextcloud"))
parser.add_argument("-u", "--user", metavar="email", type=str, help="User email", default=None)
parser.add_argument("-d", "--dest-user", metavar="email", type=str, help="Destination user email", default=None)
parser.add_argument("-b", "--backup-file", metavar="/tmp/oc_filecache.sql", type=Path,
help="Path to user backup file", default=None)
parser.add_argument("--all", action="store_true", help="Clean all user")
parser.add_argument("-e", "--exclude", action="append", help="User to exclude during mass clean")
parser.add_argument("-v", "--verbose", action="store_true", help="Enable debug output")
parser.add_argument("-l", "--log-path", type=Path, help="Path to logfile", default=None)
args = parser.parse_args()
if not args.path.exists():
raise ArgumentTypeError("Nextcloud folder doesn't exists")
elif args.backup_file and not args.backup_file.parent.exists():
raise ArgumentTypeError("Backup path folder doesn't exist")
elif args.log_path and not args.log_path.parent.exists():
raise ArgumentTypeError("Log path folder doesn't exist")
cof = None
# noinspection PyBroadException
try:
cof = CleanS3User(nxc_path=args.path, log_level=DEBUG if args.verbose else INFO, log_path=args.log_path)
except:
exit(1)
try:
if args.user:
cof.clean_user(user_email=args.user, dest_user_email=args.dest_user, bck_file=args.backup_file)
elif args.all:
cof.clean_all(exclude=args.exclude)
elif args.backup_file:
cof.clean_s3_csv(bck_file=args.backup_file)
else:
raise ArgumentTypeError("You need to specify at least a user email or a CSV to clean")
except Exception as e:
cof.logger.exception(e)
exit(1)
if __name__ == '__main__':
main()

@ -0,0 +1,2 @@
boto3==1.24.11
mariadb==1.0.11
Loading…
Cancel
Save