137 lines
3.9 KiB
Python
Executable File
137 lines
3.9 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import argparse
|
|
import errno
|
|
import os
|
|
import shutil
|
|
from pathlib import Path
|
|
|
|
|
|
def _same_inode(src: Path, dst: Path) -> bool:
    """Return True when *src* and *dst* resolve to the same file on disk.

    Both paths are examined with ``Path.stat()``, which follows symlinks, and
    are considered identical when their ``(st_dev, st_ino)`` pairs match.

    A path that cannot be resolved because it is missing, because one of its
    components is not a directory, or because it is a symlink loop is reported
    as "not the same" instead of raising, so the caller can go on to replace a
    stale destination. Any other ``OSError`` (for example a permission
    failure) still propagates.
    """
    try:
        src_stat = src.stat()
        dst_stat = dst.stat()
    except (FileNotFoundError, NotADirectoryError):
        return False
    except OSError as err:
        # A self-referencing symlink satisfies ``is_symlink()`` but cannot be
        # stat'ed; treat it like a dangling link rather than aborting.
        if err.errno != errno.ELOOP:
            raise
        return False
    return (src_stat.st_dev, src_stat.st_ino) == (dst_stat.st_dev, dst_stat.st_ino)
|
|
|
|
|
|
def _remove_path(path: Path) -> None:
    """Delete *path* from disk, whatever kind of entry it is.

    Real directories are removed recursively; regular files and symlinks
    (dangling ones and links that point at directories included) are
    unlinked. Nothing happens when the path does not exist.
    """
    is_link = path.is_symlink()
    if not (path.exists() or is_link):
        return
    if is_link or not path.is_dir():
        # Unlink the entry itself; a symlink's target is never touched.
        path.unlink()
        return
    shutil.rmtree(path)
|
|
|
|
|
|
def _prune_empty_dirs(root: Path) -> None:
    """Remove every empty directory found beneath *root*.

    Directories are visited deepest-first so that a parent emptied by the
    removal of its children is removed in the same pass. *root* itself is
    never removed, and a missing *root* is a no-op.
    """
    if not root.exists():
        return
    directories = [entry for entry in root.rglob("*") if entry.is_dir()]
    directories.sort(key=lambda entry: len(entry.parts), reverse=True)
    for directory in directories:
        try:
            directory.rmdir()
        except OSError:
            # Best effort: directories that cannot be removed (typically
            # because they still hold entries) are left in place.
            continue
|
|
|
|
|
|
def _link_or_copy(src: Path, dst: Path) -> str:
    """Make *dst* carry the content of *src*, preferring a hardlink.

    The parent directory of *dst* is created when needed. An existing *dst*
    that already shares an inode with *src* is left alone; any other existing
    entry at *dst* is removed before the new link is made.

    Returns:
        ``"reused"`` if *dst* already refers to the same inode as *src*,
        ``"linked"`` if a new hardlink was created, or ``"copied"`` if the
        hardlink attempt failed with one of the fallback errnos and the file
        was copied with ``shutil.copy2`` instead.

    Raises:
        OSError: if ``os.link`` fails with any errno outside the fallback set.
    """
    dst.parent.mkdir(parents=True, exist_ok=True)

    occupied = dst.exists() or dst.is_symlink()
    if occupied and _same_inode(src, dst):
        return "reused"
    if occupied:
        _remove_path(dst)

    try:
        os.link(src, dst)
    except OSError as err:
        # Only these errnos trigger the copy fallback; anything else is
        # re-raised to the caller.
        copy_fallback = (errno.EXDEV, errno.EPERM, errno.EMLINK, errno.ENOTSUP, errno.EACCES)
        if err.errno not in copy_fallback:
            raise
    else:
        return "linked"

    shutil.copy2(src, dst)
    return "copied"
|
|
|
|
|
|
def _file_map(src_arg: str, dest_arg: str) -> tuple[Path, dict[str, Path]]:
    """Work out which source files go where, relative to the destination root.

    Args:
        src_arg: Source path as typed by the user. For a directory, a trailing
            path separator means "map the directory's contents"; without it
            the directory's own name becomes the first component of every key.
        dest_arg: Destination path as typed by the user.

    Returns:
        A ``(dest_root, mapping)`` pair. ``dest_root`` is ``Path(dest_arg)``
        and ``mapping`` maps POSIX-style paths relative to ``dest_root`` onto
        the source files they come from. Only entries for which ``is_file()``
        is true are included.

    Raises:
        FileNotFoundError: if the source path does not exist.
    """
    # pathlib already discards trailing separators, so no manual stripping is
    # needed; stripping by hand would reduce "/" to "" (the current directory).
    src = Path(src_arg)
    if not src.exists():
        raise FileNotFoundError(src)

    dest_path = Path(dest_arg)
    separators = tuple(sep for sep in (os.sep, os.altsep) if sep)
    mapping: dict[str, Path] = {}

    if src.is_dir():
        # A source whose final component is empty (".", "/") or ".." has no
        # usable directory name. Prefixing keys with ".." would place files
        # outside the destination root, so such sources map their contents.
        copy_contents = src_arg.endswith(separators) or src.name in ("", "..")
        prefix = Path() if copy_contents else Path(src.name)
        for path in src.rglob("*"):
            if path.is_file():
                mapping[(prefix / path.relative_to(src)).as_posix()] = path
    elif dest_arg.endswith(separators) or dest_path.is_dir():
        # Single file into a directory: keep the source file name.
        mapping[src.name] = src
    else:
        # Single file onto a non-directory destination: keyed by the
        # destination's final name.
        # NOTE(review): the returned root is still Path(dest_arg), and
        # sync_local_tree joins root and key, so the file ends up at
        # <dest>/<dest name>. Confirm whether a plain rename was intended.
        mapping[dest_path.name] = src
    return dest_path, mapping
|
|
|
|
|
|
def sync_local_tree(src_arg: str, dst_arg: str, delete: bool) -> dict[str, int]:
    """Mirror the source files into the destination, hardlinking when possible.

    Args:
        src_arg: Source file or directory, interpreted by ``_file_map``.
        dst_arg: Destination root; created if it does not exist.
        delete: When true, remove destination entries that are not part of
            the computed file mapping, then prune directories left empty.

    Returns:
        A dict with integer counters: ``files`` (number of mapped source
        files), ``linked``, ``copied``, ``reused`` (outcomes reported by
        ``_link_or_copy``) and ``deleted`` (entries removed by the delete
        pass).
    """
    dst_root, mapping = _file_map(src_arg, dst_arg)
    dst_root.mkdir(parents=True, exist_ok=True)

    expected = {dst_root / rel for rel in mapping}

    deleted = 0
    if delete:
        # Materialise the listing before removing anything, deepest first.
        candidates = sorted(dst_root.rglob("*"), key=lambda p: len(p.parts), reverse=True)
        for path in candidates:
            # is_dir() follows symlinks, so a symlink that points at a
            # directory must be excluded here or stale links of that kind
            # would never be deleted. Real directories are left for the
            # pruning step below.
            if path.is_dir() and not path.is_symlink():
                continue
            if path not in expected:
                _remove_path(path)
                deleted += 1
        _prune_empty_dirs(dst_root)

    linked = 0
    copied = 0
    reused = 0
    for rel, src in mapping.items():
        result = _link_or_copy(src, dst_root / rel)
        if result == "linked":
            linked += 1
        elif result == "copied":
            copied += 1
        else:
            reused += 1

    return {
        "files": len(mapping),
        "linked": linked,
        "copied": copied,
        "reused": reused,
        "deleted": deleted,
    }
|
|
|
|
|
|
def main() -> int:
    """Parse the command line, run the sync, and print a one-line summary.

    Returns:
        ``0`` once the summary has been printed; errors from the sync
        propagate as exceptions.
    """
    cli = argparse.ArgumentParser(
        description="Sync a local CIR mirror tree using hardlinks when possible."
    )
    cli.add_argument("--delete", action="store_true", help="Delete target files not present in source")
    cli.add_argument("source")
    cli.add_argument("dest")
    options = cli.parse_args()

    counts = sync_local_tree(options.source, options.dest, options.delete)
    template = "local-link-sync files={files} linked={linked} copied={copied} reused={reused} deleted={deleted}"
    print(template.format(**counts))
    return 0


# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|