"""
|
|
A script used to migrate a WebDav folder to another with threading
|
|
"""
|
|
from re import compile as re_compile
|
|
from io import BytesIO
|
|
from tempfile import TemporaryFile
|
|
from argparse import ArgumentParser, ArgumentTypeError, ArgumentDefaultsHelpFormatter
|
|
from threading import Thread, Semaphore
|
|
from queue import Queue, Empty
|
|
from sys import stdout, exit as sys_exit
|
|
from os import access, W_OK, SEEK_END
|
|
from pathlib import Path
|
|
from json import dump, load
|
|
from uuid import uuid4
|
|
from math import ceil
|
|
from datetime import datetime, timezone
|
|
from traceback import format_exc
|
|
from webdav3.client import Client
|
|
from webdav3.exceptions import ConnectionException
|
|
|
|
|
|
r_dir = re_compile(r"^(.*)/$")
|
|
|
|
|
|
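

# Note: the destination server is assumed to be Nextcloud/ownCloud-compatible:
# the migration relies on the X-OC-MTIME header to preserve modification times
# and on the /uploads/<login>/ endpoint for chunked uploads.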
class Worker(Thread):
    """
    Thread worker consuming queued tasks
    """
    running: bool = False

    def __init__(self, number: int, queue: Queue, progress: callable, errors_lock: Semaphore,
                 tasks_errors: list, *args, **kwargs):
        """
        :param number: Worker ID
        :param queue: Tasks queue
        :param progress: Progress callback
        :param errors_lock: Errors list lock
        :param tasks_errors: Errors list
        """
        super().__init__(*args, **kwargs)
        self.number = number
        self.queue = queue
        self.progress = progress
        self.errors_lock = errors_lock
        self.tasks_errors = tasks_errors

    def run(self):
        """
        Start the worker
        """
        self.running = True
        while self.running:
            self.progress(self.number, "Waiting...")
            try:
                # Short timeout so the running flag is rechecked regularly
                task, args = self.queue.get(timeout=1)
            except Empty:
                continue

            self.progress(self.number, f"{task.__name__} {args}")
            try:
                task(*args)
            except Exception:  # pylint: disable=broad-exception-caught
                with self.errors_lock:
                    self.tasks_errors.append({"task": task.__name__,
                                              "args": args,
                                              "exception": format_exc(),
                                              "time": datetime.now().isoformat()})
            self.queue.task_done()
        self.progress(self.number, "Stopped")

    def stop(self):
        """
        Stop the worker
        """
        self.running = False
        self.progress(self.number, "Stopping...")


class Migration:
    """
    A class used to migrate a WebDav folder to another with threading
    """
    queue = Queue()
    progress_lock = Semaphore()
    progress_list = {}
    tasks_errors = []
    errors_lock = Semaphore()
    stats = {"skipped_file": 0, "file": 0, "directory": 0}
    stats_lock = Semaphore()

    def __init__(self, src_client: Client, dest_client: Client, workers: int, memory_limit: int,  # pylint: disable=too-many-arguments
                 chunk_upload: int, chunk_upload_size: int, error_dump: Path, sync: bool):
        """
        :param src_client: Source WebDav client
        :param dest_client: Dest WebDav client
        :param workers: Number of maximum workers
        :param memory_limit: The file size limit (MB) before using a file instead of RAM
                             as buffer for upload/download
        :param chunk_upload: The file size (MB) that triggers chunked upload, 0 to disable
        :param chunk_upload_size: The size (MB) of each uploaded chunk
        :param error_dump: Path to errors dump
        :param sync: Skip files already on dest with same size and modification date
        """
        self.src_client = src_client
        self.dest_client = self._setup_dest_client(dest_client)
        self.workers = workers
        self.memory_limit = memory_limit
        self.chunk_upload = chunk_upload
        self.chunk_upload_size = chunk_upload_size
        self.error_dump = error_dump
        self.sync = sync

    def _setup_dest_client(self, client: Client) -> Client:
        # Reserve a placeholder header slot, replaced per-file by set_dest_mtime()
        for action in ["upload", "move"]:
            if action not in client.http_header:
                client.http_header[action] = []
            client.http_header[action].append("X-OC-MTIME: XXXX")
        return client

    def start(self, src_path: str, dest_path: str):
        """
        Start the workers with the first explore task at the root

        :param src_path: Root path for source
        :param dest_path: Root path for dest
        """
        self.queue.put((self.explore, (src_path, dest_path)))
        return self.start_workers()

    def retry(self, file: Path):
        """
        Start the workers with the failed tasks recorded in an errors dump

        :param file: JSON file of errors
        """
        with file.open("r") as datas:
            errors = load(datas)

        for err in errors:
            if "task" not in err or "args" not in err:
                continue
            task = getattr(self, err["task"])
            self.queue.put((task, err["args"]))

        return self.start_workers()

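    # Worker lifecycle note: workers are daemon threads, queue.join() blocks
    # until every queued task is done, and on Ctrl+C the pending queue is
    # drained so the workers can be stopped and joined cleanly.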
    def start_workers(self):
        """
        Start the workers
        """
        interrupt = False
        workers = []

        for number in range(self.workers):
            worker = Worker(number, self.queue, self.progress, self.errors_lock,
                            self.tasks_errors, daemon=True)
            workers.append(worker)
            worker.start()

        try:
            self.queue.join()
        except KeyboardInterrupt:
            interrupt = True

        self.queue.queue.clear()

        for worker in workers:
            worker.stop()

        for worker in workers:
            worker.join()

        return not interrupt

    def report(self):
        """
        Report stats and errors
        """
        print(f"{self.stats['file']-self.stats['skipped_file']}/{self.stats['file']} files and "
              f"{self.stats['directory']} directories processed, "
              f"{len(self.tasks_errors)} errors occurred")
        with self.error_dump.open("w") as file:
            dump(self.tasks_errors, file)

    def progress(self, number: int, msg: str):
        """
        Track workers' progress and print it to the user
        This method is invoked inside workers

        :param number: The worker ID
        :param msg: The worker state
        """
        with self.progress_lock:
            self.progress_list[number] = msg

            stdout.flush()
            out = "\x1b[2J\x1b[H"  # ANSI escape: clear screen and move cursor home
            for p_n, p_m in self.progress_list.items():
                out += f"Worker {p_n} | {p_m}\n"
            out += f"{self.stats['file']-self.stats['skipped_file']}/{self.stats['file']} " \
                   f"files\t{self.stats['directory']} directories\t{len(self.tasks_errors)} errors\n"

            stdout.write("\r" + out)
            stdout.flush()

    def explore(self, src_path: str, dest_path: str):
        """
        Explore a WebDav folder by listing its content, then queue tasks to explore
        sub-folders or to copy the listed files

        :param src_path: Root path for source
        :param dest_path: Root path for dest
        """
        self.dest_client.mkdir(dest_path)
        # Skip the first entry: the listed folder itself
        for file in self.src_client.list(src_path)[1:]:
            new_src_path = src_path + file
            new_dest_path = dest_path + file
            task = None
            if self.src_client.is_dir(new_src_path):
                with self.stats_lock:
                    self.stats["directory"] += 1
                task = self.explore
            else:
                task = self.copy
            self.queue.put((task, (new_src_path, new_dest_path)))

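    # Nextcloud/ownCloud honour the X-OC-MTIME request header on uploads and
    # moves, which lets the destination keep the source modification time.
    # Caveat: http_header is shared mutable state on the client, so with
    # several workers the header may be overwritten between set_dest_mtime()
    # and the actual request; preserved mtimes are best-effort here.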
    def set_dest_mtime(self, action: str, src_path: str):
        """
        Get the modification time from the source and add the correct header to the dest client

        :param action: WebDav action (upload, move, etc...)
        :param src_path: File path for source
        """
        modified = self.src_client.info(src_path)["modified"]
        # Parse the RFC 1123 date (e.g. "Mon, 01 Jan 2024 12:00:00 GMT") as UTC
        mtime = datetime.strptime(modified, "%a, %d %b %Y %H:%M:%S %Z")\
                .replace(tzinfo=timezone.utc).timestamp()
        self.dest_client.http_header[action][-1] = f"X-OC-MTIME: {int(mtime)}"

    def copy(self, src_path: str, dest_path: str):
        """
        Copy a file from source to dest using a RAM buffer if the file is small enough,
        otherwise using a temp file buffer

        :param src_path: File path for source
        :param dest_path: File path for dest
        """
        src_info = self.src_client.info(src_path)
        src_size_mb = int(src_info["size"]) / 1048576  # MiB

        if self.sync and self.dest_client.check(dest_path):
            dest_info = self.dest_client.info(dest_path)
            if src_info["size"] == dest_info["size"] and \
               src_info["modified"] == dest_info["modified"]:
                with self.stats_lock:
                    self.stats["skipped_file"] += 1
                    self.stats["file"] += 1
                return

        with self.stats_lock:
            self.stats["file"] += 1

        if src_size_mb == 0:  # Manage empty file
            self.set_dest_mtime("upload", src_path)
            self.dest_client.upload_to(None, dest_path)
            return

        if self.chunk_upload and src_size_mb > self.chunk_upload:  # Manage big file
            self.copy_chunked(src_path, dest_path)
            return

        if src_size_mb > self.memory_limit:  # Avoid RAM with large file
            buffer = TemporaryFile()
        else:
            buffer = BytesIO()

        with buffer as buff:
            self.src_client.download_from(buff, src_path)
            buff.seek(0)
            self.set_dest_mtime("upload", src_path)
            self.dest_client.upload_to(buff, dest_path)

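    # Chunked upload follows the Nextcloud chunking API (v1): chunks are
    # uploaded as zero-padded "<start>-<end>" files under
    # /uploads/<login>/<transfer-id>/, then the virtual ".file" is MOVEd to
    # the final destination, which makes the server assemble the chunks.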
    def copy_chunked(self, src_path: str, dest_path: str):
        """
        Upload a file from source to dest using chunks; the caller (copy)
        decides when chunking applies

        :param src_path: File path for source
        :param dest_path: File path for dest
        """
        chunk_size = self.chunk_upload_size * 1024 * 1024

        chunk_folder = f"/uploads/{self.dest_client.webdav.options['login']}" \
                       f"/webdav_migration-{uuid4()}"
        self.dest_client.mkdir(chunk_folder)

        with TemporaryFile() as buff:
            self.src_client.download_from(buff, src_path)
            size = buff.seek(0, SEEK_END)
            buff.seek(0)

            chunk_count = ceil(float(size) / float(chunk_size))

            self.set_dest_mtime("upload", src_path)

            for chunk_index in range(chunk_count):
                with BytesIO() as chunk_buff:
                    c_z = chunk_buff.write(buff.read(chunk_size))
                    chunk_buff.seek(0)
                    chunk_name = f"{chunk_index*chunk_size:015d}-" \
                                 f"{(chunk_index*chunk_size)+c_z:015d}"
                    self.dest_client.upload_to(chunk_buff, f"{chunk_folder}/{chunk_name}")

        self.set_dest_mtime("move", src_path)
        self.dest_client.move(f"{chunk_folder}/.file", dest_path, True)  # overwrite dest

    @staticmethod
    def check_client(client: Client, path: str):
        """
        Check WebDav client configuration

        :param client: WebDav client
        :param path: WebDav root path
        :return: True if the configuration is valid, otherwise an error message
        """
        if not client.valid():
            return "Invalid configuration"

        try:
            if not client.check(path):
                return "Path doesn't exist"
        except ConnectionException as exception:
            return exception.exception

        return True


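# Example invocations (hypothetical hosts and credentials):
#   python webdav_migration.py -s 'https://nextcloud.test/remote.php/dav' \
#       -u 'username' -w 'password' -p '/files/username/source folder/' \
#       -S 'https://other_nextcloud.test/remote.php/dav' -U 'another_username' \
#       -W 'another_password' -P '/files/another_username/dest folder/'
#   Retry only the tasks recorded in a previous errors dump:
#   python webdav_migration.py <same arguments> --retry /tmp/webdav_migration_errors.json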
def main():
    """
    When the file is executed, parse arguments and start a migration
    """
    parser = ArgumentParser(prog="webdav_migration",
                            description="Transfer files between WebDavs",
                            formatter_class=ArgumentDefaultsHelpFormatter,
                            usage="python webdav_migration.py "
                                  "-s 'http://nextcloud.test/remote.php/dav' "
                                  "-u 'username' "
                                  "-w 'password' "
                                  "-p '/files/username/source folder/' "
                                  "-S 'http://other_nextcloud.test/remote.php/dav' "
                                  "-U 'another_username' "
                                  "-W 'another_password' "
                                  "-P '/files/another_username/dest folder/'")
    parser.add_argument("--src-host", "-s", required=True,
                        help="Source URL to the WebDav endpoint "
                             "(example: https://nextcloud.test/remote.php/dav)")
    parser.add_argument("--src-user", "-u", required=True,
                        help="Source username for WebDav connection")
    parser.add_argument("--src-passwd", "-w", required=True,
                        help="Source password for WebDav connection")
    parser.add_argument("--src-path", "-p", required=True,
                        help="Source root path for WebDav connection "
                             "(example: /files/my_username/source folder/)")
    parser.add_argument("--dest-host", "-S", required=True,
                        help="Dest URL to the WebDav endpoint "
                             "(example: https://nextcloud.test/remote.php/dav)")
    parser.add_argument("--dest-user", "-U", required=True,
                        help="Dest username for WebDav connection")
    parser.add_argument("--dest-passwd", "-W", required=True,
                        help="Dest password for WebDav connection")
    parser.add_argument("--dest-path", "-P", required=True,
                        help="Dest root path for WebDav connection "
                             "(example: /files/another_username/dest folder/)")
    parser.add_argument("--workers", type=int, default=4,
                        help="Number of concurrent threads")
    parser.add_argument("--memory-limit", type=int, default=100,
                        help="The file size limit (MB) before using a file "
                             "instead of RAM as buffer for upload/download")
    parser.add_argument("--chunk-download-size", type=int, default=65536,
                        help="Chunk size (bytes) used when downloading files")
    parser.add_argument("--chunk-upload", type=int, default=100,
                        help="MB to trigger chunked upload, set to 0 to disable")
    parser.add_argument("--chunk-upload-size", type=int, default=10,
                        help="MB of each uploaded chunk")
    parser.add_argument("--timeout", type=int, default=600,
                        help="The WebDav requests timeout in seconds")
    parser.add_argument("--error-dump", default="/tmp/webdav_migration_errors.json",
                        help="The WebDav errors dump")
    parser.add_argument("--no-sync", default=True, action='store_false', dest="sync",
                        help="Always copy files, even when they already exist on dest "
                             "with the same size and modification date")
    parser.add_argument("--retry", default=None, help="Retry from errors stored in a JSON file")

    try:
        args = parser.parse_args()

        src_client = Client({
            "webdav_hostname": args.src_host,
            "webdav_login": args.src_user,
            "webdav_password": args.src_passwd,
            "webdav_timeout": args.timeout,
            "chunk_size": args.chunk_download_size
        })

        check = Migration.check_client(src_client, args.src_path)
        if check is not True:
            raise ArgumentTypeError(f"Source client error: {check}")

        dest_client = Client({
            "webdav_hostname": args.dest_host,
            "webdav_login": args.dest_user,
            "webdav_password": args.dest_passwd,
            "webdav_timeout": args.timeout,
            "chunk_size": args.chunk_download_size
        })

        check = Migration.check_client(dest_client, args.dest_path)
        if check is not True:
            raise ArgumentTypeError(f"Dest client error: {check}")

        error_dump = Path(args.error_dump)
        if (error_dump.exists() and (not error_dump.is_file() or not access(error_dump, W_OK))) \
                or not access(error_dump.parent, W_OK):
            raise ArgumentTypeError("Errors dump file path invalid")

        if args.retry:
            retry_file = Path(args.retry)
            # The retry file is only read, so readability is all that is required
            if not retry_file.exists() or not retry_file.is_file() or not access(retry_file, R_OK):
                raise ArgumentTypeError("Retry file path invalid")
            if error_dump == retry_file:
                raise ArgumentTypeError("Errors dump and retry file are the same")

    except ArgumentTypeError as exception:
        print(exception)
        sys_exit(1)

    migration = Migration(src_client, dest_client, args.workers, args.memory_limit,
                          args.chunk_upload, args.chunk_upload_size, error_dump, args.sync)
    if not args.retry:
        not_interrupt = migration.start(args.src_path, args.dest_path)
    else:
        not_interrupt = migration.retry(retry_file)

    migration.report()

    if not not_interrupt:
        print("Process interrupted")
        sys_exit(2)


if __name__ == "__main__":
    main()