rpki/scripts/local_repo_replay/prepare_tals.py

85 lines
2.6 KiB
Python
Executable File

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
from pathlib import Path
def split_tal(text: str) -> tuple[list[str], list[str]]:
    """Split TAL text into the lines before and after its first blank line.

    A line counts as blank when it is empty or contains only whitespace.
    The blank separator line itself is not included in either half; any
    later blank lines stay in the second half untouched.

    Args:
        text: Full contents of a TAL file.

    Returns:
        A ``(before, after)`` pair of line lists. When *text* has no blank
        line at all, ``before`` holds every line and ``after`` is empty.
    """
    rows = text.splitlines()
    # Position of the first whitespace-only row, or None when there is none.
    separator = next(
        (position for position, row in enumerate(rows) if not row.strip()),
        None,
    )
    if separator is None:
        return rows, []
    return rows[:separator], rows[separator + 1:]
def rsync_only_tal(source: Path) -> tuple[str, dict[str, object]]:
    """Render the TAL at *source* with only its ``rsync://`` URIs kept.

    The file is split with ``split_tal``; in the first part, blank lines and
    lines whose first non-whitespace character is ``#`` are ignored and the
    remaining lines are treated as URIs. URIs are matched against the
    ``rsync://`` prefix case-insensitively.

    Args:
        source: Path of the TAL file to read.

    Returns:
        A ``(content, row)`` pair. ``content`` is the text to write out and
        ``row`` is a summary dict with the keys ``file``, ``mode``, ``uris``
        and ``rsyncUris``.

        * If no rsync URI is present, ``content`` is the decoded input with a
          trailing newline guaranteed and ``mode`` is
          ``"unchanged_no_rsync_uri"``.
        * Otherwise ``content`` is the rsync URIs (one per line), a blank
          line, and the second part returned by ``split_tal`` with
          surrounding whitespace stripped; ``mode`` is ``"rsync_only"``.
          Comment lines and non-rsync URIs are not carried over.
    """
    # "utf-8-sig" decodes like UTF-8 but discards a leading byte-order mark.
    # With plain "utf-8" the BOM survives as U+FEFF glued to the first line;
    # str.strip() does not remove it, so a first-line rsync URI would fail
    # the prefix test below and a first-line comment would count as a URI.
    text = source.read_text(encoding="utf-8-sig", errors="replace")
    uri_lines, key_lines = split_tal(text)

    uris = [
        stripped
        for stripped in (line.strip() for line in uri_lines)
        if stripped and not stripped.startswith("#")
    ]
    rsync_uris = [uri for uri in uris if uri.lower().startswith("rsync://")]

    if not rsync_uris:
        # Nothing to filter down to: hand the text back, newline-terminated.
        unchanged = text if text.endswith("\n") else text + "\n"
        return unchanged, {
            "file": source.name,
            "mode": "unchanged_no_rsync_uri",
            "uris": len(uris),
            "rsyncUris": 0,
        }

    key_block = "\n".join(key_lines).strip()
    out = "\n".join(rsync_uris) + "\n\n" + key_block + "\n"
    return out, {
        "file": source.name,
        "mode": "rsync_only",
        "uris": len(uris),
        "rsyncUris": len(rsync_uris),
    }
def collect_tals(tal_dir: Path | None, tals: list[Path]) -> list[Path]:
    """Gather TAL paths from a directory and an explicit list, one per name.

    Paths are de-duplicated by file name (``Path.name``). When several paths
    share a name the last one seen wins; explicitly listed *tals* are seen
    after the ``*.tal`` files found in *tal_dir*.

    Args:
        tal_dir: Optional directory whose ``*.tal`` entries are included.
        tals: Explicitly listed TAL paths.

    Returns:
        The surviving paths ordered by file name.
    """
    discovered = [] if tal_dir is None else sorted(tal_dir.glob("*.tal"))
    # Later entries overwrite earlier ones that share the same file name.
    by_name = {candidate.name: candidate for candidate in [*discovered, *tals]}
    ordered = sorted(by_name.items(), key=lambda entry: entry[0])
    return [candidate for _name, candidate in ordered]
def main() -> int:
    """Command-line entry point: write processed TALs into an output directory.

    Options:
        --tal-dir  Directory whose ``*.tal`` files are processed.
        --tal      Individual TAL path; may be given multiple times.
        --out-dir  Directory that receives the processed TALs (required).
        --summary  Optional path for a JSON list with one row per TAL.

    Each source is passed through ``rsync_only_tal`` and written to
    ``<out-dir>/<source file name>``.

    Returns:
        ``0`` on success.

    Raises:
        SystemExit: With a message when no TAL files were provided or when a
            selected path is not an existing regular file.
    """
    parser = argparse.ArgumentParser(description="Prepare TALs for local rsync-tree replay.")
    parser.add_argument("--tal-dir", type=Path)
    parser.add_argument("--tal", type=Path, action="append", default=[])
    parser.add_argument("--out-dir", type=Path, required=True)
    parser.add_argument("--summary", type=Path)
    args = parser.parse_args()

    sources = collect_tals(args.tal_dir, args.tal)
    if not sources:
        raise SystemExit("no TAL files provided")

    # Check every input before creating or writing anything, so a bad path
    # cannot leave the output directory populated with only some of the TALs.
    for source in sources:
        if not source.is_file():
            raise SystemExit(f"TAL file not found: {source}")

    args.out_dir.mkdir(parents=True, exist_ok=True)
    rows = []
    for source in sources:
        content, row = rsync_only_tal(source)
        (args.out_dir / source.name).write_text(content, encoding="utf-8")
        rows.append(row)

    if args.summary is not None:
        args.summary.parent.mkdir(parents=True, exist_ok=True)
        args.summary.write_text(json.dumps(rows, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)