Fix some python lint errors

master
Florian Charlaix 1 year ago
parent 280302551d
commit 258556fdc8
  1. 63
      scripts/groups_export.py
  2. 73
      scripts/groups_import.py
  3. 4
      scripts/user_data_migrate.py
  4. 35
      scripts/users_groups_quotas_export.py
  5. 13
      scripts/webdav_migration.py
  6. 0
      scripts/webdav_migration_requirements.txt

@ -7,31 +7,35 @@ from re import compile as re_compile
def main(path : str, php :str, output_groups : str, output_group_folders : str, exclude_group: str):
output_groups = Path(output_groups)
if output_group_folders:
output_group_folders = Path(output_group_folders)
if exclude_group:
exclude_group = re_compile(exclude_group)
r = run([php, "occ", "-V"], cwd=path, check=True, stdout=PIPE, stderr=PIPE)
version = list(map(int, r.stdout.decode().split(" ")[-1].split(".")))
r = run([php, "occ", "config:list", "--private", "--output", "json"], cwd=path, check=True, stdout=PIPE, stderr=PIPE)
config = loads(r.stdout.decode().strip())
out = run([php, "occ", "-V"], cwd=path, check=True, stdout=PIPE, stderr=PIPE)
version = list(map(int, out.stdout.decode().split(" ")[-1].split(".")))
out = run([php, "occ", "config:list", "--private", "--output", "json"], cwd=path, check=True,
stdout=PIPE, stderr=PIPE)
config = loads(out.stdout.decode().strip())
groups = {}
offset = 0
if version[0] >= 22:
while True:
r = run([php, "occ", "group:list", "--offset", str(offset), "--info", "--output", "json"], cwd=path, check=True, stdout=PIPE, stderr=PIPE)
new_groups = loads(r.stdout.decode().strip())
out = run(
[php, "occ", "group:list", "--offset", str(offset), "--info", "--output", "json"],
cwd=path, check=True, stdout=PIPE, stderr=PIPE)
new_groups = loads(out.stdout.decode().strip())
if not new_groups:
break
for group in new_groups.keys():
r = run([php, "occ", "group:info", group, "--output", "json"], cwd=path, check=True, stdout=PIPE, stderr=PIPE)
infos = loads(r.stdout.decode().strip())
out = run([php, "occ", "group:info", group, "--output", "json"],
cwd=path, check=True, stdout=PIPE, stderr=PIPE)
infos = loads(out.stdout.decode().strip())
new_groups[group]["displayName"] = infos["displayName"]
groups.update(new_groups)
@ -45,32 +49,37 @@ def main(path : str, php :str, output_groups : str, output_group_folders : str,
db_password = config["system"]["dbpassword"]
db_name = config["system"]["dbname"]
db_prefix = config["system"]["dbtableprefix"]
if db_type == "mysql":
r = run(["mysql", "-B", "-r", "--disable-column-names", "-h", db_host, "-P", db_port, "-u", db_user, f"-p{db_password}", "-e", f"select JSON_OBJECT('gid', gid, 'displayname', displayname) from {db_prefix}groups", db_name], cwd=path, check=True, stdout=PIPE, stderr=PIPE)
for l in map(loads, filter(lambda l: l != "", r.stdout.decode().split("\n"))):
groups[l["gid"]] = {"displayName": l["displayname"]}
out = run(["mysql", "-B", "-r", "--disable-column-names", "-h", db_host, "-P", db_port,
"-u", db_user, f"-p{db_password}","-e",
"select JSON_OBJECT('gid', gid, 'displayname', displayname) "\
f"from {db_prefix}groups", db_name],
cwd=path, check=True, stdout=PIPE, stderr=PIPE)
for line in map(loads, filter(lambda l: l != "", out.stdout.decode().split("\n"))):
groups[line["gid"]] = {"displayName": line["displayname"]}
elif db_type == "psql":
pass
elif db_type == "sqlite":
pass
print(f"Got {len(groups)} groups")
if exclude_group:
groups = {k: v for k, v in groups.items() if not exclude_group.findall(k)}
print(f"Got {len(groups)} filtered groups")
with output_groups.open("w") as f:
dump({k: v["displayName"] for (k, v) in groups.items()}, f)
with output_groups.open("w", encoding="UTF-8") as file:
dump({k: v["displayName"] for (k, v) in groups.items()}, file)
if output_group_folders:
r = run([php, "occ", "groupfolders:list", "--output", "json"], cwd=path, check=True, stdout=PIPE, stderr=PIPE)
group_folders = loads(r.stdout.decode().strip())
with output_group_folders.open("w") as f:
dump(group_folders, f)
out = run([php, "occ", "groupfolders:list", "--output", "json"],
cwd=path, check=True, stdout=PIPE, stderr=PIPE)
group_folders = loads(out.stdout.decode().strip())
with output_group_folders.open("w", encoding="UTF-8") as file:
dump(group_folders, file)
if __name__ == '__main__':
@ -82,5 +91,5 @@ if __name__ == '__main__':
parser.add_argument("--exclude-group", "-e", default=None)
args = parser.parse_args()
main(args.path, args.php, args.output_groups, args.output_group_folders, args.exclude_group)

@ -4,7 +4,8 @@ from argparse import ArgumentParser
from pathlib import Path
# From nextcloud/lib/public/Constants.php, based on nextcloud/custom_apps/groupfolders/lib/Command/Group.php
# From nextcloud/lib/public/Constants.php
# Based on nextcloud/custom_apps/groupfolders/lib/Command/Group.php
permission_matrix = {
1: "read",
2 | 4: "write",
@ -15,65 +16,73 @@ permission_matrix = {
def convert_permission_to_text(permission_number) -> [str] :
"""Converts a permission number to text format."""
permissions = []
for permission_bit, permission_string in permission_matrix.items():
if permission_number & permission_bit == permission_bit:
permissions.append(permission_string)
return permissions
"""Converts a permission number to text format."""
permissions = []
for permission_bit, permission_string in permission_matrix.items():
if permission_number & permission_bit == permission_bit:
permissions.append(permission_string)
return permissions
def main(path : str, php :str, groups_file : str, group_folders_file : str):
groups_file = Path(groups_file)
if group_folders_file:
group_folders_file = Path(group_folders_file)
with groups_file.open("r") as f:
groups = load(f)
with groups_file.open("r", encoding="UTF-8") as file:
groups = load(file)
for gid, display_name in groups.items():
if gid == 'admin':
continue
try:
run([php, "occ", "group:add", "--no-interaction", "--display-name", display_name, gid], cwd=path, check=True, stdout=PIPE, stderr=PIPE)
run([php, "occ", "group:add", "--no-interaction", "--display-name", display_name, gid],
cwd=path, check=True, stdout=PIPE, stderr=PIPE)
print(f"Group {gid} created")
except CalledProcessError as e:
if e.returncode == 1:
except CalledProcessError as exception:
if exception.returncode == 1:
print(f"Group {gid} already already exist")
if group_folders_file:
with group_folders_file.open("r") as f:
group_folders = load(f)
r = run([php, "occ", "groupfolders:list", "--output", "json"], cwd=path, check=True, stdout=PIPE, stderr=PIPE)
existing_gf = list(map(lambda gf: gf["mount_point"] , loads(r.stdout.decode().strip())))
with group_folders_file.open("r", encoding="UTF-8") as file:
group_folders = load(file)
out = run([php, "occ", "groupfolders:list", "--output", "json"],
cwd=path, check=True, stdout=PIPE, stderr=PIPE)
existing_gf = list(map(lambda gf: gf["mount_point"] , loads(out.stdout.decode().strip())))
for gf in group_folders:
if gf["mount_point"] in existing_gf:
print(f"Group folder {gf['mount_point']} already exist")
continue
r = run([php, "occ", "groupfolders:create", "--no-interaction", "--output", "json", gf["mount_point"]], cwd=path, check=True, stdout=PIPE, stderr=PIPE)
new_gf = loads(r.stdout.decode().strip())
out = run([php, "occ", "groupfolders:create", "--no-interaction", "--output", "json",
gf["mount_point"]], cwd=path, check=True, stdout=PIPE, stderr=PIPE)
new_gf = loads(out.stdout.decode().strip())
print(f"Group folder {gf['mount_point']} created")
if gf["quota"] > 0:
run([php, "occ", "groupfolders:quota", "--no-interaction", "--output", "json", str(new_gf), str(gf["quota"])], cwd=path, check=True, stdout=PIPE, stderr=PIPE)
run([php, "occ", "groupfolders:quota", "--no-interaction", "--output", "json",
str(new_gf), str(gf["quota"])],
cwd=path, check=True, stdout=PIPE, stderr=PIPE)
print(f"Group folder {gf['mount_point']} quota set to {gf['quota']}")
else:
print(f"Group folder {gf['mount_point']} quota set to unlimited")
if gf["groups"]:
for g_name, g_perms in gf["groups"].items():
perms = convert_permission_to_text(g_perms)
run([php, "occ", "groupfolders:group", "--no-interaction", "--output", "json", str(new_gf), g_name, *perms], cwd=path, check=True, stdout=PIPE, stderr=PIPE)
print(f"Group folder {gf['mount_point']} add group {g_name} with permissions {' '.join(perms)}")
run([php, "occ", "groupfolders:group", "--no-interaction", "--output", "json",
str(new_gf), g_name, *perms],
cwd=path, check=True, stdout=PIPE, stderr=PIPE)
print(f"Group folder {gf['mount_point']} add group {g_name} with permissions "\
f"{' '.join(perms)}")
# if gf["acl"]:
# run([php, "occ", "groupfolders:permissions", "--no-interaction", "--output", "json", "-e", str(new_gf)], cwd=path, check=True, stdout=PIPE, stderr=PIPE)
# print(f"Group folder {gf['mount_point']} now use ACL")
# for manager in gf["manage"]:
# try:
# run([php, "occ", "groupfolders:permissions", "--no-interaction", "--output", "json", "-m", str(new_gf), manager], cwd=path, check=True, stdout=PIPE, stderr=PIPE)
@ -91,5 +100,5 @@ if __name__ == '__main__':
parser.add_argument("--group-folders", "-f", default=None)
args = parser.parse_args()
main(args.path, args.php, args.groups, args.group_folders)

@ -2,7 +2,7 @@ from json import loads
from subprocess import run, PIPE
from argparse import ArgumentParser, ArgumentTypeError
from datetime import datetime, timedelta, timezone
from sys import stdout
from sys import stdout as sys_stdout
def run_command(args: list, remote: str = None, ssh_args: list = None, check: bool = True,
@ -74,7 +74,7 @@ def main(src_path: str, dest_path: str, src_php: str, dest_php: str, src_remote:
for idx, user in enumerate(users):
print(f"{'='*20}[{user} ({idx+1}/{max_users})]{'='*20}")
stdout.flush()
sys_stdout.flush()
run(["rsync", "-avzPh", "--del",
"-e", f"ssh {' '.join(ssh_args)}",

@ -6,23 +6,26 @@ from datetime import datetime, timedelta, timezone
from re import compile as re_compile
def main(path : str, php :str, days: int, output_groups : str, output_quotas : str, exclude_group: str):
def main(path : str, php :str, days: int, output_groups : str, output_quotas : str,
exclude_group: str):
output_groups = Path(output_groups)
output_quotas = Path(output_quotas)
if days != 0:
last_seen_limit = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=days)
last_seen_limit = datetime.now(timezone.utc)\
.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=days)
else:
last_seen_limit = None
if exclude_group:
exclude_group = re_compile(exclude_group)
users = {}
offset = 0
while True:
r = run([php, "occ", "user:list", "--info", "--offset", str(offset), "--output", "json"], cwd=path, check=True, stdout=PIPE, stderr=PIPE)
new_users = loads(r.stdout.decode().strip())
out = run([php, "occ", "user:list", "--info", "--offset", str(offset), "--output", "json"],
cwd=path, check=True, stdout=PIPE, stderr=PIPE)
new_users = loads(out.stdout.decode().strip())
if not new_users:
break
@ -30,26 +33,28 @@ def main(path : str, php :str, days: int, output_groups : str, output_quotas : s
offset = len(users)
print(f"Got {offset} users")
users = {k: v for k, v in users.items() if v["email"]}
print(f"Got {len(users)} email filtered users")
if last_seen_limit:
users = {k: v for k, v in users.items() if datetime.fromisoformat(v["last_seen"]) >= last_seen_limit}
users = {k: v for k, v in users.items()
if datetime.fromisoformat(v["last_seen"]) >= last_seen_limit}
print(f"Got {len(users)} time filtered users")
if exclude_group:
for user in users:
users[user]["groups"] = list(filter(lambda g: not exclude_group.findall(g), users[user]["groups"]))
users[user]["groups"] = list(filter(lambda g: not exclude_group.findall(g),
users[user]["groups"]))
print("Groups filtered")
with output_groups.open("w") as f:
dump({v["email"]: v["groups"] for (k, v) in users.items()}, f)
with output_groups.open("w", encoding="UTF-8") as file:
dump({v["email"]: v["groups"] for (k, v) in users.items()}, file)
with output_quotas.open("w") as f:
dump({v["email"]: v["quota"] for (k, v) in users.items()}, f)
with output_quotas.open("w", encoding="UTF-8") as file:
dump({v["email"]: v["quota"] for (k, v) in users.items()}, file)
if __name__ == '__main__':
@ -62,5 +67,5 @@ if __name__ == '__main__':
parser.add_argument("--exclude-group", "-e", default=None)
args = parser.parse_args()
main(args.path, args.php, args.days, args.output_groups, args.output_quotas, args.exclude_group)

@ -42,7 +42,7 @@ class Migration:
:workers: Number of maximum workers
:memory_limit: The file size limit (MB) before using a file instead of RAM as buffer
for upload/download
:chunk_upload:
:chunk_upload:
:error_dump: Path to errors dump
"""
self.src_client = src_client
@ -56,7 +56,7 @@ class Migration:
def start(self, src_path: str, dest_path: str):
"""
Start the workers and fill the queue with the first explore task at the root
:src_path: Root path for source
:dest_path: Root path for dest
"""
@ -77,7 +77,7 @@ class Migration:
def worker(self, number: int):
"""
Thread worker consuming queued task
:number: The worker ID
"""
while True:
@ -98,7 +98,7 @@ class Migration:
"""
Track workers progress and print it to the user
This method is invoked inside workers
:param number: The worker ID
:param msg: The worker state
"""
@ -109,7 +109,8 @@ class Migration:
out = "\x1b[2J\x1b[H"
for p_n, p_m in self.progress_list.items():
out += f"Worker {p_n} | {p_m}\n"
out += f"{self.stats['file']} files\t{self.stats['directorie']} directories\t{len(self.tasks_errors)} errors\n"
out += f"{self.stats['file']} files\t{self.stats['directorie']} directories\t"\
"{len(self.tasks_errors)} errors\n"
stdout.write("\r"+out)
stdout.flush()
@ -156,7 +157,7 @@ class Migration:
def copy(self, src_path: str, dest_path: str):
"""
Copy a file from source to dest using RAM buffer is small enough
otherwise creating a temp file buffer
otherwise creating a temp file buffer
:param src_path: File path for source
:param dest_path: File path for dest
Loading…
Cancel
Save