""" A script used to migrate a WebDav folder to another with threading """ from re import compile as re_compile from io import BytesIO from tempfile import TemporaryFile from argparse import ArgumentParser, ArgumentTypeError, ArgumentDefaultsHelpFormatter from threading import Thread, Semaphore from queue import Queue, Empty from sys import stdout, exit as sys_exit from os import access, W_OK, SEEK_END from pathlib import Path from json import dump, load from uuid import uuid4 from math import ceil from datetime import datetime, timezone from traceback import format_exc from webdav3.client import Client from webdav3.exceptions import ConnectionException r_dir = re_compile(r"^(.*)/$") class Worker(Thread): """ Thread worker consuming queued task """ running : bool = False def __init__(self, number: int, queue: Queue, progress: callable, errors_lock: Semaphore, tasks_errors: list, *args, **kwargs): """ :number: Worker ID :queue: Tasks queue :progress: Progress callback :errors_lock: Error lock :tasks_errors: Errors list """ super().__init__(*args, **kwargs) self.number = number self.queue = queue self.progress = progress self.errors_lock = errors_lock self.tasks_errors = tasks_errors def run(self): """ Start the worker """ self.running = True while self.running: self.progress(self.number, "Waiting...") try: task, args = self.queue.get(timeout=1) except Empty: continue self.progress(self.number, f"{task.__name__} {args}") try: task(*args) except Exception: # pylint: disable=broad-exception-caught with self.errors_lock: self.tasks_errors.append({"task": task.__name__, "args": args, "exception": format_exc(), "time": datetime.now().isoformat()}) self.queue.task_done() self.progress(self.number, "Stopped") def stop(self): """ Stop the worker """ self.running = False self.progress(self.number, "Stopping...") class Migration: """ A class used to migrate a WebDav folder to another with threading """ queue = Queue() progress_lock = Semaphore() progress_list = {} tasks_errors = [] errors_lock = Semaphore() stats = {"skipped_file": 0, "file": 0, "directorie": 0} stats_lock = Semaphore() def __init__(self, src_client: Client, dest_client: Client, workers: int, memory_limit: int, # pylint: disable=too-many-arguments chunk_upload: int, chunk_upload_size: int, error_dump: Path, sync: bool): """ :src_client: Source WebDav client :dest_client: Dest WebDav client :workers: Number of maximum workers :memory_limit: The file size limit (MB) before using a file instead of RAM as buffer for upload/download :chunk_upload: :error_dump: Path to errors dump """ self.src_client = src_client self.dest_client = self._setup_dest_client(dest_client) self.workers = workers self.memory_limit = memory_limit self.chunk_upload = chunk_upload self.chunk_upload_size = chunk_upload_size self.error_dump = error_dump self.sync = sync def _setup_dest_client(self, client: Client) -> Client: for action in ["upload", "move"]: if action not in client.http_header: client.http_header[action] = [] client.http_header[action].append("X-OC-MTIME: XXXX") return client def start(self, src_path: str, dest_path: str): """ Start the workers with the first explore task at the root :src_path: Root path for source :dest_path: Root path for dest """ self.queue.put((self.explore, (src_path, dest_path))) return self.start_workers() def retry(self, file: Path): """ Start the workers with the errors tasks :file: JSON file of errors """ with file.open("r") as datas: errors = load(datas) for err in errors: if "task" not in err or "args" not in err: continue task = getattr(self, err["task"]) self.queue.put((task, err["args"])) return self.start_workers() def start_workers(self): """ Start the workers """ interrupt = False workers = [] for number in range(self.workers): worker = Worker(number, self.queue, self.progress, self.errors_lock, self.tasks_errors, daemon=True) workers.append(worker) worker.start() try: self.queue.join() except KeyboardInterrupt: interrupt = True self.queue.queue.clear() for worker in workers: worker.stop() for worker in workers: worker.join() return not interrupt def report(self): """ Report stats and errors """ print(f"{self.stats['file']-self.stats['skipped_file']}/{self.stats['file']} files and " \ f"{self.stats['directorie']} directories proceed " \ f"for {len(self.tasks_errors)} errors occurred") with self.error_dump.open("w") as file: dump(self.tasks_errors, file) def progress(self, number: int, msg: str): """ Track workers progress and print it to the user This method is invoked inside workers :param number: The worker ID :param msg: The worker state """ with self.progress_lock: self.progress_list[number] = msg stdout.flush() out = "\x1b[2J\x1b[H" for p_n, p_m in self.progress_list.items(): out += f"Worker {p_n} | {p_m}\n" out += f"{self.stats['file']-self.stats['skipped_file']}/{self.stats['file']} " \ f"files\t{self.stats['directorie']} directories\t{len(self.tasks_errors)} errors\n" stdout.write("\r"+out) stdout.flush() def explore(self, src_path: str, dest_path: str): """ Explore a WebDav folder by listing its files and add tasks to queue to explore sub folders or copy said files :param src_path: Root path for source :param dest_path: Root path for dest """ self.dest_client.mkdir(dest_path) for file in self.src_client.list(src_path)[1:]: new_src_path = src_path + file new_dest_path = dest_path + file task = None if self.src_client.is_dir(new_src_path): with self.stats_lock: self.stats["directorie"] += 1 task = self.explore else: task = self.copy self.queue.put((task, (new_src_path, new_dest_path))) def set_dest_mtime(self, action: str, src_path): """ Get the modification time from the source and add the correct header to the dest client :param action: WebDav action (upload, move, etc...) :param src_path: File path for source """ modified = self.src_client.info(src_path)["modified"] mtime = datetime.strptime(modified, "%a, %d %b %Y %H:%M:%S %Z")\ .replace(tzinfo=timezone.utc).timestamp() self.dest_client.http_header[action][-1] = f"X-OC-MTIME: {int(mtime)}" def copy(self, src_path: str, dest_path: str): """ Copy a file from source to dest using RAM buffer is small enough otherwise creating a temp file buffer :param src_path: File path for source :param dest_path: File path for dest """ src_info = self.src_client.info(src_path) src_size_mb = int(src_info["size"])/1049000 # MB if self.sync and self.dest_client.check(dest_path): dest_info = self.dest_client.info(dest_path) if src_info["size"] == dest_info["size"] and \ src_info["modified"] == dest_info["modified"]: with self.stats_lock: self.stats["skipped_file"] += 1 self.stats["file"] += 1 return with self.stats_lock: self.stats["file"] += 1 if src_size_mb == 0: # Manage empty file self.set_dest_mtime("upload", src_path) self.dest_client.upload_to(None, dest_path) return if self.chunk_upload and src_size_mb > self.chunk_upload: # Manage big file self.copy_chunked(src_path, dest_path) return if src_size_mb > self.memory_limit: # Avoid RAM with large file buffer = TemporaryFile() else: buffer = BytesIO() with buffer as buff: self.src_client.download_from(buff, src_path) buff.seek(0) self.set_dest_mtime("upload", src_path) self.dest_client.upload_to(buff, dest_path) def copy_chunked(self, src_path: str, dest_path: str): """ Uploads a file using chunks. If the file is smaller than ``chunk_size`` it will be uploaded directly. """ chunk_size = self.chunk_upload_size * 1024 * 1024 chunk_folder = f"/uploads/{self.dest_client.webdav.options['login']}" \ f"/webdav_migration-{uuid4()}" self.dest_client.mkdir(chunk_folder) with TemporaryFile() as buff: self.src_client.download_from(buff, src_path) buff.seek(0) size = buff.seek(0, SEEK_END) buff.seek(0) chunk_count = ceil(float(size) / float(chunk_size)) self.set_dest_mtime("upload", src_path) for chunk_index in range(chunk_count): with BytesIO() as chunk_buff: c_z = chunk_buff.write(buff.read(chunk_size)) chunk_buff.seek(0) chunk_name = f"{chunk_index*chunk_size:015d}-" \ f"{(chunk_index*chunk_size)+c_z:015d}" self.dest_client.upload_to(chunk_buff, f"{chunk_folder}/{chunk_name}") self.set_dest_mtime("move", src_path) self.dest_client.move(f"{chunk_folder}/.file", dest_path, True) @staticmethod def check_client(client: Client, path: str): """ Check WebDav client configuration :param client: WebDav client :param path: WebDav root path :return: True if the configuration is valid otherwise an error message """ if not client.valid(): return "Invalide configuration" try: if not client.check(path): return "Path doesn't exist" except ConnectionException as exception: return exception.exception return True def main(): """ When the file is executed, parse arguments and start a migration """ parser = ArgumentParser(prog="webdav_migration", description="Transfer files between WebDavs", formatter_class=ArgumentDefaultsHelpFormatter, usage="python webdav_migration.py " \ "-s 'http://nextcloud.test/remote.php/dav' " \ "-u 'username' " \ "-w 'password' " \ "-p '/files/username/source folder/' " \ "-S 'http://other_nextcloud.test/remote.php/dav' " \ "-U 'another_username' " \ "-W 'another_password' " \ "-P '/files/another_username/dest folder/'") parser.add_argument("--src-host", "-s", required=True, help="Source URL to the WebDav endpoint " \ "(example: https://nextcloud.test/remote.php/dav)") parser.add_argument("--src-user", "-u", required=True, help="Source username for WebDav connection") parser.add_argument("--src-passwd", "-w", required=True, help="Source password for WebDav connection") parser.add_argument("--src-path", "-p", required=True, help="Source root path for WebDav connection " \ "(example: /files/my_username/source folder/)") parser.add_argument("--dest-host", "-S", required=True, help="Dest URL to the WebDav endpoint " \ "(example: https://nextcloud.test/remote.php/dav)") parser.add_argument("--dest-user", "-U", required=True, help="Dest username for WebDav connection") parser.add_argument("--dest-passwd", "-W", required=True, help="Dest password for WebDav connection") parser.add_argument("--dest-path", "-P", required=True, help="Dest root path for WebDav connection " \ "(example: /files/other_nextcloud/dest folder/)") parser.add_argument("--workers", type=int, default=4, help="Number of concurrent threads") parser.add_argument("--memory-limit", type=int, default=100, help="The file size limit (MB) before using a file " \ "instead of RAM as buffer for upload/download") parser.add_argument("--chunk-doxnload-size", type=int, default=65536, help="Chunk size of downloaded file") parser.add_argument("--chunk-upload", type=int, default=100, help="MB to trigger chunked upload, set to 0 to disable") parser.add_argument("--chunk-upload-size", type=int, default=10, help="MB of each uploaded chunk") parser.add_argument("--timeout", type=int, default=600, help="The WebDav requests timeout in seconds") parser.add_argument("--error-dump", default="/tmp/webdav_migration_errors.json", help="The WebDav errors dump") parser.add_argument("--no-sync", default=True, action='store_false', dest="sync", help="Avoid copying of existing file with same size and modification date") parser.add_argument("--retry", default=None, help="Retry from errors stored in a JSON file") try: args = parser.parse_args() src_client = Client({ "webdav_hostname": args.src_host, "webdav_login": args.src_user, "webdav_password": args.src_passwd, "webdav_timeout": args.timeout, "chunk_size": args.chunk_doxnload_size }) check = Migration.check_client(src_client, args.src_path) if check is not True: raise ArgumentTypeError(f"Source client error : {check}") dest_client = Client({ "webdav_hostname": args.dest_host, "webdav_login": args.dest_user, "webdav_password": args.dest_passwd, "webdav_timeout": args.timeout, "chunk_size": args.chunk_doxnload_size }) check = Migration.check_client(dest_client, args.dest_path) if check is not True: raise ArgumentTypeError(f"Dest client error : {check}") error_dump = Path(args.error_dump) if (error_dump.exists() and (not error_dump.is_file() or not access(error_dump, W_OK))) \ or not access(error_dump.parent, W_OK): raise ArgumentTypeError("Errors dump file path invalid") if args.retry: retry_file = Path(args.retry) if not retry_file.exists() or not retry_file.is_file() or not access(retry_file, W_OK): raise ArgumentTypeError("Retry file path invalid") if error_dump ==retry_file: raise ArgumentTypeError("Errors dump and retry file is the same") except ArgumentTypeError as exception: print(exception) sys_exit(1) migration = Migration(src_client, dest_client, args.workers, args.memory_limit, args.chunk_upload, args.chunk_upload_size, error_dump, args.sync) if not args.retry: not_interrupt = migration.start(args.src_path, args.dest_path) else: not_interrupt = migration.retry(retry_file) migration.report() if not not_interrupt: print("Process interrupted") sys_exit(2) if __name__ == "__main__": main()