Initial commit

master
Florian Charlaix 10 months ago
commit 944d4c282d
  1. 1
      .gitignore
  2. 395
      nextcloud_backup.py

1
.gitignore vendored

@ -0,0 +1 @@
/.vscode

@ -0,0 +1,395 @@
"""
Nextcloud backup script
"""
from logging import getLogger, basicConfig, DEBUG, INFO, WARN, ERROR, CRITICAL
from subprocess import run, PIPE, CalledProcessError
from json import loads
from tempfile import NamedTemporaryFile, _TemporaryFileWrapper
from os import environ
from sys import stderr
from argparse import ArgumentParser, BooleanOptionalAction, ArgumentTypeError
from pathlib import Path
from typing import Optional, Any
log_levels = {
0: CRITICAL,
1: ERROR,
2: WARN,
3: INFO,
4: DEBUG,
}
class NextcloudBackup:
"""
Nextcloud backup class
"""
_db : Optional[dict] = None
_s3 : Optional[dict] = None
_psql_env : Optional[dict] = None
_s3_env : Optional[dict] = None
def __init__(self, nextcloud: dict, backup_path: Path):
self.logger = getLogger(__name__)
self.nextcloud = nextcloud
self.backup_path = backup_path
@staticmethod
def merge_env(env: dict) -> dict:
"""
Merge system environment with given environment
:param env: New environment
:return: Merged environment
"""
tmp_env = environ.copy()
tmp_env.update(env)
return tmp_env
def occ(self, occ_args: list, json: bool = True) -> Any:
"""
Run a Nextcloud OCC CLI command
:param occ_args: OCC args
:param json: Add '--output=json" to args and parse the JSON, defaults to True
:return: Return the output of the command as string or as an object parsed if JSON enabled
"""
args = ["sudo", "-u", "www-data", "PHP_MEMORY_LIMIT=1G", self.nextcloud["php"],
f"{self.nextcloud['path']}/occ", "--no-interaction", "--no-ansi", "--no-warnings"]
if json:
args += ["--output=json"]
self.logger.debug(args + occ_args)
out = run(args + occ_args, check=True, stdout=PIPE, stderr=stderr).stdout.decode().strip()
self.logger.debug(out)
if json:
return loads(out)
return out
def test_nextcloud(self) -> bool:
"""
Check if Nextcloud parameters are valid
:return: True if the process can communicate with Nextcloud OCC, else False
"""
try:
self.occ(["check"], json=False)
return True
except CalledProcessError:
return False
def nxc_maintenance(self, enabled: bool):
"""
Enable/disable Nextcloud maintenance mode
:param enabled: Enable or disable maintenance
"""
self.logger.info("%s maintenance mode", 'Enabling' if enabled else 'Disabling')
self.occ(["maintenance:mode", f"--{'on' if enabled else 'off'}"], json=False)
def rclone(self, args: Optional[list] = None, env: Optional[dict] = None,
json: bool = False) -> Any:
"""
Run Rclone command
:param args: Rclone args, defaults to None
:param env: Rclone additional environment, defaults to None
:param json: Parse output as JSON, defaults to False
:return: Return the output of the command as string or as an object parsed if JSON enabled
"""
if args is None:
args = []
if env is None:
env = {}
env = self.merge_env(env)
self.logger.debug({"args":["rclone"] + args})
self.logger.debug(env)
out = run(["rclone"] + args, env=env, check=True, stdout=PIPE).stdout.decode().strip()
self.logger.debug(out)
if json:
return loads(out)
return out
def rsync(self, args: Optional[list] = None, env: Optional[dict] = None):
"""
Run rsync command
:param args: rsync args, defaults to None
:param env: rsync additional environment, defaults to None
:return: The competed process
"""
if args is None:
args = []
if env is None:
env = {}
env = self.merge_env(env)
self.logger.debug({"args":["rsync"] + args, "env": env})
return run(["rsync"] + args, env=env, check=True)
def pg_dump(self, dump_file):
"""
Dump Postgres database to file
:param dump_file: Target file to output the dump
"""
env = self.merge_env(self.psql_env)
with open(dump_file, "w", encoding="utf-8") as dump_fd:
run(["pg_dump"], stdout=dump_fd, env=env, check=True)
@property
def psql_env(self):
"""
Postgres CLI environment with cache
:return: Postgres CLI environment
"""
if self._psql_env:
return self._psql_env
env = {
"PGDATABASE": self.db["dbname"],
"PGHOST": self.db["dbhost"],
"PGPORT": self.db["dbport"],
"PGUSER": self.db["dbuser"],
"PGPASSWORD": self.db["dbpassword"]
}
self.logger.debug(env)
self._psql_env = env
return env
@property
def db(self) -> dict:
"""
Get database parameters from Nextcloud config with cache
:return: Database parameters
"""
if self._db:
return self._db
db = {}
for i in ["dbtype", "dbname", "dbhost", "dbport", "dbuser", "dbpassword"]:
db[i] = self.occ(["config:system:get", i])
self.logger.debug(db)
self._db = db
return db
def test_db(self) -> bool:
"""
Check if database parameters are valid
:return: True if the process can communicate with the database, else False
"""
try:
if self.db["dbtype"] == "pgsql":
run(["pg_isready"], env=self.psql_env, check=True)
else:
raise ArgumentTypeError("Unsupported database type")
return True
except CalledProcessError:
return False
def dump_db(self) -> _TemporaryFileWrapper:
"""
Dump database to a file
:raises ArgumentTypeError: If incompatible database
:return: Database file
"""
self.logger.info("Dumping databse")
dump_file = NamedTemporaryFile(delete=False)
self.logger.debug("Database dump file %s", dump_file.name)
if self.db["dbtype"] == "pgsql":
self.pg_dump(dump_file.name)
else:
raise ArgumentTypeError("Unsupported database type")
return dump_file
def copy_db(self, dump_file: str):
"""
Copy database dump to the backup path
:param dump_file: Database dump file
"""
self.logger.info("Copying database")
dest = self.backup_path/"db.sql"
self.rsync([dump_file, str(dest)])
@property
def s3(self) -> dict:
"""
Get S3 parameters from Nextcloud config with cache
:return: S3 parameters
"""
if self._s3:
return self._s3
s3 = {}
for i in ["bucket", "key", "secret", "hostname", "region", "port", "use_ssl"]:
s3[i] = self.occ(["config:system:get", "objectstore", "arguments", i])
self.logger.debug(s3)
self._s3 = s3
return s3
@property
def s3_env(self) -> dict:
"""
S3 Rclone environment with cache
:return: S3 Rclone environment
"""
if self._s3_env:
return self._s3_env
env = {
"RCLONE_S3_PROVIDER": "minio",
"RCLONE_S3_ENV_AUTH": "true",
"RCLONE_S3_ACCESS_KEY_ID": self.s3["key"],
"RCLONE_S3_SECRET_ACCESS_KEY": self.s3["secret"],
"RCLONE_S3_REGION": self.s3["region"],
"RCLONE_S3_ENDPOINT": f"http{'s' if self.s3['use_ssl'] else ''}://"\
f"{self.s3['hostname']}:{self.s3['port']}",
}
self.logger.debug(env)
self._s3_env = env
return env
def test_s3(self) -> bool:
"""
Check if S3 parameters are valid
:return: True if Rclone can communicate with the S3, else False
"""
try:
lsjson = self.rclone(["lsjson", "--max-depth=1", ":s3:"], self.s3_env, json=True)
if self.s3["bucket"] not in map(lambda e: e.get("Path"), lsjson):
return False
return True
except CalledProcessError:
return False
def copy_s3(self, rclone_remote_control: bool = True):
"""
Copy S3 to the backup path with Rclone
:param rclone_remote_control: Enable Rclone remote control, defaults to True
"""
self.logger.info("Copying S3")
env = self.s3_env
dest = self.backup_path/"s3"
dest.mkdir(exist_ok=True)
args = ["copy", f":s3:{self.s3['bucket']}/", str(dest)]
if rclone_remote_control:
args = ["--rc"] + args
self.rclone(args, env)
def backup(self, maintenance : bool = True, rclone_remote_control: bool = True):
"""
Backup Nextcloud database and S3 to backup path
:param maintenance: Enable maintenance during backup, defaults to True
:param rclone_remote_control: Enable Rclone remote control, defaults to True
"""
self.logger.info("Starting backup")
if maintenance:
self.nxc_maintenance(True)
dump_file = self.dump_db()
self.copy_db(dump_file.name)
self.copy_s3(rclone_remote_control)
if maintenance:
self.nxc_maintenance(False)
def check(self):
"""
Check if last backup run successfully
TODO/WIP
"""
def main():
"""
Main program function
"""
parser = ArgumentParser(prog="Nextcloud backup")
subparsers = parser.add_subparsers(dest="action", required=True)
parser.add_argument("--verbose", "-v", dest="verbosity", action="count", default=0,
help="Verbosity (between 1-4 occurrences with more leading to more "
"verbose logging). CRITICAL=0, ERROR=1, WARN=2, INFO=3, "
"DEBUG=4")
parser_backup = subparsers.add_parser("backup")
parser_backup.add_argument("--php", "-p", default="php", type=Path)
parser_backup.add_argument("--nextcloud", "-n", default="/opt/nextcloud", type=Path)
parser_backup.add_argument("--backup", "-b", default="/opt/backupnextcloud", type=Path)
parser_backup.add_argument("--maintenance", "-m",
action=BooleanOptionalAction, default=True)
parser_backup.add_argument("--rclone-remote-control", "-r",
action=BooleanOptionalAction, default=True)
parser_check = subparsers.add_parser("check")
parser_check.add_argument("--backup", "-b", default="/opt/backupnextcloud", type=Path)
args = parser.parse_args()
basicConfig(level=log_levels[args.verbosity])
if not args.nextcloud.exists() or not args.nextcloud.is_dir():
raise ArgumentTypeError("Invalid Nextcloud path")
if not args.backup.exists() or not args.backup.is_dir():
raise ArgumentTypeError("Invalid backup path")
nextcloud = {
"php": args.php,
"path": args.nextcloud
}
nb = NextcloudBackup(nextcloud, args.backup)
if args.action == "backup":
if not nb.test_nextcloud():
raise ArgumentTypeError("Nextcloud check faild")
if not nb.test_db():
raise ArgumentTypeError("Database check faild")
if not nb.test_s3():
raise ArgumentTypeError("S3 check faild")
nb.backup(args.maintenance, args.rclone_remote_control)
elif args.action == "check":
nb.check()
if __name__ == "__main__":
main()
Loading…
Cancel
Save