#!/usr/bin/env python3 from __future__ import annotations import argparse import json import shlex import sys from pathlib import Path RIR_CONFIG = { "afrinic": { "tal": "tests/fixtures/tal/afrinic.tal", "ta": "tests/fixtures/ta/afrinic-ta.cer", "trust_anchor": "afrinic", }, "apnic": { "tal": "tests/fixtures/tal/apnic-rfc7730-https.tal", "ta": "tests/fixtures/ta/apnic-ta.cer", "trust_anchor": "apnic", }, "arin": { "tal": "tests/fixtures/tal/arin.tal", "ta": "tests/fixtures/ta/arin-ta.cer", "trust_anchor": "arin", }, "lacnic": { "tal": "tests/fixtures/tal/lacnic.tal", "ta": "tests/fixtures/ta/lacnic-ta.cer", "trust_anchor": "lacnic", }, "ripe": { "tal": "tests/fixtures/tal/ripe-ncc.tal", "ta": "tests/fixtures/ta/ripe-ncc-ta.cer", "trust_anchor": "ripe", }, } def default_repo_root() -> Path: return Path(__file__).resolve().parents[2] def default_bundle_root(repo_root: Path) -> Path: return (repo_root / "../../rpki/target/live/20260316-112341-multi-final3").resolve() def require_path(path: Path, kind: str) -> Path: if kind == "dir" and not path.is_dir(): raise SystemExit(f"missing directory: {path}") if kind == "file" and not path.is_file(): raise SystemExit(f"missing file: {path}") return path def load_timing_summary(bundle_root: Path) -> dict: timing_path = require_path(bundle_root / "timing-summary.json", "file") return json.loads(timing_path.read_text(encoding="utf-8")) def load_json(path: Path) -> dict: return json.loads(require_path(path, "file").read_text(encoding="utf-8")) def lock_validation_time(lock_obj: dict, fallback_started_at: str) -> str: return lock_obj.get("validationTime") or lock_obj.get("validation_time") or fallback_started_at def build_case(bundle_root: Path, repo_root: Path, rir: str) -> dict: if rir not in RIR_CONFIG: raise SystemExit( f"unsupported rir: {rir}; expected one of: {', '.join(sorted(RIR_CONFIG))}" ) rir_root = require_path(bundle_root / rir, "dir") cfg = RIR_CONFIG[rir] timing_summary = load_timing_summary(bundle_root) if rir not in timing_summary: raise SystemExit(f"timing-summary.json missing entry for rir: {rir}") timing_entry = timing_summary[rir] durations = timing_entry.get("durations") or {} base_timing = require_path(rir_root / "timings" / "base-replay.json", "file") delta_timing = require_path(rir_root / "timings" / "delta-replay.json", "file") base_timing_obj = json.loads(base_timing.read_text(encoding="utf-8")) delta_timing_obj = json.loads(delta_timing.read_text(encoding="utf-8")) base_locks_obj = load_json(rir_root / "base-locks.json") delta_locks_obj = load_json(rir_root / "locks-delta.json") case = { "bundle_root": str(bundle_root), "repo_root": str(repo_root), "rir": rir, "trust_anchor": cfg["trust_anchor"], "rir_root": str(rir_root), "base_archive": str(require_path(rir_root / "base-payload-archive", "dir")), "base_locks": str(require_path(rir_root / "base-locks.json", "file")), "base_vrps_csv": str(require_path(rir_root / "base-vrps.csv", "file")), "delta_archive": str(require_path(rir_root / "payload-delta-archive", "dir")), "delta_locks": str(require_path(rir_root / "locks-delta.json", "file")), "delta_record_csv": str(require_path(rir_root / "record-delta.csv", "file")), "replay_delta_csv": str(require_path(rir_root / "replay-delta.csv", "file")), "verification_json": str(require_path(rir_root / "verification.json", "file")), "readme": str(require_path(rir_root / "README.md", "file")), "timings_dir": str(require_path(rir_root / "timings", "dir")), "base_timing_json": str(base_timing), "delta_timing_json": str(delta_timing), "tal_path": str(require_path(repo_root / cfg["tal"], "file")), "ta_path": str(require_path(repo_root / cfg["ta"], "file")), "validation_times": { "snapshot": lock_validation_time(base_locks_obj, base_timing_obj["startedAt"]), "delta": lock_validation_time(delta_locks_obj, delta_timing_obj["startedAt"]), }, "timing_started_at": { "snapshot_replay": base_timing_obj["startedAt"], "delta_replay": delta_timing_obj["startedAt"], }, "routinator_timings": { "base_replay_seconds": float(durations["base-replay"]), "delta_replay_seconds": float(durations["delta-replay"]), }, } return case def emit_env(case: dict) -> str: ordered = { "BUNDLE_ROOT": case["bundle_root"], "RIR": case["rir"], "TRUST_ANCHOR": case["trust_anchor"], "RIR_ROOT": case["rir_root"], "TAL_PATH": case["tal_path"], "TA_PATH": case["ta_path"], "PAYLOAD_REPLAY_ARCHIVE": case["base_archive"], "PAYLOAD_REPLAY_LOCKS": case["base_locks"], "ROUTINATOR_BASE_RECORD_CSV": case["base_vrps_csv"], "PAYLOAD_BASE_ARCHIVE": case["base_archive"], "PAYLOAD_BASE_LOCKS": case["base_locks"], "PAYLOAD_DELTA_ARCHIVE": case["delta_archive"], "PAYLOAD_DELTA_LOCKS": case["delta_locks"], "ROUTINATOR_DELTA_RECORD_CSV": case["delta_record_csv"], "SNAPSHOT_VALIDATION_TIME": case["validation_times"]["snapshot"], "DELTA_VALIDATION_TIME": case["validation_times"]["delta"], "ROUTINATOR_BASE_REPLAY_SECONDS": str(case["routinator_timings"]["base_replay_seconds"]), "ROUTINATOR_DELTA_REPLAY_SECONDS": str(case["routinator_timings"]["delta_replay_seconds"]), } return "\n".join( f"export {key}={shlex.quote(value)}" for key, value in ordered.items() ) def main() -> int: parser = argparse.ArgumentParser(description="Resolve one RIR case inside a multi-RIR replay bundle") parser.add_argument("--bundle-root", type=Path, default=None) parser.add_argument("--repo-root", type=Path, default=None) parser.add_argument("--rir", required=True, choices=sorted(RIR_CONFIG)) parser.add_argument("--format", choices=["json", "env"], default="json") args = parser.parse_args() repo_root = (args.repo_root or default_repo_root()).resolve() bundle_root = (args.bundle_root or default_bundle_root(repo_root)).resolve() case = build_case(bundle_root, repo_root, args.rir) if args.format == "env": print(emit_env(case)) else: print(json.dumps(case, ensure_ascii=False, indent=2)) return 0 if __name__ == "__main__": raise SystemExit(main())