A first run migration tool
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
firstrunmigrate/scripts/webdav_migration.py

450 lines
16 KiB

"""
A script used to migrate a WebDav folder to another with threading
"""
from argparse import ArgumentParser, ArgumentTypeError, ArgumentDefaultsHelpFormatter
from datetime import datetime, timezone
from io import BytesIO
from json import dump, load
from math import ceil
from os import access, R_OK, W_OK, SEEK_END
from pathlib import Path
from queue import Queue, Empty
from re import compile as re_compile
from sys import stdout, exit as sys_exit
from tempfile import TemporaryFile
from threading import Thread, Semaphore
from traceback import format_exc
from uuid import uuid4

from webdav3.client import Client
from webdav3.exceptions import ConnectionException
# Matches a path ending with "/" and captures everything before the slash.
# NOTE(review): r_dir appears unused in this file — confirm before removing.
r_dir = re_compile(r"^(.*)/$")
class Worker(Thread):
    """
    Queue-consuming worker thread.

    Repeatedly pulls ``(callable, args)`` tasks from a shared queue and runs
    them, reporting its state through a progress callback. Failures are
    recorded (with traceback) into a shared error list guarded by a semaphore
    so the whole run keeps going.
    """
    # Loop flag: run() raises it, stop() lowers it from another thread.
    running : bool = False

    def __init__(self, number: int, queue: Queue, progress: callable, errors_lock: Semaphore,
                 tasks_errors: list, *args, **kwargs):
        """
        :number: Worker ID
        :queue: Tasks queue
        :progress: Progress callback
        :errors_lock: Error lock
        :tasks_errors: Errors list
        """
        super().__init__(*args, **kwargs)
        self.number = number
        self.queue = queue
        self.progress = progress
        self.errors_lock = errors_lock
        self.tasks_errors = tasks_errors

    def _record_failure(self, job: callable, job_args: tuple):
        """Append the currently-handled exception to the shared error list."""
        entry = {"task": job.__name__,
                 "args": job_args,
                 "exception": format_exc(),
                 "time": datetime.now().isoformat()}
        with self.errors_lock:
            self.tasks_errors.append(entry)

    def run(self):
        """
        Consume tasks until stop() is called.
        """
        self.running = True
        while self.running:
            self.progress(self.number, "Waiting...")
            try:
                # Short timeout so a stop() request is noticed promptly
                # even when the queue is idle.
                job, job_args = self.queue.get(timeout=1)
            except Empty:
                continue
            self.progress(self.number, f"{job.__name__} {job_args}")
            try:
                job(*job_args)
            except Exception: # pylint: disable=broad-exception-caught
                self._record_failure(job, job_args)
            # Always acknowledge the task so queue.join() can complete,
            # whether the task succeeded or not.
            self.queue.task_done()
        self.progress(self.number, "Stopped")

    def stop(self):
        """
        Ask the worker loop to exit after its current task.
        """
        self.running = False
        self.progress(self.number, "Stopping...")
class Migration:
    """
    Migrate one WebDav folder tree to another using a pool of worker threads.

    Directory-exploration and file-copy tasks are pushed onto a shared queue
    and consumed by Worker threads; failures are collected and dumped to a
    JSON file so they can be replayed later with retry().
    """

    def __init__(self, src_client: Client, dest_client: Client, workers: int, memory_limit: int, # pylint: disable=too-many-arguments
                 chunk_upload: int, chunk_upload_size: int, error_dump: Path, sync: bool):
        """
        :src_client: Source WebDav client
        :dest_client: Dest WebDav client
        :workers: Number of maximum workers
        :memory_limit: The file size limit (MB) before using a file instead of RAM as buffer
                       for upload/download
        :chunk_upload: File size (MB) above which chunked upload is used, 0 disables it
        :chunk_upload_size: Size (MB) of each uploaded chunk
        :error_dump: Path to errors dump
        :sync: When True, skip files already on dest with identical size and mtime
        """
        # All shared mutable state is per-instance. These used to be class
        # attributes, which would silently be shared between Migration
        # instances (queue, stats and error list would leak across runs).
        self.queue = Queue()
        self.progress_lock = Semaphore()
        self.progress_list = {}
        self.tasks_errors = []
        self.errors_lock = Semaphore()
        self.stats = {"skipped_file": 0, "file": 0, "directorie": 0}
        self.stats_lock = Semaphore()
        self.src_client = src_client
        self.dest_client = self._setup_dest_client(dest_client)
        self.workers = workers
        self.memory_limit = memory_limit
        self.chunk_upload = chunk_upload
        self.chunk_upload_size = chunk_upload_size
        self.error_dump = error_dump
        self.sync = sync

    def _setup_dest_client(self, client: Client) -> Client:
        """
        Append a placeholder X-OC-MTIME header to the dest client for the
        actions that need it; set_dest_mtime() overwrites the placeholder
        with the real value before each request.
        """
        for action in ["upload", "move"]:
            if action not in client.http_header:
                client.http_header[action] = []
            client.http_header[action].append("X-OC-MTIME: XXXX")
        return client

    def start(self, src_path: str, dest_path: str):
        """
        Start the workers with the first explore task at the root
        :src_path: Root path for source
        :dest_path: Root path for dest
        :return: True when the run completed, False when interrupted
        """
        self.queue.put((self.explore, (src_path, dest_path)))
        return self.start_workers()

    def retry(self, file: Path):
        """
        Start the workers with the errors tasks
        :file: JSON file of errors
        :return: True when the run completed, False when interrupted
        """
        with file.open("r") as datas:
            errors = load(datas)
        for err in errors:
            # Skip malformed entries rather than crashing the whole retry.
            if "task" not in err or "args" not in err:
                continue
            task = getattr(self, err["task"])
            self.queue.put((task, err["args"]))
        return self.start_workers()

    def start_workers(self):
        """
        Start the workers, wait for the queue to drain, then stop them.
        :return: True when the queue drained, False on KeyboardInterrupt
        """
        interrupt = False
        workers = []
        for number in range(self.workers):
            worker = Worker(number, self.queue, self.progress, self.errors_lock,
                            self.tasks_errors, daemon=True)
            workers.append(worker)
            worker.start()
        try:
            self.queue.join()
        except KeyboardInterrupt:
            interrupt = True
            # Drop all pending tasks so workers exit quickly.
            self.queue.queue.clear()
        for worker in workers:
            worker.stop()
        for worker in workers:
            worker.join()
        return not interrupt

    def report(self):
        """
        Report stats and errors, and dump the error list to the JSON file
        so a later run can retry() it.
        """
        print(f"{self.stats['file']-self.stats['skipped_file']}/{self.stats['file']} files and " \
              f"{self.stats['directorie']} directories proceed " \
              f"for {len(self.tasks_errors)} errors occurred")
        with self.error_dump.open("w") as file:
            dump(self.tasks_errors, file)

    def progress(self, number: int, msg: str):
        """
        Track workers progress and print it to the user
        This method is invoked inside workers
        :param number: The worker ID
        :param msg: The worker state
        """
        with self.progress_lock:
            self.progress_list[number] = msg
            stdout.flush()
            out = "\x1b[2J\x1b[H"  # ANSI escape: clear screen + cursor home
            for p_n, p_m in self.progress_list.items():
                out += f"Worker {p_n} | {p_m}\n"
            out += f"{self.stats['file']-self.stats['skipped_file']}/{self.stats['file']} " \
                   f"files\t{self.stats['directorie']} directories\t{len(self.tasks_errors)} errors\n"
            stdout.write("\r"+out)
            stdout.flush()

    def explore(self, src_path: str, dest_path: str):
        """
        Explore a WebDav folder by listing its files and add tasks to queue to explore sub folders
        or copy said files
        :param src_path: Root path for source
        :param dest_path: Root path for dest
        """
        self.dest_client.mkdir(dest_path)
        # NOTE(review): [1:] assumes list() returns the folder itself as the
        # first entry — confirm against the webdav3 client in use.
        for file in self.src_client.list(src_path)[1:]:
            new_src_path = src_path + file
            new_dest_path = dest_path + file
            task = None
            if self.src_client.is_dir(new_src_path):
                with self.stats_lock:
                    self.stats["directorie"] += 1
                task = self.explore
            else:
                task = self.copy
            self.queue.put((task, (new_src_path, new_dest_path)))

    def set_dest_mtime(self, action: str, src_path):
        """
        Get the modification time from the source and add the correct header to the dest client
        :param action: WebDav action (upload, move, etc...)
        :param src_path: File path for source
        """
        # NOTE(review): dest_client.http_header is shared by every worker
        # thread, so concurrent copies can overwrite each other's mtime
        # header before the request is sent — confirm whether a per-request
        # header mechanism is available.
        modified = self.src_client.info(src_path)["modified"]
        mtime = datetime.strptime(modified, "%a, %d %b %Y %H:%M:%S %Z")\
                        .replace(tzinfo=timezone.utc).timestamp()
        self.dest_client.http_header[action][-1] = f"X-OC-MTIME: {int(mtime)}"

    def copy(self, src_path: str, dest_path: str):
        """
        Copy a file from source to dest using a RAM buffer if small enough,
        otherwise creating a temp file buffer
        :param src_path: File path for source
        :param dest_path: File path for dest
        """
        src_info = self.src_client.info(src_path)
        # bytes -> MB; was "/1049000", a slightly wrong MiB divisor.
        src_size_mb = int(src_info["size"])/1048576
        if self.sync and self.dest_client.check(dest_path):
            dest_info = self.dest_client.info(dest_path)
            if src_info["size"] == dest_info["size"] and \
               src_info["modified"] == dest_info["modified"]:
                # Identical file already on dest: count it but skip the copy.
                with self.stats_lock:
                    self.stats["skipped_file"] += 1
                    self.stats["file"] += 1
                return
        with self.stats_lock:
            self.stats["file"] += 1
        if src_size_mb == 0: # Manage empty file
            self.set_dest_mtime("upload", src_path)
            self.dest_client.upload_to(None, dest_path)
            return
        if self.chunk_upload and src_size_mb > self.chunk_upload: # Manage big file
            self.copy_chunked(src_path, dest_path)
            return
        if src_size_mb > self.memory_limit: # Avoid RAM with large file
            buffer = TemporaryFile()
        else:
            buffer = BytesIO()
        with buffer as buff:
            self.src_client.download_from(buff, src_path)
            buff.seek(0)
            self.set_dest_mtime("upload", src_path)
            self.dest_client.upload_to(buff, dest_path)

    def copy_chunked(self, src_path: str, dest_path: str):
        """
        Uploads a file using chunks. If the file is smaller than
        ``chunk_size`` it will be uploaded directly.
        """
        chunk_size = self.chunk_upload_size * 1024 * 1024
        # Per-file upload folder; assumes a Nextcloud-style chunked upload
        # endpoint under /uploads/<login>/ — TODO confirm for other servers.
        chunk_folder = f"/uploads/{self.dest_client.webdav.options['login']}" \
                       f"/webdav_migration-{uuid4()}"
        self.dest_client.mkdir(chunk_folder)
        with TemporaryFile() as buff:
            self.src_client.download_from(buff, src_path)
            buff.seek(0)
            size = buff.seek(0, SEEK_END)
            buff.seek(0)
            chunk_count = ceil(float(size) / float(chunk_size))
            self.set_dest_mtime("upload", src_path)
            for chunk_index in range(chunk_count):
                with BytesIO() as chunk_buff:
                    c_z = chunk_buff.write(buff.read(chunk_size))
                    chunk_buff.seek(0)
                    # Chunk names encode the byte range they cover.
                    chunk_name = f"{chunk_index*chunk_size:015d}-" \
                                 f"{(chunk_index*chunk_size)+c_z:015d}"
                    self.dest_client.upload_to(chunk_buff, f"{chunk_folder}/{chunk_name}")
        # Moving the assembled ".file" finalizes the chunked upload.
        self.set_dest_mtime("move", src_path)
        self.dest_client.move(f"{chunk_folder}/.file", dest_path, True)

    @staticmethod
    def check_client(client: Client, path: str):
        """
        Check WebDav client configuration
        :param client: WebDav client
        :param path: WebDav root path
        :return: True if the configuration is valid otherwise an error message
        """
        if not client.valid():
            return "Invalid configuration"
        try:
            if not client.check(path):
                return "Path doesn't exist"
        except ConnectionException as exception:
            return exception.exception
        return True
def main():
    """
    When the file is executed, parse arguments, validate both WebDav clients
    and the error-dump/retry paths, then start a migration (or replay a
    previous run's errors with --retry).

    Exits with code 1 on invalid arguments/configuration and 2 when the
    transfer was interrupted.
    """
    parser = ArgumentParser(prog="webdav_migration",
                            description="Transfer files between WebDavs",
                            formatter_class=ArgumentDefaultsHelpFormatter,
                            usage="python webdav_migration.py " \
                                  "-s 'http://nextcloud.test/remote.php/dav' " \
                                  "-u 'username' " \
                                  "-w 'password' " \
                                  "-p '/files/username/source folder/' " \
                                  "-S 'http://other_nextcloud.test/remote.php/dav' " \
                                  "-U 'another_username' " \
                                  "-W 'another_password' " \
                                  "-P '/files/another_username/dest folder/'")
    parser.add_argument("--src-host", "-s", required=True,
                        help="Source URL to the WebDav endpoint " \
                             "(example: https://nextcloud.test/remote.php/dav)")
    parser.add_argument("--src-user", "-u", required=True,
                        help="Source username for WebDav connection")
    parser.add_argument("--src-passwd", "-w", required=True,
                        help="Source password for WebDav connection")
    parser.add_argument("--src-path", "-p", required=True,
                        help="Source root path for WebDav connection " \
                             "(example: /files/my_username/source folder/)")
    parser.add_argument("--dest-host", "-S", required=True,
                        help="Dest URL to the WebDav endpoint " \
                             "(example: https://nextcloud.test/remote.php/dav)")
    parser.add_argument("--dest-user", "-U", required=True,
                        help="Dest username for WebDav connection")
    parser.add_argument("--dest-passwd", "-W", required=True,
                        help="Dest password for WebDav connection")
    parser.add_argument("--dest-path", "-P", required=True,
                        help="Dest root path for WebDav connection " \
                             "(example: /files/other_nextcloud/dest folder/)")
    parser.add_argument("--workers", type=int, default=4,
                        help="Number of concurrent threads")
    parser.add_argument("--memory-limit", type=int, default=100,
                        help="The file size limit (MB) before using a file " \
                             "instead of RAM as buffer for upload/download")
    # Fixed flag spelling; the historical misspelling "--chunk-doxnload-size"
    # is kept as a hidden alias for backward compatibility.
    parser.add_argument("--chunk-download-size", "--chunk-doxnload-size",
                        dest="chunk_download_size", type=int, default=65536,
                        help="Chunk size of downloaded file")
    parser.add_argument("--chunk-upload", type=int, default=100,
                        help="MB to trigger chunked upload, set to 0 to disable")
    parser.add_argument("--chunk-upload-size", type=int, default=10,
                        help="MB of each uploaded chunk")
    parser.add_argument("--timeout", type=int, default=600,
                        help="The WebDav requests timeout in seconds")
    parser.add_argument("--error-dump", default="/tmp/webdav_migration_errors.json",
                        help="The WebDav errors dump")
    parser.add_argument("--no-sync", default=True, action='store_false', dest="sync",
                        help="Avoid copying of existing file with same size and modification date")
    parser.add_argument("--retry", default=None, help="Retry from errors stored in a JSON file")
    try:
        args = parser.parse_args()
        src_client = Client({
            "webdav_hostname": args.src_host,
            "webdav_login": args.src_user,
            "webdav_password": args.src_passwd,
            "webdav_timeout": args.timeout,
            "chunk_size": args.chunk_download_size
        })
        check = Migration.check_client(src_client, args.src_path)
        if check is not True:
            raise ArgumentTypeError(f"Source client error : {check}")
        dest_client = Client({
            "webdav_hostname": args.dest_host,
            "webdav_login": args.dest_user,
            "webdav_password": args.dest_passwd,
            "webdav_timeout": args.timeout,
            "chunk_size": args.chunk_download_size
        })
        check = Migration.check_client(dest_client, args.dest_path)
        if check is not True:
            raise ArgumentTypeError(f"Dest client error : {check}")
        error_dump = Path(args.error_dump)
        # An existing dump must be a writable regular file; a new one only
        # needs a writable parent directory (the old check also demanded a
        # writable parent for existing files, wrongly rejecting valid paths).
        if error_dump.exists():
            if not error_dump.is_file() or not access(error_dump, W_OK):
                raise ArgumentTypeError("Errors dump file path invalid")
        elif not access(error_dump.parent, W_OK):
            raise ArgumentTypeError("Errors dump file path invalid")
        if args.retry:
            retry_file = Path(args.retry)
            # The retry file is only read, so require read access (was W_OK).
            if not retry_file.is_file() or not access(retry_file, R_OK):
                raise ArgumentTypeError("Retry file path invalid")
            if error_dump == retry_file:
                raise ArgumentTypeError("Errors dump and retry file are the same")
    except ArgumentTypeError as exception:
        print(exception)
        sys_exit(1)
    migration = Migration(src_client, dest_client, args.workers, args.memory_limit,
                          args.chunk_upload, args.chunk_upload_size, error_dump, args.sync)
    if not args.retry:
        not_interrupt = migration.start(args.src_path, args.dest_path)
    else:
        not_interrupt = migration.retry(retry_file)
    migration.report()
    if not not_interrupt:
        print("Process interrupted")
        sys_exit(2)
# Script entry point: parse CLI arguments and run the migration.
if __name__ == "__main__":
    main()