""" Nextcloud backup script """
from logging import getLogger, basicConfig, DEBUG, INFO, WARN, ERROR, CRITICAL
from subprocess import run, PIPE, CalledProcessError
from json import loads
from tempfile import NamedTemporaryFile, _TemporaryFileWrapper
from os import environ, access, X_OK
from sys import stderr
from argparse import ArgumentParser, BooleanOptionalAction, ArgumentTypeError
from pathlib import Path
from typing import Optional, Any, Dict
from shutil import which

# Maps the number of "-v" occurrences on the command line to a logging level.
log_levels = {
    0: CRITICAL,
    1: ERROR,
    2: WARN,
    3: INFO,
    4: DEBUG,
}


def verbosity_to_level(verbosity: int) -> int:
    """
    Translate a "-v" occurrence count into a logging level

    Counts outside the known range are clamped, so passing more "-v" flags
    than there are levels selects the most verbose level instead of failing.

    :param verbosity: Number of "-v" occurrences
    :return: Logging level
    """
    clamped = max(min(log_levels), min(verbosity, max(log_levels)))
    return log_levels[clamped]


def executable(value: str) -> Path:
    """
    Check if the argument is a valid executable

    :param value: Raw path to executable file
    :raises ArgumentTypeError: If the executable is not found or not executable
    :return: Path to executable file
    """
    found = which(value)
    if not found:
        raise ArgumentTypeError(f"{value} not found")
    path = Path(found)
    if not access(path, X_OK):
        raise ArgumentTypeError(f"{value} is not executable")
    return path


def directory(value: str) -> Path:
    """
    Check if the argument is an existing directory

    :param value: Raw path
    :raises ArgumentTypeError: If the path doesn't exist or is not a directory
    :return: Path
    """
    path = Path(value)
    if not path.exists():
        raise ArgumentTypeError(f"{value} doesn't exist")
    if not path.is_dir():
        raise ArgumentTypeError(f"{value} is not a directory")
    return path


class NextcloudBackup:
    """
    Nextcloud backup class

    Dumps the Nextcloud database and copies the S3 object store to a local
    backup directory, driving the occ, pg_dump, rsync and rclone CLIs.
    """
    # Lazily filled caches, see the matching properties below.
    _db: Optional[dict] = None
    _s3: Optional[dict] = None
    _psql_env: Optional[dict] = None
    _s3_env: Optional[dict] = None

    def __init__(self, nextcloud: dict, backup_path: Path):
        """
        :param nextcloud: Nextcloud parameters, with the keys "php" (PHP
                          executable) and "path" (Nextcloud installation path)
        :param backup_path: Directory receiving the backup
        """
        self.logger = getLogger(__name__)
        self.nextcloud = nextcloud
        self.backup_path = backup_path

    @staticmethod
    def merge_env(env: dict) -> dict:
        """
        Merge system environment with given environment

        :param env: New environment, takes precedence over the system one
        :return: Merged environment
        """
        merged = environ.copy()
        merged.update(env)
        return merged

    def occ(self, occ_args: list, json: bool = True) -> Any:
        """
        Run a Nextcloud OCC CLI command

        :param occ_args: OCC args
        :param json: Add "--output=json" to args and parse the JSON,
                     defaults to True
        :raises CalledProcessError: If the command exits with a non-zero status
        :return: Return the output of the command as string
                 or as an object parsed if JSON enabled
        """
        args = [
            "sudo", "-u", "www-data", "PHP_MEMORY_LIMIT=1G",
            self.nextcloud["php"], f"{self.nextcloud['path']}/occ",
            "--no-interaction", "--no-ansi", "--no-warnings",
        ]
        if json:
            args += ["--output=json"]
        self.logger.debug(args + occ_args)
        out = run(args + occ_args, check=True, stdout=PIPE,
                  stderr=stderr).stdout.decode().strip()
        self.logger.debug(out)
        if json:
            return loads(out)
        return out

    def test_nextcloud(self) -> bool:
        """
        Check if Nextcloud parameters are valid

        :return: True if the process can communicate with Nextcloud OCC,
                 else False
        """
        try:
            self.occ(["check"], json=False)
        except CalledProcessError:
            return False
        return True

    def nxc_maintenance(self, enabled: bool):
        """
        Enable/disable Nextcloud maintenance mode

        :param enabled: Enable or disable maintenance
        """
        self.logger.info("%s maintenance mode",
                         'Enabling' if enabled else 'Disabling')
        self.occ(["maintenance:mode", f"--{'on' if enabled else 'off'}"],
                 json=False)

    def rclone(self, args: Optional[list] = None, env: Optional[dict] = None,
               json: bool = False) -> Any:
        """
        Run Rclone command

        :param args: Rclone args, defaults to None
        :param env: Rclone additional environment, defaults to None
        :param json: Parse output as JSON, defaults to False
        :raises CalledProcessError: If the command exits with a non-zero status
        :return: Return the output of the command as string
                 or as an object parsed if JSON enabled
        """
        if args is None:
            args = []
        if env is None:
            env = {}
        env = self.merge_env(env)
        self.logger.debug({"args": ["rclone"] + args})
        # NOTE(review): the merged environment may hold credentials and is
        # written to the debug log as-is.
        self.logger.debug(env)
        out = run(["rclone"] + args, env=env, check=True,
                  stdout=PIPE).stdout.decode().strip()
        self.logger.debug(out)
        if json:
            return loads(out)
        return out

    def rsync(self, args: Optional[list] = None, env: Optional[dict] = None):
        """
        Run rsync command

        :param args: rsync args, defaults to None
        :param env: rsync additional environment, defaults to None
        :raises CalledProcessError: If the command exits with a non-zero status
        :return: The completed process
        """
        if args is None:
            args = []
        if env is None:
            env = {}
        env = self.merge_env(env)
        # NOTE(review): the merged environment is written to the debug log.
        self.logger.debug({"args": ["rsync"] + args, "env": env})
        return run(["rsync"] + args, env=env, check=True)

    def pg_dump(self, dump_file):
        """
        Dump Postgres database to file

        :param dump_file: Target file to output the dump
        :raises CalledProcessError: If pg_dump exits with a non-zero status
        """
        env = self.merge_env(self.psql_env)
        with open(dump_file, "w", encoding="utf-8") as dump_fd:
            run(["pg_dump"], stdout=dump_fd, env=env, check=True)

    @property
    def psql_env(self):
        """
        Postgres CLI environment with cache

        :return: Postgres CLI environment
        """
        if self._psql_env:
            return self._psql_env
        db = self.db
        # The values come from parsed JSON and may not be strings (a numeric
        # port for instance), while a process environment only accepts
        # strings.
        env = {
            "PGDATABASE": str(db["dbname"]),
            "PGHOST": str(db["dbhost"]),
            "PGPORT": str(db["dbport"]),
            "PGUSER": str(db["dbuser"]),
            "PGPASSWORD": str(db["dbpassword"]),
        }
        # NOTE(review): this writes the database password to the debug log.
        self.logger.debug(env)
        self._psql_env = env
        return env

    @property
    def db(self) -> dict:
        """
        Get database parameters from Nextcloud config with cache

        :return: Database parameters
        """
        if self._db:
            return self._db
        keys = ["dbtype", "dbname", "dbhost", "dbport", "dbuser", "dbpassword"]
        db = {key: self.occ(["config:system:get", key]) for key in keys}
        self.logger.debug(db)
        self._db = db
        return db

    def test_db(self) -> bool:
        """
        Check if database parameters are valid

        :raises ArgumentTypeError: If incompatible database
        :return: True if the process can communicate with the database,
                 else False
        """
        try:
            if self.db["dbtype"] != "pgsql":
                raise ArgumentTypeError("Unsupported database type")
            # Merge with the system environment, as pg_dump() does, so that
            # PATH and the other inherited variables stay available.
            run(["pg_isready"], env=self.merge_env(self.psql_env), check=True)
        except CalledProcessError:
            return False
        return True

    def dump_db(self) -> _TemporaryFileWrapper:
        """
        Dump database to a file

        The returned temporary file is created with delete=False: the caller
        is in charge of closing and removing it.

        :raises ArgumentTypeError: If incompatible database
        :return: Database file
        """
        self.logger.info("Dumping databse")
        # Check the database type first so that no temporary file is left
        # behind for an unsupported database.
        if self.db["dbtype"] != "pgsql":
            raise ArgumentTypeError("Unsupported database type")
        dump_file = NamedTemporaryFile(delete=False)
        self.logger.debug("Database dump file %s", dump_file.name)
        try:
            self.pg_dump(dump_file.name)
        except BaseException:
            # Do not leave a partial dump in the temporary directory.
            dump_file.close()
            Path(dump_file.name).unlink(missing_ok=True)
            raise
        return dump_file

    def copy_db(self, dump_file: str):
        """
        Copy database dump to the backup path

        :param dump_file: Database dump file
        """
        self.logger.info("Copying database")
        dest = self.backup_path / "db.sql"
        self.rsync([dump_file, str(dest)])

    @property
    def s3(self) -> dict:
        """
        Get S3 parameters from Nextcloud config with cache

        :return: S3 parameters
        """
        if self._s3:
            return self._s3
        keys = ["bucket", "key", "secret", "hostname", "region", "port",
                "use_ssl"]
        s3 = {
            key: self.occ(["config:system:get", "objectstore", "arguments",
                           key])
            for key in keys
        }
        self.logger.debug(s3)
        self._s3 = s3
        return s3

    @property
    def s3_env(self) -> dict:
        """
        S3 Rclone environment with cache

        :return: S3 Rclone environment
        """
        if self._s3_env:
            return self._s3_env
        s3 = self.s3
        scheme = "https" if s3["use_ssl"] else "http"
        # str() because the values come from parsed JSON while a process
        # environment only accepts strings.
        env = {
            "RCLONE_S3_PROVIDER": "minio",
            "RCLONE_S3_ENV_AUTH": "true",
            "RCLONE_S3_ACCESS_KEY_ID": str(s3["key"]),
            "RCLONE_S3_SECRET_ACCESS_KEY": str(s3["secret"]),
            "RCLONE_S3_REGION": str(s3["region"]),
            "RCLONE_S3_ENDPOINT": f"{scheme}://{s3['hostname']}:{s3['port']}",
        }
        # NOTE(review): this writes the S3 secret to the debug log.
        self.logger.debug(env)
        self._s3_env = env
        return env

    def test_s3(self) -> bool:
        """
        Check if S3 parameters are valid

        :return: True if Rclone can communicate with the S3 and the
                 configured bucket is listed, else False
        """
        try:
            lsjson = self.rclone(["lsjson", "--max-depth=1", ":s3:"],
                                 self.s3_env, json=True)
        except CalledProcessError:
            return False
        bucket = self.s3["bucket"]
        return any(entry.get("Path") == bucket for entry in lsjson)

    def copy_s3(self, rclone_remote_control: bool = True):
        """
        Copy S3 to the backup path with Rclone

        :param rclone_remote_control: Enable Rclone remote control,
                                      defaults to True
        """
        self.logger.info("Copying S3")
        env = self.s3_env
        dest = self.backup_path / "s3"
        dest.mkdir(exist_ok=True)
        args = ["copy", f":s3:{self.s3['bucket']}/", str(dest)]
        if rclone_remote_control:
            args = ["--rc"] + args
        self.rclone(args, env)

    def backup(self, maintenance: bool = True,
               rclone_remote_control: bool = True):
        """
        Backup Nextcloud database and S3 to backup path

        Maintenance mode is switched off again even when a step fails, and
        the temporary database dump is always removed.

        :param maintenance: Enable maintenance during backup, defaults to True
        :param rclone_remote_control: Enable Rclone remote control,
                                      defaults to True
        """
        self.logger.info("Starting backup")
        if maintenance:
            self.nxc_maintenance(True)
        try:
            dump_file = self.dump_db()
            try:
                self.copy_db(dump_file.name)
            finally:
                # The dump was created with delete=False: remove it here so
                # it does not pile up in the temporary directory.
                dump_file.close()
                Path(dump_file.name).unlink(missing_ok=True)
            self.copy_s3(rclone_remote_control)
        finally:
            # Never leave the instance stuck in maintenance mode.
            if maintenance:
                self.nxc_maintenance(False)

    def check(self):
        """
        Check if last backup run successfully
        TODO/WIP
        """

    def tests(self):
        """
        Launch all tests

        :raises ValueError: If one of the checks fails
        """
        if not self.test_nextcloud():
            raise ValueError("Nextcloud check faild")
        if not self.test_db():
            raise ValueError("Database check faild")
        if not self.test_s3():
            raise ValueError("S3 check faild")


def main():
    """
    Main program function

    Parse the command line, configure logging, run the pre-flight checks
    then the requested action.
    """
    parser = ArgumentParser(prog="Nextcloud backup")
    subparsers = parser.add_subparsers(dest="action", required=True)
    parser.add_argument("--verbose", "-v", dest="verbosity", action="count",
                        default=0,
                        help="Verbosity (between 1-4 occurrences with more leading to more "
                             "verbose logging). CRITICAL=0, ERROR=1, WARN=2, INFO=3, "
                             "DEBUG=4")
    parser.add_argument("--log-file", "-f", type=Path)
    parser.add_argument("--php", "-p", default="php", type=executable)
    parser.add_argument("--nextcloud", "-n", default="/opt/nextcloud",
                        type=directory)
    parser.add_argument("--backup", "-b", default="/opt/backupnextcloud",
                        type=directory)
    parser_backup = subparsers.add_parser("backup")
    parser_backup.add_argument("--maintenance", "-m",
                               action=BooleanOptionalAction, default=True)
    parser_backup.add_argument("--rclone-remote-control", "-r",
                               action=BooleanOptionalAction, default=True)
    subparsers.add_parser("check")
    args = parser.parse_args()

    logging_config: Dict[str, Any] = {
        "level": verbosity_to_level(args.verbosity)
    }
    # "--log-file" is stored by argparse under the "log_file" attribute.
    if args.log_file:
        logging_config["filename"] = args.log_file
        logging_config["filemode"] = "a"
    basicConfig(**logging_config)

    if not args.nextcloud.exists() or not args.nextcloud.is_dir():
        raise ArgumentTypeError("Invalid Nextcloud path")
    if not args.backup.exists() or not args.backup.is_dir():
        raise ArgumentTypeError("Invalid backup path")

    nb = NextcloudBackup({"php": args.php, "path": args.nextcloud},
                         args.backup)
    # NOTE(review): a failed pre-flight check is only logged and the
    # requested action still runs afterwards - confirm this is intended.
    try:
        nb.tests()
    except ValueError as e:
        nb.logger.error(str(e))

    if args.action == "backup":
        nb.backup(args.maintenance, args.rclone_remote_control)
    elif args.action == "check":
        nb.check()


if __name__ == "__main__":
    main()