#!/usr/bin/env python3
from __future__ import annotations

import argparse
import csv
import gzip
import hashlib
import html
import json
import os
import re
import shlex
import shutil
import subprocess
import sys
import time
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from pathlib import Path
from typing import Any

# Directory layout is derived from this script's own location.
SCRIPT_DIR = Path(__file__).resolve().parent
REPO_ROOT = SCRIPT_DIR.parents[1]
DEV_ROOT = REPO_ROOT.parents[1]
ROUTINATOR_ROOT = DEV_ROOT / "routinator"
RPKI_CLIENT_ROOT = DEV_ROOT / "rpki-client-portable"
FIXTURE_ROOT = REPO_ROOT / "tests" / "fixtures"

# Per-RIR TAL fixture file names (under FIXTURE_ROOT / "tal").
RIR_TAL = {
    "afrinic": "afrinic.tal",
    "apnic": "apnic-rfc7730-https.tal",
    "arin": "arin.tal",
    "lacnic": "lacnic.tal",
    "ripe": "ripe-ncc.tal",
}

# Per-RIR trust-anchor certificate fixture file names (under FIXTURE_ROOT / "ta").
RIR_TA = {
    "afrinic": "afrinic-ta.cer",
    "apnic": "apnic-ta.cer",
    "arin": "arin-ta.cer",
    "lacnic": "lacnic-ta.cer",
    "ripe": "ripe-ncc-ta.cer",
}

# Per-RIR TAL URIs passed to our RP via --cir-tal-uri.
RIR_CIR_TAL_URI = {
    "afrinic": "https://rpki.afrinic.net/tal/afrinic.tal",
    "apnic": "https://rpki.apnic.net/tal/apnic-rfc7730-https.tal",
    "arin": "https://www.arin.net/resources/manage/rpki/arin.tal",
    "lacnic": "https://www.lacnic.net/innovaportal/file/4983/1/lacnic.tal",
    "ripe": "https://tal.rpki.ripe.net/ripe-ncc.tal",
}

RP_ORDER = ["ours-rp", "routinator", "rpki-client"]
RPKI_SCOPE_POLICIES = {"publication-point", "module-root"}


@dataclass(frozen=True)
class CommandResult:
    """Exit status and captured text output of a finished command."""

    returncode: int
    stdout: str
    stderr: str


def run_local(
    argv: list[str],
    *,
    cwd: Path | None = None,
    check: bool = True,
    capture: bool = True,
    env: dict[str, str] | None = None,
) -> CommandResult:
    """Run a local command and return its exit code and output.

    Args:
        argv: Command and arguments (executed without a shell).
        cwd: Working directory, or None for the current one.
        check: When true, a non-zero exit aborts with SystemExit.
        capture: When true, stdout/stderr are captured; otherwise they are
            inherited and reported as empty strings.
        env: Environment for the child, or None to inherit.

    Raises:
        SystemExit: The command failed and ``check`` is true.
    """
    result = subprocess.run(
        argv,
        cwd=str(cwd) if cwd else None,
        text=True,
        capture_output=capture,
        env=env,
        check=False,
    )
    # subprocess reports None for streams that were not captured; normalise
    # once so the failure message never prints a literal "None".
    stdout = result.stdout or ""
    stderr = result.stderr or ""
    if check and result.returncode != 0:
        raise SystemExit(
            f"command failed ({result.returncode}): {shlex.join(argv)}\n"
            f"cwd={cwd}\nstdout:\n{stdout}\nstderr:\n{stderr}"
        )
    return CommandResult(result.returncode, stdout, stderr)


def ssh_script(target: str, script: str, *, check: bool = True) -> CommandResult:
    """Feed ``script`` to ``bash -s`` on ``target`` over ssh, capturing output.

    Raises:
        SystemExit: The remote script failed and ``check`` is true.
    """
    result = subprocess.run(
        ["ssh", target, "bash", "-s"],
        input=script,
        text=True,
        capture_output=True,
        check=False,
    )
    stdout = result.stdout or ""
    stderr = result.stderr or ""
    if check and result.returncode != 0:
        raise SystemExit(
            f"remote script failed ({result.returncode}) on {target}\nstdout:\n{stdout}\nstderr:\n{stderr}"
        )
    return CommandResult(result.returncode, stdout, stderr)
def ssh_script_stream(target: str, script: str, *, check: bool = True) -> CommandResult:
    """Run ``script`` via ``bash -s`` on ``target`` without capturing output.

    The remote stdout/stderr go straight to this process's streams, so the
    returned CommandResult always carries empty strings.

    Raises:
        SystemExit: The remote script failed and ``check`` is true.
    """
    result = subprocess.run(["ssh", target, "bash", "-s"], input=script, text=True, check=False)
    if check and result.returncode != 0:
        raise SystemExit(f"remote script failed ({result.returncode}) on {target}")
    return CommandResult(result.returncode, "", "")


def rsync_to_remote(target: str, source: Path, destination: str) -> None:
    """Copy ``source`` to ``target:destination`` with ``rsync -a``."""
    run_local(["rsync", "-a", str(source), f"{target}:{destination}"])


def rsync_dir_to_remote(target: str, source: Path, destination: str) -> None:
    """Copy the contents of directory ``source`` into ``target:destination/``."""
    run_local(["rsync", "-a", f"{source}/", f"{target}:{destination}/"])


def rsync_from_remote(target: str, source: str, destination: Path, files: list[str]) -> None:
    """Fetch selected files from a remote directory into ``destination``.

    Despite the name, the transfer uses scp + tar: the de-duplicated, sorted
    file list is uploaded, the remote side packs those paths (relative to
    ``source``) into one archive, and the archive is downloaded and unpacked.

    Args:
        target: ssh destination.
        source: Remote directory the listed paths are relative to.
        destination: Local directory to unpack into (created if missing).
        files: Paths relative to ``source``.

    Raises:
        SystemExit: Any transfer, archive or extraction step failed.
    """
    destination.mkdir(parents=True, exist_ok=True)
    files_from = destination.parent / f"{destination.name}-files-from.txt"
    files_from.write_text("\n".join(sorted(set(files))) + "\n", encoding="utf-8")

    archive_name = f".{destination.name}-remote-files.tar.gz"
    remote_base = source.rstrip("/")
    archive_remote = f"{remote_base}/{archive_name}"
    files_from_remote = f"{remote_base}/{files_from.name}"
    archive_local = destination / archive_name
    create_archive = f"""
set -euo pipefail
cd {shlex.quote(source)}
tar --ignore-failed-read -czf {shlex.quote(archive_remote)} --files-from {shlex.quote(files_from_remote)}
"""
    try:
        # Go through run_local like every other step so a failed upload is
        # reported as SystemExit with the command output, not a traceback.
        run_local(["scp", str(files_from), f"{target}:{files_from_remote}"])
        ssh_script(target, create_archive)
        run_local(["scp", f"{target}:{archive_remote}", str(archive_local)])
        run_local(["tar", "-xzf", str(archive_local), "-C", str(destination)])
    finally:
        # Best-effort cleanup of the temporary files on both ends.
        ssh_script(
            target,
            f"rm -f {shlex.quote(archive_remote)} {shlex.quote(files_from_remote)}",
            check=False,
        )
        archive_local.unlink(missing_ok=True)
def write_json(path: Path, data: Any) -> None:
    """Write ``data`` as indented, key-sorted UTF-8 JSON, creating parent dirs."""
    path.parent.mkdir(parents=True, exist_ok=True)
    text = json.dumps(data, indent=2, ensure_ascii=False, sort_keys=True)
    path.write_text(text + "\n", encoding="utf-8")


def load_json(path: Path) -> Any:
    """Parse the UTF-8 JSON document stored at ``path``."""
    return json.loads(path.read_text(encoding="utf-8"))


def sha256_file(path: Path) -> str:
    """Return the hex SHA-256 of a file, read in 1 MiB chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        while chunk := handle.read(1024 * 1024):
            digest.update(chunk)
    return digest.hexdigest()


def git_output(repo: Path, args: list[str], *, check: bool = True) -> str:
    """Run ``git -C repo <args>`` and return its stripped stdout."""
    return run_local(["git", "-C", str(repo), *args], check=check).stdout.strip()


def git_status(repo: Path) -> str:
    """Return ``git status --short`` for ``repo`` (empty when clean)."""
    return git_output(repo, ["status", "--short"], check=False)


def choose_latest_semver_tag(repo: Path, prefix_v: bool) -> str:
    """Pick the highest ``major.minor[.patch][pN]`` tag in ``repo``.

    Args:
        repo: Git checkout whose tags are listed.
        prefix_v: Consider only tags starting with "v" when true, and only
            tags NOT starting with "v" when false.

    Raises:
        SystemExit: No tag matches.
    """
    pattern = re.compile(r"^v?(\d+)\.(\d+)(?:\.(\d+))?(?:p(\d+))?$")
    best: tuple[int, ...] | None = None
    best_tag = ""
    for tag in git_output(repo, ["tag", "--list"], check=False).splitlines():
        if tag.startswith("v") != prefix_v:
            continue
        match = pattern.match(tag)
        if not match:
            continue
        # Missing optional components compare as 0.
        parts = tuple(int(part) if part is not None else 0 for part in match.groups())
        if best is None or parts > best:
            best = parts
            best_tag = tag
    if not best_tag:
        raise SystemExit(f"cannot resolve latest {'v-prefixed' if prefix_v else 'numeric'} semver tag in {repo}")
    return best_tag


def parse_rirs(raw: str) -> list[str]:
    """Parse a comma-separated RIR list into validated lowercase names.

    Raises:
        SystemExit: A name is not a key of RIR_TAL, or the list is empty.
    """
    rirs = []
    for token in raw.split(","):
        rir = token.strip().lower()
        if not rir:
            continue
        if rir not in RIR_TAL:
            raise SystemExit(f"invalid RIR: {rir}; allowed: {','.join(RIR_TAL)}")
        rirs.append(rir)
    if not rirs:
        raise SystemExit("at least one RIR is required")
    return rirs


def parse_elapsed_to_ms(raw: str) -> int:
    """Convert ``[days-][[hours:]minutes:]seconds`` to whole milliseconds.

    Seconds may be fractional. Empty or malformed input (including a
    non-numeric day prefix or more than three colon-separated fields)
    yields 0 rather than raising.
    """
    raw = raw.strip()
    if not raw:
        return 0
    day_raw, dash, clock = raw.partition("-")
    if not dash:
        day_raw, clock = "0", raw
    fields = clock.split(":")
    if len(fields) > 3:
        return 0
    # Left-pad so the fields are always (hours, minutes, seconds).
    hours, minutes, seconds = ["0"] * (3 - len(fields)) + fields
    try:
        total = int(day_raw) * 86400 + int(hours) * 3600 + int(minutes) * 60 + float(seconds)
    except ValueError:
        return 0
    return int(round(total * 1000))
def parse_time_file(path: Path) -> dict[str, Any]:
    """Extract selected metrics from a ``/usr/bin/time -v`` style report.

    Returns a dict with any of ``wallMs``, ``maxRssKb``, ``userSeconds``,
    ``systemSeconds`` and ``cpuPercent`` (kept as raw text) found in the
    file; empty when the file does not exist.
    """
    values: dict[str, Any] = {}
    if not path.is_file():
        return values
    for line in path.read_text(encoding="utf-8", errors="replace").splitlines():
        if "Elapsed (wall clock) time" in line:
            # The label itself contains colons ("(h:mm:ss or m:ss)"), so
            # prefer the text after the closing "):" when it is present.
            if "):" in line:
                elapsed = line.rsplit("):", 1)[1]
            else:
                elapsed = line.partition(":")[2]
            values["wallMs"] = parse_elapsed_to_ms(elapsed)
        elif "Maximum resident set size" in line:
            values["maxRssKb"] = _last_int(line)
        elif "User time (seconds)" in line:
            values["userSeconds"] = _last_float(line)
        elif "System time (seconds)" in line:
            values["systemSeconds"] = _last_float(line)
        elif "Percent of CPU" in line:
            values["cpuPercent"] = line.rpartition(":")[2].strip()
    return values


def _last_int(line: str) -> int:
    """Return the integer after the last ':' in ``line``, or 0 if unparsable."""
    try:
        return int(line.rsplit(":", 1)[1].strip())
    except (IndexError, ValueError):
        return 0


def _last_float(line: str) -> float:
    """Return the float after the last ':' in ``line``, or 0.0 if unparsable."""
    try:
        return float(line.rsplit(":", 1)[1].strip())
    except (IndexError, ValueError):
        return 0.0


def normalize_asn(value: Any) -> str:
    """Render an AS number as ``AS<int>``.

    Accepts ints or strings, optionally double-quoted and/or prefixed with
    "AS" in any case; leading zeros are dropped.

    Raises:
        ValueError: The remaining text is not an integer.
    """
    text = str(value).strip().strip('"')
    if text.upper().startswith("AS"):
        text = text[2:]
    return f"AS{int(text)}"


def normalize_prefix(prefix: Any) -> str:
    """Return the prefix as text with surrounding whitespace and quotes removed."""
    return str(prefix).strip().strip('"')


def normalize_vrps_from_csv(path: Path) -> set[str]:
    """Read VRPs from a CSV file as ``"AS<n>,<prefix>,<maxlen>"`` strings.

    Several header spellings are accepted for each column; rows missing any
    of the three values are ignored. A missing file gives an empty set.
    """
    rows: set[str] = set()
    if not path.is_file():
        return rows
    with path.open("r", encoding="utf-8", errors="replace", newline="") as handle:
        for row in csv.DictReader(handle):
            asn = row.get("ASN") or row.get("asn") or row.get("AS")
            prefix = row.get("IP Prefix") or row.get("prefix") or row.get("Prefix")
            max_len = row.get("Max Length") or row.get("maxLength") or row.get("maxlength")
            if asn and prefix and max_len:
                rows.add(f"{normalize_asn(asn)},{normalize_prefix(prefix)},{int(str(max_len).strip())}")
    return rows
def _first_present(item: dict[str, Any], keys: tuple[str, ...]) -> Any:
    """Return the first value among ``keys`` that is neither None nor "".

    Unlike an ``a or b or c`` chain this keeps falsy-but-meaningful values
    such as the integer 0 (AS0, maxLength 0).
    """
    for key in keys:
        value = item.get(key)
        if value is not None and value != "":
            return value
    return None


def _provider_list(providers: list[str]) -> str:
    """Join de-duplicated ``AS<n>`` strings in ascending numeric order."""
    return ",".join(sorted(set(providers), key=lambda asn: int(asn[2:])))


def normalize_vaps_from_csv(path: Path) -> set[str]:
    """Read ASPA rows from a CSV file as ``"AS<customer>|AS<p1>,AS<p2>,..."``.

    Providers may be separated by ';', ',' or whitespace. Rows without a
    customer value are ignored. A missing file gives an empty set.
    """
    rows: set[str] = set()
    if not path.is_file():
        return rows
    with path.open("r", encoding="utf-8", errors="replace", newline="") as handle:
        for row in csv.DictReader(handle):
            customer = row.get("Customer ASN") or row.get("customer") or row.get("customer_asid")
            providers_raw = row.get("Providers") or row.get("providers") or ""
            if not customer:
                continue
            providers = [normalize_asn(item) for item in re.split(r"[;,\s]+", providers_raw.strip()) if item]
            rows.add(f"{normalize_asn(customer)}|{_provider_list(providers)}")
    return rows


def normalize_vrps_from_json(path: Path) -> set[str]:
    """Read VRPs from a JSON document as ``"AS<n>,<prefix>,<maxlen>"`` strings.

    Looks for a list under any of the top-level keys ``roas``,
    ``routeOrigins`` or ``valid_roas``; non-dict entries and entries missing
    ASN, prefix or max length are ignored. A missing file gives an empty set.
    """
    rows: set[str] = set()
    if not path.is_file():
        return rows
    data = load_json(path)
    if not isinstance(data, dict):
        return rows
    for key in ("roas", "routeOrigins", "valid_roas"):
        items = data.get(key)
        if not isinstance(items, list):
            continue
        for item in items:
            if not isinstance(item, dict):
                continue
            asn = _first_present(item, ("asn", "origin", "origin_as"))
            prefix = item.get("prefix")
            max_len = _first_present(item, ("maxLength", "max_length", "maxlen"))
            if asn is not None and prefix is not None and max_len is not None:
                rows.add(f"{normalize_asn(asn)},{normalize_prefix(prefix)},{int(max_len)}")
    return rows


def normalize_vaps_from_json(path: Path) -> set[str]:
    """Read ASPA entries from a JSON document as ``"AS<customer>|AS<p1>,..."``.

    Looks for a list under any of the top-level keys ``aspas``,
    ``aspaAssertions`` or ``vaps``. Providers may be a list or a string
    separated by ';', ',' or whitespace. A missing file gives an empty set.
    """
    rows: set[str] = set()
    if not path.is_file():
        return rows
    data = load_json(path)
    if not isinstance(data, dict):
        return rows
    for key in ("aspas", "aspaAssertions", "vaps"):
        items = data.get(key)
        if not isinstance(items, list):
            continue
        for item in items:
            if not isinstance(item, dict):
                continue
            customer = _first_present(item, ("customer", "customer_asid", "customerAsid", "customerAsn"))
            if customer is None:
                continue
            providers = item.get("providers") or item.get("providerAsns") or item.get("provider_asns") or []
            if isinstance(providers, str):
                provider_items = [p for p in re.split(r"[;,\s]+", providers) if p]
            else:
                provider_items = providers
            provider_asns = [normalize_asn(provider) for provider in provider_items]
            rows.add(f"{normalize_asn(customer)}|{_provider_list(provider_asns)}")
    return rows
def write_set(path: Path, values: set[str]) -> None:
    """Write the values sorted, one per line, creating parent directories.

    An empty set produces an empty file.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    body = "".join(f"{value}\n" for value in sorted(values))
    path.write_text(body, encoding="utf-8")


def write_gzip_set(path: Path, values: set[str]) -> None:
    """Same layout as ``write_set`` but gzip-compressed (UTF-8 text)."""
    path.parent.mkdir(parents=True, exist_ok=True)
    body = "".join(f"{value}\n" for value in sorted(values))
    with gzip.open(path, "wt", encoding="utf-8") as sink:
        sink.write(body)


def read_set(path: Path) -> set[str]:
    """Load a line-per-value file back into a set.

    ``<path>.gz`` is preferred over ``path`` when it exists. Lines are
    stripped and blank lines dropped. Returns an empty set when neither
    file exists.
    """
    compressed = Path(f"{path}.gz")
    chosen = compressed if compressed.is_file() else path
    if not chosen.is_file():
        return set()
    if chosen.suffix == ".gz":
        with gzip.open(chosen, "rt", encoding="utf-8", errors="replace") as source:
            stripped = [line.strip() for line in source]
    else:
        text = chosen.read_text(encoding="utf-8", errors="replace")
        stripped = [line.strip() for line in text.splitlines()]
    return {entry for entry in stripped if entry}


def set_delta(previous: set[str], current: set[str]) -> dict[str, Any]:
    """Summarise how ``current`` differs from ``previous``.

    Percentages are relative to the larger of the two set sizes (at least 1)
    and rounded to six decimals.
    """
    n_added = len(current - previous)
    n_removed = len(previous - current)
    n_unchanged = len(current & previous)
    n_previous = len(previous)
    n_current = len(current)
    base = max(n_previous, n_current, 1)
    return {
        "added": n_added,
        "removed": n_removed,
        "unchanged": n_unchanged,
        "previous": n_previous,
        "current": n_current,
        "changedPercent": round((n_added + n_removed) / base * 100, 6),
        "unchangedPercent": round(n_unchanged / base * 100, 6),
    }


def overlap(left: set[str], right: set[str]) -> dict[str, Any]:
    """Report intersection/difference sizes and the Jaccard index of two sets.

    The Jaccard index is rounded to eight decimals; two empty sets give 0.
    """
    shared = len(left & right)
    combined = len(left | right)
    return {
        "intersection": shared,
        "onlyLeft": len(left - right),
        "onlyRight": len(right - left),
        "leftCount": len(left),
        "rightCount": len(right),
        "jaccard": round(shared / max(combined, 1), 8),
    }
def artifact(path: Path, run_root: Path, type_name: str) -> dict[str, Any]:
    """Describe a produced file or directory for a report.

    The record always carries the same keys. ``relativePath`` is the POSIX
    path relative to ``run_root`` when ``path`` lies under it, otherwise the
    path as given. ``sha256`` is filled only for regular files.
    """
    if path.is_relative_to(run_root):
        relative = path.relative_to(run_root).as_posix()
    else:
        relative = str(path)
    record: dict[str, Any] = {
        "type": type_name,
        "path": str(path),
        "relativePath": relative,
        "exists": False,
        "size": 0,
        "sha256": "",
    }
    if path.exists():
        record["exists"] = True
        record["size"] = path.stat().st_size
        record["sha256"] = sha256_file(path) if path.is_file() else ""
    return record


def resolve_versions() -> dict[str, Any]:
    """Collect git metadata for the three RP source trees.

    For routinator and rpki-client this also selects the latest semver tag
    ("v"-prefixed for routinator, numeric for rpki-client).
    """
    routinator_tag = choose_latest_semver_tag(ROUTINATOR_ROOT, prefix_v=True)
    rpki_client_tag = choose_latest_semver_tag(RPKI_CLIENT_ROOT, prefix_v=False)
    return {
        "ours-rp": {
            "sourceDir": str(REPO_ROOT),
            "commit": git_output(REPO_ROOT, ["rev-parse", "HEAD"], check=False),
            "status": git_status(REPO_ROOT),
            "tag": "",
        },
        "routinator": {
            "sourceDir": str(ROUTINATOR_ROOT),
            "selectedTag": routinator_tag,
            "currentRef": git_output(ROUTINATOR_ROOT, ["rev-parse", "--abbrev-ref", "HEAD"], check=False),
            "currentCommit": git_output(ROUTINATOR_ROOT, ["rev-parse", "HEAD"], check=False),
            "status": git_status(ROUTINATOR_ROOT),
        },
        "rpki-client": {
            "sourceDir": str(RPKI_CLIENT_ROOT),
            "selectedTag": rpki_client_tag,
            "currentRef": git_output(RPKI_CLIENT_ROOT, ["rev-parse", "--abbrev-ref", "HEAD"], check=False),
            "currentCommit": git_output(RPKI_CLIENT_ROOT, ["rev-parse", "HEAD"], check=False),
            "status": git_status(RPKI_CLIENT_ROOT),
        },
    }


def prepare_rpki_client_official_tree(tag: str) -> None:
    """Checkout official portable tag and rebuild generated 9.8 sources.

    Refuses to touch a dirty worktree. If ``./update.sh`` fails, the sources
    are restored from the local openbsd checkout instead. After building,
    the binary's reported version, the presence of the ``unveil`` patch in
    ``src/http.c`` and two strings in the binary are verified.

    Raises:
        SystemExit: Dirty worktree, failed build step, or failed verification.
    """
    dirty = git_status(RPKI_CLIENT_ROOT)
    if dirty:
        raise SystemExit(f"rpki-client worktree dirty; refusing checkout:\n{dirty}")
    run_local(["git", "checkout", tag], cwd=RPKI_CLIENT_ROOT)

    update_branch = f"rpki-client-{tag}"
    update_result = run_local(["./update.sh", update_branch], cwd=RPKI_CLIENT_ROOT, check=False)
    if update_result.returncode != 0:
        print(
            f"warning: ./update.sh {update_branch} failed; falling back to local openbsd checkout",
            file=sys.stderr,
        )
        print(update_result.stdout, file=sys.stderr)
        print(update_result.stderr, file=sys.stderr)
        restore_rpki_client_sources_from_local_openbsd(update_branch)

    configure_env = rpki_client_configure_env()
    prefix = DEV_ROOT / ".cache" / "rpki-client-9.8-cir"
    run_local(
        [
            "./configure",
            f"--prefix={prefix}",
            "--with-rsync=rsync",
            "--with-user=_rpki-client",
        ],
        cwd=RPKI_CLIENT_ROOT,
        env=configure_env,
    )
    # "make clean" is allowed to fail (e.g. nothing built yet).
    run_local(["make", "clean"], cwd=RPKI_CLIENT_ROOT, check=False, capture=False)
    jobs = max(2, min(os.cpu_count() or 2, 8))
    run_local(["make", f"-j{jobs}"], cwd=RPKI_CLIENT_ROOT, env=configure_env, capture=False)

    binary = RPKI_CLIENT_ROOT / "src" / "rpki-client"
    version = run_local([str(binary), "-V"]).stdout.strip()
    if version != f"rpki-client-portable {tag}":
        raise SystemExit(f"unexpected rpki-client version after build: {version}")
    source_text = (RPKI_CLIENT_ROOT / "src" / "http.c").read_text(encoding="utf-8", errors="replace")
    if 'unveil("/", "r")' not in source_text:
        raise SystemExit("official rpki-client 9.8 source missing landlock unveil(\"/\", \"r\") patch")
    binary_strings = run_local(["strings", str(binary)]).stdout
    if "--ta-fixture" in binary_strings:
        raise SystemExit("official rpki-client binary still contains local ta-fixture patch")
    if "_rpki-client" not in binary_strings:
        raise SystemExit("rpki-client binary was not configured with --with-user=_rpki-client")
def restore_rpki_client_sources_from_local_openbsd(openbsd_ref: str) -> None:
    """Rebuild ``src/`` of the portable tree from the local openbsd checkout.

    Steps: check out ``openbsd_ref`` in ``<rpki-client>/openbsd``, copy the
    upstream source files (and man page) over ``src/`` with fresh mtimes,
    rewrite ``VERSION`` from ``version.h``, apply ``patches/*.patch``, move
    the man page to ``rpki-client.8.in``, verify no local CIR/ta-fixture
    code remains, delete stale build outputs and sanitize the Makefiles.

    Raises:
        SystemExit: A prerequisite is missing, the checkout or a patch
            fails, or local modifications are still present afterwards.
    """
    src_dir = RPKI_CLIENT_ROOT / "src"
    openbsd_repo = RPKI_CLIENT_ROOT / "openbsd"
    if not (openbsd_repo / ".git").is_dir():
        raise SystemExit("rpki-client local openbsd checkout missing; cannot restore official source")
    checkout_result = run_local(["git", "checkout", openbsd_ref], cwd=openbsd_repo, check=False)
    if checkout_result.returncode != 0:
        raise SystemExit(
            f"cannot checkout local openbsd ref {openbsd_ref}\n"
            f"stdout:\n{checkout_result.stdout}\nstderr:\n{checkout_result.stderr}"
        )

    generated = (src_dir / "main.c", src_dir / "output.c", src_dir / "version.h")
    if not all(path.is_file() for path in generated):
        raise SystemExit("rpki-client generated source tree missing; automake/autoconf unavailable to regenerate it")
    openbsd_source = openbsd_repo / "src" / "usr.sbin" / "rpki-client"
    if not (openbsd_source / "main.c").is_file():
        raise SystemExit("rpki-client official OpenBSD source mirror missing; cannot restore official source")

    source_names = (
        "as.c", "aspa.c", "ccr.c", "cert.c", "cms.c", "constraints.c", "crl.c",
        "encoding.c", "extern.h", "filemode.c", "http.c", "io.c", "ip.c",
        "json.c", "json.h", "main.c", "mft.c", "mkdir.c", "ometric.c",
        "ometric.h", "output-bgpd.c", "output-bird.c", "output-csv.c",
        "output-json.c", "output-ometric.c", "output.c", "parser.c", "print.c",
        "repo.c", "rfc3779.c", "roa.c", "rpki-asn1.h", "rrdp.c", "rrdp.h",
        "rrdp_delta.c", "rrdp_notification.c", "rrdp_snapshot.c", "rrdp_util.c",
        "rsc.c", "rsync.c", "spl.c", "tak.c", "tal.c", "validate.c",
        "version.h", "x509.c",
    )
    # Validate the whole list before copying anything so a missing file
    # cannot leave src/ half-restored.
    missing = [name for name in source_names if not (openbsd_source / name).is_file()]
    if missing:
        raise SystemExit(
            "rpki-client official OpenBSD source mirror incomplete; missing: " + ", ".join(missing)
        )
    for name in source_names:
        shutil.copy2(openbsd_source / name, src_dir / name)
        # copy2 preserves the source mtime; bump it to "now" afterwards.
        os.utime(src_dir / name, None)

    man_page = src_dir / "rpki-client.8"
    man_source = openbsd_source / "rpki-client.8"
    if man_source.is_file():
        shutil.copy2(man_source, man_page)
        os.utime(man_page, None)

    version_text = (openbsd_source / "version.h").read_text(encoding="utf-8", errors="replace")
    match = re.search(r'RPKI_VERSION\s+"([^"]+)"', version_text)
    if match:
        (RPKI_CLIENT_ROOT / "VERSION").write_text(match.group(1) + "\n", encoding="utf-8")

    for patch in sorted((RPKI_CLIENT_ROOT / "patches").glob("*.patch")):
        run_local(["patch", "-s", "-p3", "-i", str(patch)], cwd=src_dir)

    if man_page.is_file():
        man_page.replace(src_dir / "rpki-client.8.in")

    generated_text = "\n".join(
        (src_dir / name).read_text(encoding="utf-8", errors="replace")
        for name in ("main.c", "output.c", "extern.h")
    )
    if "FORMAT_CIR" in generated_text or "ta_fixture" in generated_text:
        raise SystemExit("failed to restore official rpki-client generated source")

    # Remove stale build outputs so the next make rebuilds from the restored sources.
    for path in [src_dir / "rpki-client", *src_dir.glob("rpki_client-*.o")]:
        if path.exists():
            path.unlink()
    deps_dir = src_dir / ".deps"
    if deps_dir.is_dir():
        for path in deps_dir.glob("rpki_client-*.Po"):
            path.unlink()

    for rel in ("src/Makefile", "src/Makefile.in"):
        makefile = RPKI_CLIENT_ROOT / rel
        if makefile.exists():
            sanitize_rpki_client_makefile(makefile)
def rpki_client_configure_env() -> dict[str, str]:
    """Build the environment used to configure and make rpki-client.

    Starts from the current environment and sets CFLAGS, CPPFLAGS and
    LDFLAGS. Include/library paths under ``DEV_ROOT/.cache`` are added only
    when the corresponding directories exist.
    """
    env = os.environ.copy()
    include_flags = ["-D_DEFAULT_SOURCE", "-D_BSD_SOURCE", "-D_GNU_SOURCE"]
    cpp_flags = ["-DOPENSSL_SUPPRESS_DEPRECATED"]
    ld_flags: list[str] = []

    sysroot = DEV_ROOT / ".cache" / "rpki-client-9.7-build" / "sysroot" / "usr"
    libtls_root = DEV_ROOT / ".cache" / "libtls-dev-3.8.1"
    if (sysroot / "include").is_dir():
        include_flags.insert(0, f"-I{sysroot / 'include'}")
    if (sysroot / "lib" / "x86_64-linux-gnu").is_dir():
        ld_flags.append(f"-L{sysroot / 'lib' / 'x86_64-linux-gnu'}")
    if (libtls_root / "include").is_dir():
        include_flags.append(f"-I{libtls_root / 'include'}")
        cpp_flags.insert(0, f"-I{libtls_root / 'include'}")
    if (libtls_root / "lib").is_dir():
        ld_flags.append(f"-L{libtls_root / 'lib'}")

    prefix = DEV_ROOT / ".cache" / "rpki-client-9.8-cir"
    ld_flags.append(f"-Wl,-rpath,{prefix}")

    env["CFLAGS"] = " ".join(["-g", "-O2", *include_flags])
    env["CPPFLAGS"] = " ".join(cpp_flags)
    env["LDFLAGS"] = " ".join(ld_flags)
    return env


def sanitize_rpki_client_makefile(path: Path) -> None:
    """Remove local CIR source residue from generated automake files.

    Drops the ``rpki_client-cir.o`` / ``.obj`` rules (everything up to the
    next line starting with ``rpki_client-``) and strips references to the
    CIR object, dependency file and source from the remaining lines. Lines
    that end up empty -- and lines that were already empty -- are omitted.
    The file is rewritten in place.
    """
    cir_rule_heads = ("rpki_client-cir.o:", "rpki_client-cir.obj:")
    # Lines that refer only to the CIR dependency file and must vanish
    # entirely. These are applied BEFORE the fragment replacements: the
    # fragment " ./$(DEPDIR)/rpki_client-cir.Po" is a substring of two of
    # them and would otherwise leave "include # am--include-marker" and a
    # bare "\t-rm -f" behind.
    whole_line_residue = (
        "@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/rpki_client-cir.Po@am__quote@ # am--include-marker",
        "include ./$(DEPDIR)/rpki_client-cir.Po # am--include-marker",
        "\t-rm -f ./$(DEPDIR)/rpki_client-cir.Po",
    )
    fragment_replacements = (
        (" rpki_client-cir.$(OBJEXT)", ""),
        (" rpki_client-cir.Po", ""),
        (" ./$(DEPDIR)/rpki_client-cir.Po", ""),
        (" cert.c cir.c cms.c", " cert.c cms.c"),
        ("rpki_client-cir.Po ", ""),
    )

    text = path.read_text(encoding="utf-8", errors="replace")
    sanitized: list[str] = []
    skip_rule = False
    for line in text.splitlines():
        if line.startswith(cir_rule_heads):
            skip_rule = True
            continue
        if skip_rule:
            # A CIR rule body ends where the next per-object rule begins.
            if not line.startswith("rpki_client-"):
                continue
            skip_rule = False
        current = line
        for residue in whole_line_residue:
            current = current.replace(residue, "")
        for old, new in fragment_replacements:
            current = current.replace(old, new)
        if current:
            sanitized.append(current)
    path.write_text("\n".join(sanitized) + "\n", encoding="utf-8")
def build_release_binaries(versions: dict[str, Any], skip_build: bool, active_rp_names: list[str]) -> dict[str, Any]:
    """Build the selected RPs (unless skipped) and describe their binaries.

    Args:
        versions: Per-RP metadata; "status" and "selectedTag" are read for
            routinator and "selectedTag" for rpki-client.
        skip_build: When true, no checkout or build is performed and the
            existing binaries are only inspected.
        active_rp_names: RPs to build and report.

    Returns:
        Per active RP: its ``versions`` entry plus binary path, SHA-256,
        size and build timestamp; for RPs other than ours-rp also the commit
        and worktree status after the build.

    Raises:
        SystemExit: Dirty routinator worktree, failed build step, or a
            missing binary.
    """
    if not skip_build:
        build_routinator = "routinator" in active_rp_names
        # Check for a dirty worktree first so a long cargo build is not
        # wasted on a run that is going to abort anyway.
        if build_routinator and versions["routinator"].get("status"):
            raise SystemExit(f"routinator worktree dirty; refusing checkout:\n{versions['routinator']['status']}")
        # Build only what was asked for, like the other two RPs.
        if "ours-rp" in active_rp_names:
            run_local(["cargo", "build", "--release", "--bin", "rpki"], cwd=REPO_ROOT)
        if build_routinator:
            run_local(["git", "checkout", versions["routinator"]["selectedTag"]], cwd=ROUTINATOR_ROOT)
            run_local(["cargo", "build", "--release"], cwd=ROUTINATOR_ROOT)
        if "rpki-client" in active_rp_names:
            prepare_rpki_client_official_tree(versions["rpki-client"]["selectedTag"])

    binaries = {
        "ours-rp": REPO_ROOT / "target" / "release" / "rpki",
        "routinator": ROUTINATOR_ROOT / "target" / "release" / "routinator",
        "rpki-client": RPKI_CLIENT_ROOT / "src" / "rpki-client",
    }
    metadata: dict[str, Any] = {}
    for name, path in binaries.items():
        if name not in active_rp_names:
            continue
        if not path.is_file():
            raise SystemExit(f"missing binary for {name}: {path}")
        entry: dict[str, Any] = {
            **versions[name],
            "binaryPath": str(path),
            "binarySha256": sha256_file(path),
            "binarySize": path.stat().st_size,
            "builtAtUtc": utc_iso(),
        }
        if name != "ours-rp":
            # The checkout above may have moved HEAD; record the actual state.
            source = Path(entry["sourceDir"])
            entry["commit"] = git_output(source, ["rev-parse", "HEAD"], check=False)
            entry["statusAfterBuild"] = git_status(source)
        metadata[name] = entry
    return metadata
def utc_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``."""
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())


def build_fixture_tree(local_root: Path, rirs: list[str]) -> None:
    """Copy the TAL and TA fixtures of the given RIRs into ``local_root``."""
    for sub in ("tal", "ta"):
        (local_root / sub).mkdir(parents=True, exist_ok=True)
    for rir in rirs:
        shutil.copy2(FIXTURE_ROOT / "tal" / RIR_TAL[rir], local_root / "tal" / RIR_TAL[rir])
        shutil.copy2(FIXTURE_ROOT / "ta" / RIR_TA[rir], local_root / "ta" / RIR_TA[rir])


def active_rps(args: argparse.Namespace) -> list[str]:
    """Parse and validate the comma-separated ``args.rps`` list.

    Raises:
        SystemExit: An entry is not in RP_ORDER, or the list is empty.
    """
    rps = [item.strip() for item in args.rps.split(",") if item.strip()]
    invalid = [rp for rp in rps if rp not in RP_ORDER]
    if invalid:
        raise SystemExit(f"invalid --rps entries: {','.join(invalid)}; allowed: {','.join(RP_ORDER)}")
    if not rps:
        raise SystemExit("--rps must include at least one RP")
    return rps


def prepare_remote(args: argparse.Namespace, binary_meta: dict[str, Any], rirs: list[str], local_fixture_root: Path) -> None:
    """Stage binaries and fixtures locally, then reset and populate the remote root.

    The remote side stops competing RP processes/services, ensures the
    ``_rpki-client`` user exists, recreates ``args.remote_root`` from
    scratch and records an environment snapshot in ``env/before.txt``.
    ``rirs`` is accepted for signature compatibility and not used here.

    Raises:
        SystemExit: ``args.remote_root`` is empty or the filesystem root,
            or a remote/rsync step fails.
    """
    remote = args.remote_root
    # The remote script runs "rm -rf <remote>"; never allow that to target
    # an empty path or "/".
    if not remote.strip().strip("/"):
        raise SystemExit(f"refusing to use remote root {remote!r}: it would be wiped with rm -rf")

    local_remote_root = args.run_root / "remote"
    if local_remote_root.exists():
        shutil.rmtree(local_remote_root)
    for sub in ("bin", "fixtures", "env"):
        (local_remote_root / sub).mkdir(parents=True, exist_ok=True)
    shutil.copytree(local_fixture_root, local_remote_root / "fixtures", dirs_exist_ok=True)
    write_json(local_remote_root / "sources-version-metadata.json", binary_meta)
    for rp, remote_name in (("ours-rp", "rpki"), ("routinator", "routinator"), ("rpki-client", "rpki-client")):
        if rp not in args.active_rps:
            continue
        shutil.copy2(Path(binary_meta[rp]["binaryPath"]), local_remote_root / "bin" / remote_name)

    remote_q = shlex.quote(remote)
    ssh_script(args.ssh_target, f"""
set -euo pipefail
systemctl disable --now rpki-client.timer >/dev/null 2>&1 || true
systemctl stop rpki-client.service >/dev/null 2>&1 || true
pkill -x rpki-client >/dev/null 2>&1 || true
pkill -x routinator >/dev/null 2>&1 || true
pkill -x rpki >/dev/null 2>&1 || true
id -u _rpki-client >/dev/null 2>&1 || useradd -r -M -s /usr/sbin/nologin _rpki-client || true
rm -rf {remote_q}
mkdir -p {remote_q}/bin {remote_q}/fixtures {remote_q}/env {remote_q}/reports {remote_q}/rp-runs
(df -h; echo; free -h; echo; uptime; echo; uname -a; echo; nproc; echo; lscpu | head -n 40) > {remote_q}/env/before.txt 2>&1 || true
""")
    write_json(args.run_root / "version-metadata.json", binary_meta)
    rsync_dir_to_remote(args.ssh_target, local_remote_root, remote)
def fetch_only_binary_metadata(args: argparse.Namespace) -> dict[str, Any]:
    """Download and parse ``sources-version-metadata.json`` from the remote root.

    The local ``<run_root>/remote`` directory is recreated first.

    Raises:
        SystemExit: The metadata file is not present after the transfer.
    """
    local_remote_root = args.run_root / "remote"
    if local_remote_root.exists():
        shutil.rmtree(local_remote_root)
    local_remote_root.mkdir(parents=True, exist_ok=True)
    rsync_from_remote(args.ssh_target, args.remote_root, local_remote_root, ["sources-version-metadata.json"])
    metadata = local_remote_root / "sources-version-metadata.json"
    if not metadata.is_file():
        raise SystemExit(f"missing remote sources-version-metadata.json under {args.remote_root}")
    return load_json(metadata)


def _ours_rp_script(run_dir: str, state_dir: str, fixture_dir: str, mode: str, rirs: list[str], remote_root: str, args: argparse.Namespace) -> str:
    """Build the timed remote script for one run of our own RP."""
    argv = [
        f"{remote_root}/bin/rpki",
        "--db", f"{state_dir}/work-db",
        "--repo-bytes-db", f"{state_dir}/repo-bytes.db",
        "--rsync-mirror-root", f"{state_dir}/rsync-mirror",
        "--rsync-scope", args.ours_rsync_scope,
        "--report-json", f"{run_dir}/report.json",
        "--report-json-compact",
        "--ccr-out", f"{run_dir}/result.ccr",
        "--cir-enable",
        "--cir-out", f"{run_dir}/input.cir",
        "--vrps-csv-out", f"{run_dir}/vrps.csv",
        "--vaps-csv-out", f"{run_dir}/vaps.csv",
        "--compare-view-trust-anchor", "all5" if len(rirs) > 1 else rirs[0],
    ]
    for rir in rirs:
        argv.extend(["--tal-path", f"{fixture_dir}/tal/{RIR_TAL[rir]}", "--ta-path", f"{fixture_dir}/ta/{RIR_TA[rir]}"])
    for rir in rirs:
        argv.extend(["--cir-tal-uri", RIR_CIR_TAL_URI[rir]])
    state_q = shlex.quote(state_dir)
    make_dirs = f"mkdir -p {state_q}/work-db {state_q}/rsync-mirror"
    # Snapshot runs start from an empty state directory; delta runs reuse it.
    reset = f"rm -rf {state_q} && {make_dirs}" if mode == "snapshot" else make_dirs
    return timed_script(run_dir, reset, argv)


def _routinator_script(run_dir: str, state_dir: str, fixture_dir: str, mode: str, rirs: list[str], remote_root: str, args: argparse.Namespace) -> str:
    """Build the timed remote script for one routinator run."""
    extra_tals = f"{state_dir}/extra-tals"
    argv = [
        f"{remote_root}/bin/routinator",
        "-r", f"{state_dir}/repository",
        "--no-rir-tals",
        "--extra-tals-dir", extra_tals,
    ]
    if args.routinator_enable_aspa:
        argv.append("--enable-aspa")
    if mode == "snapshot":
        argv.append("--fresh")
    argv.extend(["vrps", "-f", "jsonext", "-o", f"{run_dir}/routinator.json"])
    tal_copy = " && ".join(
        f"cp {shlex.quote(fixture_dir + '/tal/' + RIR_TAL[rir])} {shlex.quote(extra_tals + '/' + RIR_TAL[rir])}"
        for rir in rirs
    )
    state_q = shlex.quote(state_dir)
    populate = f"mkdir -p {state_q}/repository {shlex.quote(extra_tals)} && {tal_copy}"
    reset = f"rm -rf {state_q} && {populate}" if mode == "snapshot" else populate
    return timed_script(run_dir, reset, argv)


def _rpki_client_script(run_dir: str, state_dir: str, fixture_dir: str, mode: str, rirs: list[str], remote_root: str) -> str:
    """Build the timed remote script for one rpki-client run."""
    skiplist = f"{state_dir}/skiplist"
    cache_dir = f"{state_dir}/cache"
    argv = [f"{remote_root}/bin/rpki-client", "-j", "-c", "-S", skiplist, "-d", cache_dir]
    for rir in rirs:
        argv.extend(["-t", f"{fixture_dir}/tal/{RIR_TAL[rir]}"])
    argv.append(run_dir)

    state_q = shlex.quote(state_dir)
    cache_q = shlex.quote(cache_dir)
    skip_q = shlex.quote(skiplist)
    run_q = shlex.quote(run_dir)
    if mode == "snapshot":
        # Pre-seed the cache with the TA certificate fixtures. Built only
        # here because only the snapshot reset uses it (and building it
        # reads the local TAL fixtures).
        ta_steps = []
        for rir in rirs:
            ta_dir = state_dir + "/cache/.ta/" + RIR_TAL[rir].removesuffix(".tal")
            ta_steps.append(
                f"mkdir -p {shlex.quote(ta_dir)} "
                f"&& cp {shlex.quote(fixture_dir + '/ta/' + RIR_TA[rir])} "
                f"{shlex.quote(ta_dir + '/' + tal_certificate_filename(rir))}"
            )
        ta_prime = " && ".join(ta_steps)
        reset = (
            f"rm -rf {state_q} && mkdir -p {state_q}/cache "
            f"&& {ta_prime} "
            f"&& touch {skip_q} && chown -R _rpki-client:_rpki-client {state_q} {run_q} "
            f"&& chmod -R u+rwX,go+rX {state_q} {run_q}"
        )
    else:
        reset = (
            f"mkdir -p {state_q}/cache && touch {skip_q} "
            f"&& chown _rpki-client:_rpki-client {state_q} {cache_q} {skip_q} {run_q} "
            f"&& chmod u+rwX,go+rX {state_q} {cache_q} {skip_q} {run_q}"
        )
    return timed_script(run_dir, reset, argv)


def remote_shell_for_rp(rp: str, run_index: int, mode: str, rirs: list[str], remote_root: str, args: argparse.Namespace) -> str:
    """Return the remote shell script that performs one timed RP run.

    Args:
        rp: "ours-rp", "routinator" or "rpki-client".
        run_index: Run number; selects ``runs/run_<NNNN>``.
        mode: "snapshot" wipes the RP's state directory first; any other
            value reuses it.
        rirs: RIR names whose TAL/TA fixtures are passed to the RP.
        remote_root: Remote benchmark root directory.
        args: Parsed CLI options (``ours_rsync_scope`` and
            ``routinator_enable_aspa`` are read).

    Raises:
        ValueError: ``rp`` is not a known RP name.
    """
    run_dir = f"{remote_root}/rp-runs/{rp}/runs/run_{run_index:04d}"
    state_dir = f"{remote_root}/rp-runs/{rp}/state"
    fixture_dir = f"{remote_root}/fixtures"
    if rp == "ours-rp":
        return _ours_rp_script(run_dir, state_dir, fixture_dir, mode, rirs, remote_root, args)
    if rp == "routinator":
        return _routinator_script(run_dir, state_dir, fixture_dir, mode, rirs, remote_root, args)
    if rp == "rpki-client":
        return _rpki_client_script(run_dir, state_dir, fixture_dir, mode, rirs, remote_root)
    raise ValueError(rp)
def tal_certificate_filename(rir: str) -> str:
    """Return the certificate file name from the RIR's TAL fixture.

    The first line that is an rsync:// or https:// URI ending in ".cer"
    decides the result.

    Raises:
        SystemExit: The TAL contains no such line.
    """
    tal_path = FIXTURE_ROOT / "tal" / RIR_TAL[rir]
    content = tal_path.read_text(encoding="utf-8", errors="replace")
    candidates = (entry.strip() for entry in content.splitlines())
    uri = next(
        (
            entry
            for entry in candidates
            if entry.startswith(("rsync://", "https://")) and entry.endswith(".cer")
        ),
        None,
    )
    if uri is None:
        raise SystemExit(f"cannot resolve TA certificate filename from {tal_path}")
    return uri.rsplit("/", 1)[1]


def timed_script(run_dir: str, prefix: str, argv: list[str]) -> str:
    """Compose a shell snippet that runs ``argv`` under ``/usr/bin/time -v``.

    The snippet creates ``run_dir``, runs ``prefix``, records UTC start/end
    stamps, redirects the command's stdout/stderr to log files, deletes
    hidden files directly inside ``run_dir``, stores the exit code in
    ``exit-code.txt`` and exits with that same code.
    """
    out = shlex.quote(run_dir)
    command = shlex.join(argv)
    steps = [
        f"mkdir -p {out}",
        f"{prefix}",
        f"date -u +%Y-%m-%dT%H:%M:%SZ > {out}/start-utc.txt",
        "set +e",
        "set +m",
        f"/usr/bin/time -v -o {out}/process-time.txt -- {command} > {out}/stdout.log 2> {out}/stderr.log",
        "code=$?",
        "set -e",
        f"find {out} -maxdepth 1 -type f -name '.*' -delete 2>/dev/null || true",
        f"date -u +%Y-%m-%dT%H:%M:%SZ > {out}/end-utc.txt",
        f"printf '%s\n' \"$code\" > {out}/exit-code.txt",
        'exit "$code"',
    ]
    return "\n" + "\n".join(steps) + "\n"
def run_remote_matrix(args: argparse.Namespace, rirs: list[str]) -> None:
    """Execute every (RP, run) combination on the remote host.

    With ``args.schedule == "rp-block"`` all runs of one RP finish before
    the next RP starts; otherwise the RPs are interleaved per run index.
    Run 1 uses "snapshot" mode, later runs use "delta". A failing run stops
    the matrix unless ``args.continue_on_failure`` is set. The environment
    snapshot ``env/after.txt`` is written in either case.
    """
    run_numbers = range(1, args.runs + 1)
    if args.schedule == "rp-block":
        steps = [(rp, run_index) for rp in args.active_rps for run_index in run_numbers]
    else:
        steps = [(rp, run_index) for run_index in run_numbers for rp in args.active_rps]

    for rp, run_index in steps:
        mode = "snapshot" if run_index == 1 else "delta"
        print(f"remote_run_start rp={rp} run={run_index} mode={mode} at={utc_iso()}", flush=True)
        script = "set -euo pipefail\n" + remote_shell_for_rp(rp, run_index, mode, rirs, args.remote_root, args)
        result = ssh_script(args.ssh_target, script, check=False)
        if result.returncode != 0:
            print(f"remote run failed: {rp} run {run_index} exit={result.returncode}", file=sys.stderr)
            print(result.stdout, file=sys.stderr)
            print(result.stderr, file=sys.stderr)
            if not args.continue_on_failure:
                # Stop scheduling further runs, but still fall through to
                # the after-run snapshot below.
                break
        print(f"remote_run_done rp={rp} run={run_index} exit={result.returncode} at={utc_iso()}", flush=True)

    ssh_script(
        args.ssh_target,
        f"(df -h; echo; free -h; echo; uptime) > {shlex.quote(args.remote_root)}/env/after.txt 2>&1 || true",
        check=False,
    )


def fetch_remote_outputs(args: argparse.Namespace) -> None:
    """Generate remote metadata, then download the expected result files.

    The local ``<run_root>/remote`` directory is deleted before the
    download so it only contains freshly fetched files.
    """
    collect_remote_metadata(args)
    destination = args.run_root / "remote"
    if destination.exists():
        shutil.rmtree(destination)
    rsync_from_remote(args.ssh_target, args.remote_root, destination, expected_remote_files(args.runs, args.active_rps))
# Python program executed on the remote host (fed to ``python3 -`` through a
# quoted here-document, so the shell does not expand anything inside it).
# argv: mode, VRP source, VAP source, products JSON, VRP gzip, VAP gzip,
# counts JSON.  ``mode`` selects the input formats:
#   ours-rp      VRPs and VAPs from two CSV files
#   routinator   VRPs and VAPs from one JSON file (both sources are that file)
#   rpki-client  VRPs from CSV, VAPs from JSON
_REMOTE_NORMALIZER = r"""
import csv, gzip, json, re, sys

mode, vrp_src, vap_src, meta, vrps_out, vaps_out, counts_out = sys.argv[1:]


def asn(v):
    s = str(v).strip().strip('"')
    if s.upper().startswith('AS'):
        s = s[2:]
    return 'AS' + str(int(s))


def providers_of(ps):
    if isinstance(ps, str):
        ps = [p for p in re.split(r'[;,\s]+', ps.strip()) if p]
    return sorted({asn(p) for p in ps}, key=lambda x: int(x[2:]))


def csv_rows(path):
    try:
        with open(path, encoding='utf-8', errors='replace', newline='') as f:
            yield from csv.DictReader(f)
    except FileNotFoundError:
        return


def load_json(path):
    try:
        with open(path, encoding='utf-8', errors='replace') as f:
            return json.load(f)
    except FileNotFoundError:
        return {}


def items_of(data, keys):
    if not isinstance(data, dict):
        return
    for key in keys:
        for item in data.get(key, []):
            if isinstance(item, dict):
                yield item


def vrps_from_csv(path):
    rows = set()
    for row in csv_rows(path):
        a = row.get('ASN') or row.get('asn') or row.get('AS')
        p = row.get('IP Prefix') or row.get('prefix') or row.get('Prefix')
        m = row.get('Max Length') or row.get('maxLength') or row.get('maxlength')
        if a and p and m:
            rows.add(f"{asn(a)},{str(p).strip().strip(chr(34))},{int(str(m).strip())}")
    return rows


def vaps_from_csv(path):
    rows = set()
    for row in csv_rows(path):
        c = row.get('Customer ASN') or row.get('customer') or row.get('customer_asid')
        ps = row.get('Providers') or row.get('providers') or ''
        if c:
            rows.add(f"{asn(c)}|{','.join(providers_of(ps))}")
    return rows


def vrps_from_json(data):
    rows = set()
    for item in items_of(data, ('roas', 'routeOrigins', 'valid_roas')):
        a = item.get('asn') or item.get('origin') or item.get('origin_as')
        p = item.get('prefix')
        m = item.get('maxLength') or item.get('max_length') or item.get('maxlen')
        if a is not None and p is not None and m is not None:
            rows.add(f"{asn(a)},{str(p).strip().strip(chr(34))},{int(m)}")
    return rows


def vaps_from_json(data):
    rows = set()
    for item in items_of(data, ('aspas', 'aspaAssertions', 'vaps')):
        c = item.get('customer') or item.get('customer_asid') or item.get('customerAsid') or item.get('customerAsn')
        ps = item.get('providers') or item.get('providerAsns') or item.get('provider_asns') or []
        if c is None:
            continue
        rows.add(f"{asn(c)}|{','.join(providers_of(ps))}")
    return rows


if mode == 'ours-rp':
    vrps = vrps_from_csv(vrp_src)
    vaps = vaps_from_csv(vap_src)
elif mode == 'routinator':
    data = load_json(vrp_src)
    vrps = vrps_from_json(data)
    vaps = vaps_from_json(data)
else:
    vrps = vrps_from_csv(vrp_src)
    vaps = vaps_from_json(load_json(vap_src))

for out_path, rows in ((vrps_out, vrps), (vaps_out, vaps)):
    with gzip.open(out_path, 'wt', encoding='utf-8') as f:
        for row in sorted(rows):
            f.write(row + '\n')

counts = {'counts': {'vrps': len(vrps), 'vaps': len(vaps)}}
for out_path in (meta, counts_out):
    with open(out_path, 'w', encoding='utf-8') as f:
        json.dump(counts, f, indent=2, sort_keys=True)
"""

# Body of the remote ``for rp in ...; do`` loop: visits every run directory of
# the current RP and normalises whichever raw products are present.
_REMOTE_RUN_LOOP = r'''  for run_dir in "$ROOT"/rp-runs/"$rp"/runs/run_*; do
    [ -d "$run_dir" ] || continue
    out=("$run_dir/normalized-products.json" "$run_dir/normalized-vrps.txt.gz" "$run_dir/normalized-vaps.txt.gz")
    case "$rp" in
      ours-rp)
        if [ -f "$run_dir/vrps.csv" ] || [ -f "$run_dir/vaps.csv" ]; then
          normalize ours-rp "$run_dir/vrps.csv" "$run_dir/vaps.csv" "${out[@]}" "$run_dir/ours-rp-counts.json"
        fi
        ;;
      routinator)
        if [ -f "$run_dir/routinator.json" ]; then
          normalize routinator "$run_dir/routinator.json" "$run_dir/routinator.json" "${out[@]}" "$run_dir/routinator-counts.json"
        fi
        ;;
      rpki-client)
        if [ -f "$run_dir/csv" ] || [ -f "$run_dir/json" ]; then
          normalize rpki-client "$run_dir/csv" "$run_dir/json" "${out[@]}" "$run_dir/rpki-client-counts.json"
        fi
        ;;
    esac
  done
'''


def collect_remote_metadata(args: argparse.Namespace) -> None:
    """Normalise each remote run's VRP/VAP products next to the raw outputs.

    For every active RP and every ``run_*`` directory below
    ``<remote_root>/rp-runs/<rp>/runs`` a small Python program is run on the
    remote host.  It writes ``normalized-vrps.txt.gz``,
    ``normalized-vaps.txt.gz`` (sorted, one entry per line) plus
    ``normalized-products.json`` and ``<rp>-counts.json`` holding the counts.

    The step is best effort: a non-zero exit status is reported on stderr but
    does not abort the benchmark.
    """
    rp_words = " ".join(shlex.quote(rp) for rp in args.active_rps)
    remote_script = "".join([
        "set -euo pipefail\n",
        f"ROOT={shlex.quote(args.remote_root)}\n",
        # One shell function carries the normaliser so the program text is
        # shipped once instead of once per RP branch.
        "normalize() {\n",
        "python3 - \"$@\" <<'PY'\n",
        _REMOTE_NORMALIZER.strip() + "\n",
        "PY\n",
        "}\n",
        f"for rp in {rp_words}; do\n",
        _REMOTE_RUN_LOOP,
        "done\n",
    ])
    result = ssh_script_stream(args.ssh_target, remote_script, check=False)
    if result.returncode != 0:
        print(
            f"remote metadata normalization exited with {result.returncode}; "
            "some normalized products may be missing",
            file=sys.stderr,
        )
def expected_remote_files(runs: int, active_rp_names: list[str]) -> list[str]:
    """List the remote-root-relative files to fetch after a benchmark.

    Args:
        runs: Number of runs per RP; run directories are ``run_0001`` ...
        active_rp_names: RP identifiers whose run directories are included.

    Returns:
        Three host-level files followed, per RP and run, by the timing/log
        files, the normalised products and the RP's counts file.  ``ours-rp``
        additionally contributes ``stage-timing.json``.
    """
    files = [
        "env/before.txt",
        "env/after.txt",
        "sources-version-metadata.json",
    ]
    per_run = (
        "start-utc.txt",
        "end-utc.txt",
        "exit-code.txt",
        "process-time.txt",
        "stdout.log",
        "stderr.log",
        "normalized-products.json",
        "normalized-vrps.txt.gz",
        "normalized-vaps.txt.gz",
    )
    for rp in active_rp_names:
        # Any name other than the first two is treated as rpki-client.
        counts_owner = rp if rp in ("ours-rp", "routinator") else "rpki-client"
        rp_specific = [f"{counts_owner}-counts.json"]
        if rp == "ours-rp":
            rp_specific.append("stage-timing.json")
        for run_index in range(1, runs + 1):
            base = f"rp-runs/{rp}/runs/run_{run_index:04d}"
            files.extend(f"{base}/{name}" for name in (*per_run, *rp_specific))
    return files


def process_outputs(args: argparse.Namespace, binary_meta: dict[str, Any], rirs: list[str]) -> dict[str, Any]:
    """Build the comparison report from the fetched artifacts and write it out.

    Every run directory below ``<run_root>/remote`` is summarised, per-RP
    deltas and cross-RP comparisons are added, and the result is written as
    JSON, XML and HTML into ``args.run_root``.

    Returns:
        The report dictionary that was written.
    """
    local_remote_root = args.run_root / "remote"
    rp_results: dict[str, Any] = {}
    for rp in args.active_rps:
        runs = []
        for run_index in range(1, args.runs + 1):
            run_dir = local_remote_root / "rp-runs" / rp / "runs" / f"run_{run_index:04d}"
            mode = "snapshot" if run_index == 1 else "delta"
            runs.append(collect_run(rp, run_index, mode, run_dir, local_remote_root))
        # Missing binary metadata must not prevent the report from being built.
        rp_results[rp] = {"version": binary_meta.get(rp, {}), "runs": runs}

    add_within_changes(rp_results)
    cross = build_cross_comparison(rp_results, args.runs, local_remote_root)
    report = {
        "generatedAtUtc": utc_iso(),
        "rirs": rirs,
        "runsPerRp": args.runs,
        "schedule": args.schedule,
        "oursRsyncScope": args.ours_rsync_scope,
        "routinatorEnableAspa": args.routinator_enable_aspa,
        "remoteRoot": args.remote_root,
        "sshTarget": args.ssh_target,
        "rpResults": rp_results,
        "crossRpComparison": cross,
    }
    write_json(args.run_root / "three-rp-performance-comparison.json", report)
    write_xml(args.run_root / "three-rp-performance-comparison.xml", report)
    write_html_report(args.run_root / "three-rp-performance-comparison.html", report)
    return report
def _read_stripped(path: Path) -> str:
    """Return the stripped text of *path*, or ``""`` when it cannot be read."""
    try:
        return path.read_text().strip()
    except OSError:
        return ""


def _read_exit_code(path: Path) -> int:
    """Parse the integer stored in *path*; ``-1`` if missing, empty or invalid."""
    try:
        return int(_read_stripped(path))
    except ValueError:
        return -1


def collect_run(rp: str, run_index: int, mode: str, run_dir: Path, root: Path) -> dict[str, Any]:
    """Summarise one fetched run directory and persist the summary beside it.

    The VRP/VAP sets come from the normalised files when they yield data,
    otherwise from the RP's raw output; if both are empty, placeholder sets
    sized from the remote ``<rp>-counts.json`` stand in so that counts are
    still reported.

    Side effects: (re)writes ``normalized-vrps.txt.gz``,
    ``normalized-vaps.txt.gz``, ``normalized-products.json``,
    ``run-meta.json`` and ``artifacts.json`` in *run_dir*.

    Returns:
        The run metadata dictionary (also written to ``run-meta.json``).
    """
    exit_code = _read_exit_code(run_dir / "exit-code.txt")
    time_info = parse_time_file(run_dir / "process-time.txt")
    start = _read_stripped(run_dir / "start-utc.txt")
    end = _read_stripped(run_dir / "end-utc.txt")

    if rp == "ours-rp":
        vrps = read_set(run_dir / "normalized-vrps.txt") or normalize_vrps_from_csv(run_dir / "vrps.csv")
        vaps = read_set(run_dir / "normalized-vaps.txt") or normalize_vaps_from_csv(run_dir / "vaps.csv")
        counts_name = "ours-rp-counts.json"
    elif rp == "routinator":
        vrps = read_set(run_dir / "normalized-vrps.txt") or normalize_vrps_from_json(run_dir / "routinator.json")
        vaps = read_set(run_dir / "normalized-vaps.txt") or normalize_vaps_from_json(run_dir / "routinator.json")
        counts_name = "routinator-counts.json"
    else:
        vrps = (
            read_set(run_dir / "normalized-vrps.txt")
            or normalize_vrps_from_csv(run_dir / "csv")
            or normalize_vrps_from_json(run_dir / "json")
        )
        vaps = read_set(run_dir / "normalized-vaps.txt") or normalize_vaps_from_json(run_dir / "json")
        counts_name = "rpki-client-counts.json"

    if not vrps and not vaps:
        # No product data at all: keep at least the counts measured remotely.
        counts = remote_counts(run_dir / counts_name)
        vrps = placeholder_set("vrp", counts.get("vrps", 0))
        vaps = placeholder_set("vap", counts.get("vaps", 0))

    write_gzip_set(run_dir / "normalized-vrps.txt.gz", vrps)
    write_gzip_set(run_dir / "normalized-vaps.txt.gz", vaps)
    products = {"vrps": len(vrps), "vaps": len(vaps)}
    normalized = {"vrps": sorted(vrps), "vaps": sorted(vaps), "counts": products}
    write_json(run_dir / "normalized-products.json", normalized)

    artifacts = [
        artifact(run_dir / "process-time.txt", root, "process-time"),
        artifact(run_dir / "stdout.log", root, "stdout"),
        artifact(run_dir / "stderr.log", root, "stderr"),
        artifact(run_dir / "normalized-vrps.txt.gz", root, "normalized-vrps-gzip"),
        artifact(run_dir / "normalized-vaps.txt.gz", root, "normalized-vaps-gzip"),
    ]
    # Optional raw outputs are listed only when they were actually fetched.
    optional_artifacts = [
        ("report.json", "ours-report"),
        ("routinator.json", "routinator-json"),
        ("json", "rpki-client-json"),
        ("csv", "rpki-client-csv"),
        ("result.ccr", "ccr"),
        ("input.cir", "cir"),
        ("vrps.csv", "ours-vrps-csv"),
        ("vaps.csv", "ours-vaps-csv"),
    ]
    for candidate, type_name in optional_artifacts:
        path = run_dir / candidate
        if path.exists():
            artifacts.append(artifact(path, root, type_name))

    run_meta = {
        "rp": rp,
        "runIndex": run_index,
        "syncMode": mode,
        "startUtc": start,
        "endUtc": end,
        "exitCode": exit_code,
        "wallMs": time_info.get("wallMs", 0),
        "maxRssKb": time_info.get("maxRssKb", 0),
        "userSeconds": time_info.get("userSeconds", 0.0),
        "systemSeconds": time_info.get("systemSeconds", 0.0),
        "cpuPercent": time_info.get("cpuPercent", ""),
        "productCounts": products,
        "publicationStats": collect_publication_stats(rp, run_dir),
        "runDir": str(run_dir),
        "artifacts": artifacts,
    }
    write_json(run_dir / "run-meta.json", run_meta)
    write_json(run_dir / "artifacts.json", artifacts)
    return run_meta
def collect_publication_stats(rp: str, run_dir: Path) -> dict[str, Any]:
    """Extract publication-point / manifest statistics for one run.

    * ``ours-rp``: totals read from ``stage-timing.json``.
    * ``rpki-client``: manifest and repository figures parsed out of
      ``stdout.log``.
    * anything else: reported as unavailable with an explanatory reason.

    The result always carries ``available`` and ``source``; a missing or
    unusable input yields ``available: False``.
    """
    if rp == "ours-rp":
        stage_path = run_dir / "stage-timing.json"
        if not stage_path.is_file():
            return {"available": False, "source": "stage-timing.json"}
        stage = load_json(stage_path)
        if not isinstance(stage, dict):
            # A file that is not a JSON object carries no usable totals.
            return {"available": False, "source": "stage-timing.json"}
        stage_fields = (
            ("publicationPoints", "publication_points"),
            ("repoSyncMsTotal", "repo_sync_ms_total"),
            ("publicationPointRepoSyncMsTotal", "publication_point_repo_sync_ms_total"),
            ("rrdpDownloadMsTotal", "rrdp_download_ms_total"),
            ("rsyncDownloadMsTotal", "rsync_download_ms_total"),
            ("downloadEventCount", "download_event_count"),
            ("downloadBytesTotal", "download_bytes_total"),
        )
        stats: dict[str, Any] = {"available": True, "source": "stage-timing.json"}
        for report_key, stage_key in stage_fields:
            # "or 0" maps an explicit null to zero as well.
            stats[report_key] = int(stage.get(stage_key, 0) or 0)
        return stats

    if rp == "rpki-client":
        stdout_path = run_dir / "stdout.log"
        if not stdout_path.is_file():
            return {"available": False, "source": "stdout.log"}
        text = stdout_path.read_text(encoding="utf-8", errors="replace")
        manifests = re.search(r"Manifests:\s+(\d+)\s+\((\d+)\s+failed parse,\s+(\d+)\s+seqnum gaps\)", text)
        repos = re.search(r"Repositories:\s+(\d+)", text)
        mfts_hash = re.search(r"CCR manifest state hash:\s+([A-Za-z0-9+/=]+)", text)
        return {
            "available": bool(manifests or repos or mfts_hash),
            "source": "stdout.log",
            "manifests": int(manifests.group(1)) if manifests else 0,
            "manifestFailedParse": int(manifests.group(2)) if manifests else 0,
            "manifestSeqnumGaps": int(manifests.group(3)) if manifests else 0,
            "repositories": int(repos.group(1)) if repos else 0,
            "ccrManifestStateHash": mfts_hash.group(1) if mfts_hash else "",
        }

    return {
        "available": False,
        "source": "jsonext/stdout",
        "reason": "Routinator jsonext output used by this benchmark does not expose full publication point or manifest-state set.",
    }


def remote_counts(path: Path) -> dict[str, int]:
    """Read the ``counts`` object of a remote counts file.

    Returns:
        ``{}`` when *path* is not a file, otherwise a dict with integer
        ``vrps`` and ``vaps`` (0 for absent, null or malformed entries).
    """
    if not path.is_file():
        return {}
    data = load_json(path)
    counts = data.get("counts") if isinstance(data, dict) else None
    if not isinstance(counts, dict):
        # e.g. {"counts": null}: treat as "no counts recorded".
        counts = {}
    return {
        "vrps": int(counts.get("vrps", 0) or 0),
        "vaps": int(counts.get("vaps", 0) or 0),
    }


def placeholder_set(prefix: str, count: int) -> set[str]:
    """Return *count* distinct stand-in entries named ``<prefix>:<index>``."""
    return {f"{prefix}:{idx}" for idx in range(count)}
def add_within_changes(rp_results: dict[str, Any]) -> None:
    """Annotate every run with its product delta against the preceding run.

    Mutates *rp_results* in place: each run gains ``deltaFromPrevious``, which
    is ``{"available": False}`` for an RP's first run and otherwise holds the
    ``set_delta`` of the VRP and VAP sets re-read from the run directories.
    """
    for data in rp_results.values():
        earlier_vrps: set[str] | None = None
        earlier_vaps: set[str] | None = None
        for run in data["runs"]:
            directory = Path(run["runDir"])
            vrps_now = read_set(directory / "normalized-vrps.txt")
            vaps_now = read_set(directory / "normalized-vaps.txt")
            if earlier_vrps is not None:
                run["deltaFromPrevious"] = {
                    "available": True,
                    "vrp": set_delta(earlier_vrps, vrps_now),
                    "vap": set_delta(earlier_vaps or set(), vaps_now),
                }
            else:
                run["deltaFromPrevious"] = {"available": False}
            earlier_vrps, earlier_vaps = vrps_now, vaps_now


def build_cross_comparison(rp_results: dict[str, Any], runs: int, root: Path) -> dict[str, Any]:
    """Compare the RPs against each other.

    Returns:
        ``runtimeSummary``: per RP the first-run wall time, average/min/max
        wall time of the remaining runs and the peak RSS.
        ``productOverlapByRun``: per run index the VRP/VAP ``overlap`` of
        every RP pair for which both sides are present in *rp_results*.
    """
    candidate_pairs = (
        ("ours-rp", "routinator"),
        ("ours-rp", "rpki-client"),
        ("routinator", "rpki-client"),
    )
    pairs = [pair for pair in candidate_pairs if pair[0] in rp_results and pair[1] in rp_results]

    by_run = []
    for run_index in range(1, runs + 1):
        compared = []
        for left, right in pairs:
            left_dir = Path(rp_results[left]["runs"][run_index - 1]["runDir"])
            right_dir = Path(rp_results[right]["runs"][run_index - 1]["runDir"])
            vrp_overlap = overlap(
                read_set(left_dir / "normalized-vrps.txt"),
                read_set(right_dir / "normalized-vrps.txt"),
            )
            vap_overlap = overlap(
                read_set(left_dir / "normalized-vaps.txt"),
                read_set(right_dir / "normalized-vaps.txt"),
            )
            compared.append({"left": left, "right": right, "vrp": vrp_overlap, "vap": vap_overlap})
        by_run.append({"runIndex": run_index, "pairs": compared})

    runtime = {}
    for rp, data in rp_results.items():
        wall_times = []
        rss_values = []
        for run in data["runs"]:
            wall_times.append(run.get("wallMs", 0))
            rss_values.append(run.get("maxRssKb", 0))
        # Everything after the first (snapshot) run counts as a delta run.
        later = wall_times[1:]
        summary = {
            "snapshotWallMs": 0,
            "deltaAverageWallMs": 0,
            "deltaMinWallMs": 0,
            "deltaMaxWallMs": 0,
            "maxRssKb": 0,
        }
        if wall_times:
            summary["snapshotWallMs"] = wall_times[0]
        if later:
            summary["deltaAverageWallMs"] = round(sum(later) / len(later), 3)
            summary["deltaMinWallMs"] = min(later)
            summary["deltaMaxWallMs"] = max(later)
        if rss_values:
            summary["maxRssKb"] = max(rss_values)
        runtime[rp] = summary

    return {"runtimeSummary": runtime, "productOverlapByRun": by_run}


def ms_to_s(value: float | int) -> str:
    """Format milliseconds as seconds with three decimals, e.g. ``1.500s``."""
    seconds = float(value) / 1000
    return "{:.3f}s".format(seconds)


def kb_to_mb(value: float | int) -> str:
    """Format KiB as MiB with one decimal, e.g. ``2.0 MB``."""
    megabytes = float(value) / 1024
    return "{:.1f} MB".format(megabytes)
def comma(value: Any) -> str:
    """Format *value* as an integer with thousands separators.

    Values that cannot be converted with ``int()`` are returned as ``str(value)``.
    """
    try:
        return f"{int(value):,}"
    except (TypeError, ValueError, OverflowError):
        return str(value)


def pct(value: float) -> str:
    """Format a ratio with six decimals (used for Jaccard indices)."""
    return f"{value:.6f}"


def h(text: Any) -> str:
    """HTML-escape ``str(text)``, including both quote characters."""
    return html.escape(str(text), quote=True)


def rp_label(rp: str) -> str:
    """Return the display name of an RP identifier (unknown ids pass through)."""
    return {"ours-rp": "ours RP", "routinator": "Routinator", "rpki-client": "rpki-client"}.get(rp, rp)


def min_class(value: Any, values: list[Any]) -> str:
    """Return ``' class="best"'`` if *value* is the smallest positive number in *values*.

    Non-numeric and non-positive entries are ignored, so a zero value is
    never marked; an empty string is returned when nothing qualifies.
    """
    clean = [v for v in values if isinstance(v, (int, float)) and v > 0]
    if not clean:
        return ""
    return ' class="best"' if value == min(clean) else ""


def total_wall_ms(report: dict[str, Any], rp: str) -> int:
    """Sum the wall-clock milliseconds of all runs of *rp* in *report*."""
    return int(sum(run.get("wallMs", 0) for run in report["rpResults"][rp]["runs"]))


def total_wall_chart(report: dict[str, Any]) -> str:
    """Render an SVG bar chart of each RP's total wall-clock time.

    Bars are scaled against the largest total and never drawn narrower than
    8 units, so an RP with a zero total stays visible.  The title uses the
    report's ``runsPerRp`` (10 if absent).
    """
    series = {
        "ours-rp": ("#2563eb", "ours-rp"),
        "routinator": ("#16a34a", "routinator"),
        "rpki-client": ("#dc2626", "rpki-client"),
    }
    totals = {rp: total_wall_ms(report, rp) for rp in report["rpResults"]}
    values = [value for value in totals.values() if value > 0]
    max_v = max(values) if values else 1
    run_count = report.get("runsPerRp", 10)
    left = 188
    top = 46
    bar_h = 28
    gap = 18
    width = 560
    height = top + len(series) * (bar_h + gap) + 24
    chunks = [
        f'<svg class="chart" viewBox="0 0 {left + width + 120} {height}" role="img" '
        f'xmlns="http://www.w3.org/2000/svg">',
        f'<text x="16" y="24" font-size="14" font-weight="600">{h(run_count)}-run total wall clock</text>',
    ]
    legend_x = 500
    for i, (rp, (color, label)) in enumerate(series.items()):
        if rp not in totals:
            continue
        x = legend_x + i * 110
        chunks.append(f'<rect x="{x}" y="14" width="12" height="12" rx="2" fill="{color}"/>')
        chunks.append(f'<text x="{x + 18}" y="24" font-size="12">{label}</text>')
    for index, (rp, (color, label)) in enumerate(series.items()):
        if rp not in totals:
            continue
        value = totals[rp]
        y = top + index * (bar_h + gap)
        bar_w = max(8.0, width * (value / max_v))
        text_y = y + bar_h * 0.68
        chunks.append(
            f'<g><text x="{left - 12}" y="{text_y:.1f}" text-anchor="end" font-size="13">{label}</text>'
            f'<rect x="{left}" y="{y}" width="{bar_w:.1f}" height="{bar_h}" rx="4" fill="{color}"/>'
            f'<text x="{left + bar_w + 8:.1f}" y="{text_y:.1f}" font-size="13">{ms_to_s(value)}</text>'
            f'<title>{label}: {ms_to_s(value)}</title></g>'
        )
    chunks.append("</svg>")
    return "\n".join(chunks)
def html_summary_table(report: dict[str, Any]) -> str:
    """Render the total-time bar chart followed by the runtime summary table.

    One row per RP in the runtime summary; in each numeric column the
    smallest positive value is marked with ``class="best"``.  The total
    column header uses the report's ``runsPerRp`` (10 if absent).
    """
    runtime = report["crossRpComparison"]["runtimeSummary"]
    run_count = report.get("runsPerRp", 10)
    total_values = [total_wall_ms(report, rp) for rp in runtime]
    snapshot_values = [v["snapshotWallMs"] for v in runtime.values()]
    delta_avg_values = [v["deltaAverageWallMs"] for v in runtime.values()]
    delta_min_values = [v["deltaMinWallMs"] for v in runtime.values()]
    delta_max_values = [v["deltaMaxWallMs"] for v in runtime.values()]
    rss_values = [v["maxRssKb"] for v in runtime.values()]
    rows = []
    for rp, values in runtime.items():
        total = total_wall_ms(report, rp)
        rows.append(
            "<tr>"
            f"<td>{h(rp_label(rp))}</td>"
            f"<td{min_class(total, total_values)}>{ms_to_s(total)}</td>"
            f"<td{min_class(values['snapshotWallMs'], snapshot_values)}>{ms_to_s(values['snapshotWallMs'])}</td>"
            f"<td{min_class(values['deltaAverageWallMs'], delta_avg_values)}>{ms_to_s(values['deltaAverageWallMs'])}</td>"
            f"<td{min_class(values['deltaMinWallMs'], delta_min_values)}>{ms_to_s(values['deltaMinWallMs'])}</td>"
            f"<td{min_class(values['deltaMaxWallMs'], delta_max_values)}>{ms_to_s(values['deltaMaxWallMs'])}</td>"
            f"<td{min_class(values['maxRssKb'], rss_values)}>{kb_to_mb(values['maxRssKb'])}</td>"
            "</tr>"
        )
    return (
        total_wall_chart(report)
        + '<table class="data">'
        f"<thead><tr><th>RP</th><th>{h(run_count)}-run total</th><th>snapshot</th><th>delta avg</th>"
        "<th>delta min</th><th>delta max</th><th>max RSS</th></tr></thead>"
        "<tbody>"
        + "\n".join(rows)
        + "</tbody></table>"
    )


def polyline_chart(report: dict[str, Any]) -> str:
    """Render an SVG line chart of wall-clock seconds per run, one line per RP.

    The y axis runs from 0 to the largest positive wall time; the x axis has
    one tick per run index.  Every point carries a tooltip with its exact
    duration, and each line is labelled at its last point.
    """
    series = {
        "ours-rp": ("#2563eb", "ours-rp"),
        "routinator": ("#16a34a", "routinator"),
        "rpki-client": ("#dc2626", "rpki-client"),
    }
    runs = report["runsPerRp"]
    all_values = [
        run["wallMs"] / 1000
        for rp in report["rpResults"].values()
        for run in rp["runs"]
        if run.get("wallMs", 0) > 0
    ]
    min_v = 0
    max_v = max(all_values) if all_values else 1
    if max_v == min_v:
        max_v += 1
    left, top, width, height = 62, 42, 690, 208
    bottom = top + height
    # With a single run there is no spacing to compute; avoid dividing by 0.
    step = width / max(runs - 1, 1)

    def point(index: int, value_ms: int) -> tuple[float, float]:
        """Map (1-based run index, wall ms) to SVG coordinates."""
        x = left + (index - 1) * step
        value_s = value_ms / 1000
        y = top + (max_v - value_s) / (max_v - min_v) * height
        return x, y

    chunks = [
        '<svg class="chart" viewBox="0 0 860 300" role="img" xmlns="http://www.w3.org/2000/svg">',
        f'<line x1="{left}" y1="{top}" x2="{left}" y2="{bottom}" stroke="#94a3b8"/>',
        f'<line x1="{left}" y1="{bottom}" x2="{left + width}" y2="{bottom}" stroke="#94a3b8"/>',
        f'<text x="{left - 8}" y="{top + 4}" text-anchor="end" font-size="12">{max_v:.1f}s</text>',
        f'<text x="{left - 8}" y="{bottom + 4}" text-anchor="end" font-size="12">{min_v:.1f}s</text>',
        '<text x="16" y="24" font-size="14" font-weight="600">Wall clock / run</text>',
    ]
    legend_x = 480
    for i, (rp, (color, label)) in enumerate(series.items()):
        if rp not in report["rpResults"]:
            continue
        x = legend_x + i * 116
        chunks.append(f'<rect x="{x}" y="14" width="12" height="12" rx="2" fill="{color}"/>')
        chunks.append(f'<text x="{x + 18}" y="24" font-size="12">{label}</text>')
    for rp, (color, label) in series.items():
        if rp not in report["rpResults"]:
            continue
        rp_runs = report["rpResults"][rp]["runs"]
        points = [point(run["runIndex"], run.get("wallMs", 0)) for run in rp_runs]
        coordinates = " ".join(f"{x:.1f},{y:.1f}" for x, y in points)
        chunks.append(
            f'<polyline points="{coordinates}" fill="none" stroke="{color}" stroke-width="2"/>'
        )
        chunks.append(f'<g fill="{color}">')
        for run, (x, y) in zip(rp_runs, points):
            chunks.append(
                f'<circle cx="{x:.1f}" cy="{y:.1f}" r="3.5">'
                f'<title>{label} run {run["runIndex"]}: {ms_to_s(run.get("wallMs", 0))}</title></circle>'
            )
        chunks.append("</g>")
        if points:
            x, y = points[-1]
            chunks.append(f'<text x="{x + 8:.1f}" y="{y + 4:.1f}" font-size="12" fill="{color}">{label}</text>')
    for idx in range(1, runs + 1):
        x = left + (idx - 1) * step
        chunks.append(f'<text x="{x:.1f}" y="{bottom + 18}" text-anchor="middle" font-size="12">{idx}</text>')
    chunks.append("</svg>")
    return "\n".join(chunks)
def _html_table(headers: list[str], rows: list[str]) -> str:
    """Wrap pre-rendered ``<tr>`` rows in a table with an escaped header row."""
    head = "".join(f"<th>{h(title)}</th>" for title in headers)
    return (
        '<table class="data">'
        f"<thead><tr>{head}</tr></thead>"
        "<tbody>"
        + "\n".join(rows)
        + "</tbody></table>"
    )


def run_detail_table(report: dict[str, Any], rp: str) -> str:
    """Render the per-run table of one RP: timing, memory, counts and deltas.

    Runs without an available ``deltaFromPrevious`` show an em dash in both
    delta columns.
    """
    rows = []
    for run in report["rpResults"][rp]["runs"]:
        delta = run.get("deltaFromPrevious", {})
        if delta.get("available"):
            vrp_delta = f"+{delta['vrp']['added']}/-{delta['vrp']['removed']}"
            vap_delta = f"+{delta['vap']['added']}/-{delta['vap']['removed']}"
        else:
            vrp_delta = vap_delta = "—"
        rows.append(
            "<tr>"
            f"<td>{run['runIndex']}</td><td>{h(run['syncMode'])}</td><td>{ms_to_s(run.get('wallMs', 0))}</td>"
            f"<td>{kb_to_mb(run.get('maxRssKb', 0))}</td><td>{comma(run['productCounts']['vrps'])}</td>"
            f"<td>{comma(run['productCounts']['vaps'])}</td><td>{h(vrp_delta)}</td><td>{h(vap_delta)}</td>"
            "</tr>"
        )
    headers = ["run", "mode", "wall", "max RSS", "VRPs", "VAPs", "VRP delta", "VAP delta"]
    return (
        f'<section class="rp-detail"><h3>{h(rp_label(rp))}</h3>'
        + _html_table(headers, rows)
        + "</section>"
    )


def overlap_table(report: dict[str, Any], run_index: int) -> str:
    """Render the pairwise VRP/VAP overlap table for the 1-based *run_index*.

    An index outside the recorded runs yields a table without rows rather
    than an ``IndexError`` or, for 0, the last run's data.
    """
    overlaps = report["crossRpComparison"]["productOverlapByRun"]
    pairs = overlaps[run_index - 1]["pairs"] if 1 <= run_index <= len(overlaps) else []
    rows = []
    for pair in pairs:
        rows.append(
            "<tr>"
            f"<td>{h(pair['left'])} vs {h(pair['right'])}</td>"
            f"<td>{comma(pair['vrp']['intersection'])}</td><td>{comma(pair['vrp']['onlyLeft'])}</td>"
            f"<td>{comma(pair['vrp']['onlyRight'])}</td><td>{pct(pair['vrp']['jaccard'])}</td>"
            f"<td>{comma(pair['vap']['intersection'])}</td><td>{comma(pair['vap']['onlyLeft'])}</td>"
            f"<td>{comma(pair['vap']['onlyRight'])}</td><td>{pct(pair['vap']['jaccard'])}</td>"
            "</tr>"
        )
    headers = [
        "pair",
        "VRP intersection", "only left", "only right", "VRP Jaccard",
        "VAP intersection", "only left", "only right", "VAP Jaccard",
    ]
    return _html_table(headers, rows)


def publication_table(report: dict[str, Any], run_index: int) -> str:
    """Render publication-point / manifest figures of every RP for one run.

    ``ours-rp`` rows come from stage timing, ``rpki-client`` rows from its
    stdout summary; any other RP is shown as not reporting these figures.
    RPs that have no run at the 1-based *run_index* are left out.
    """
    rows = []
    for rp, data in report["rpResults"].items():
        if not 1 <= run_index <= len(data["runs"]):
            continue
        run = data["runs"][run_index - 1]
        stats = run.get("publicationStats", {})
        if rp == "ours-rp":
            pp = f"{comma(stats.get('publicationPoints', 0))} PP processed"
            repo = f"sync total {ms_to_s(stats.get('repoSyncMsTotal', 0))}"
            source = "stage-timing.json"
        elif rp == "rpki-client":
            pp = f"{comma(stats.get('manifests', 0))} manifests / {comma(stats.get('manifestFailedParse', 0))} failed parse"
            repo = f"{comma(stats.get('repositories', 0))} repositories"
            source = "stdout.log"
        else:
            pp = "当前产物未输出"
            repo = "当前产物未输出"
            source = "jsonext 不含完整 PP/manifest state"
        rows.append(
            "<tr>"
            f"<td>{h(rp_label(rp))}</td><td>{h(pp)}</td><td>{h(repo)}</td>"
            f"<td>{h(source)}</td></tr>"
        )
    return _html_table(["RP", "PP/Manifest 计数", "Repo/Sync 信息", "数据来源"], rows)
" ) def write_html_report(path: Path, report: dict[str, Any]) -> None: runtime = report["crossRpComparison"]["runtimeSummary"] fastest_snapshot = min(runtime.items(), key=lambda item: item[1]["snapshotWallMs"] or 10**18) fastest_delta = min(runtime.items(), key=lambda item: item[1]["deltaAverageWallMs"] or 10**18) first_overlap = report["crossRpComparison"]["productOverlapByRun"][0]["pairs"] ours_routinator_vap = next( (pair["vap"]["jaccard"] for pair in first_overlap if pair["left"] == "ours-rp" and pair["right"] == "routinator"), None, ) all_ok = all(run["exitCode"] == 0 for rp in report["rpResults"].values() for run in rp["runs"]) command = ( "python3 rpki_2/rpki/scripts/compare/run_three_rp_10run_benchmark.py " f"--run-root {path.parent} --remote-root {report['remoteRoot']} --runs {report['runsPerRp']} " f"--rirs {','.join(report['rirs'])} --rps {','.join(report['rpResults'].keys())} " f"--schedule {report.get('schedule', 'round-robin')} --ours-rsync-scope {report.get('oursRsyncScope', '')} " "--routinator-enable-aspa" ) html_text = f""" 三 RP Round-Robin 性能与产物对比报告

# Stylesheet of the HTML report; kept outside the f-string template so its
# braces need no escaping.
_REPORT_CSS = """
body { font-family: -apple-system, "Segoe UI", "PingFang SC", "Microsoft YaHei", sans-serif;
       margin: 0 auto; max-width: 1120px; padding: 24px; color: #0f172a; line-height: 1.55; }
h1 { font-size: 26px; margin-bottom: 4px; }
h2 { font-size: 20px; margin-top: 36px; border-bottom: 1px solid #e2e8f0; padding-bottom: 6px; }
h3 { font-size: 16px; margin-top: 20px; }
.lead, .note { color: #475569; }
.cards { display: grid; grid-template-columns: repeat(4, 1fr); gap: 12px; }
.card { border: 1px solid #e2e8f0; border-radius: 8px; padding: 12px 14px; }
.card-title { color: #64748b; font-size: 13px; }
.card-value { font-size: 20px; font-weight: 700; }
.card-note { color: #475569; font-size: 13px; }
.columns { display: grid; grid-template-columns: 1fr 1fr; gap: 24px; }
table.data { border-collapse: collapse; width: 100%; margin: 12px 0; font-size: 14px; }
table.data th, table.data td { border: 1px solid #e2e8f0; padding: 6px 10px; text-align: right; }
table.data th:first-child, table.data td:first-child { text-align: left; }
table.data th { background: #f8fafc; }
td.best { color: #15803d; font-weight: 700; }
pre { background: #0f172a; color: #e2e8f0; padding: 12px; border-radius: 8px; white-space: pre-wrap; word-break: break-all; }
svg.chart { width: 100%; height: auto; }
"""


def _fastest(runtime: dict[str, Any], key: str) -> tuple[str, str]:
    """Return (RP label, formatted time) of the RP with the smallest non-zero *key*.

    Zero counts as "no measurement" and sorts last; an empty summary yields
    ``("N/A", "N/A")``.
    """
    if not runtime:
        return "N/A", "N/A"
    rp, values = min(runtime.items(), key=lambda item: item[1][key] or 10**18)
    return rp_label(rp), ms_to_s(values[key])


def write_html_report(path: Path, report: dict[str, Any]) -> None:
    """Write the human-readable HTML report for *report* to *path*.

    The page has an executive summary, the configuration together with a
    command line that reproduces the run, the runtime summary and trend
    charts, per-run tables, product overlap for the first and the last run
    and the publication-point statistics.  The schedule, run count and the
    Routinator ASPA setting are taken from *report*; a report without
    ``routinatorEnableAspa`` is described as having ASPA enabled.
    """
    cross = report["crossRpComparison"]
    runtime = cross["runtimeSummary"]
    snapshot_name, snapshot_time = _fastest(runtime, "snapshotWallMs")
    delta_name, delta_time = _fastest(runtime, "deltaAverageWallMs")

    overlaps = cross["productOverlapByRun"]
    first_overlap = overlaps[0]["pairs"] if overlaps else []
    ours_routinator_vap = next(
        (pair["vap"]["jaccard"] for pair in first_overlap if pair["left"] == "ours-rp" and pair["right"] == "routinator"),
        None,
    )
    aspa_alignment = "N/A" if ours_routinator_vap is None else f"{ours_routinator_vap:.6f}"
    all_ok = all(run["exitCode"] == 0 for rp in report["rpResults"].values() for run in rp["runs"])

    final_run = report["runsPerRp"]
    schedule = report.get("schedule", "round-robin")
    scope = report.get("oursRsyncScope", "")
    aspa_enabled = bool(report.get("routinatorEnableAspa", True))

    # Reproduction command: quoted per argument, and the ASPA flag only when
    # the run actually used it.
    argv = [
        "python3",
        "rpki_2/rpki/scripts/compare/run_three_rp_10run_benchmark.py",
        "--run-root", str(path.parent),
        "--remote-root", str(report["remoteRoot"]),
        "--runs", str(final_run),
        "--rirs", ",".join(report["rirs"]),
        "--rps", ",".join(report["rpResults"].keys()),
        "--schedule", str(schedule),
        "--ours-rsync-scope", str(scope),
    ]
    if aspa_enabled:
        argv.append("--routinator-enable-aspa")
    command = shlex.join(argv)

    if schedule == "round-robin":
        order_text = "rp1 run1 → rp2 run1 → rp3 run1 → rp1 run2 ..."
        schedule_note = "调度变更:本轮使用 round-robin 串行调度,避免先跑完一个 RP 再跑另一个 RP 带来的时间窗口偏差。"
    else:
        order_text = "rp1 run1 → rp1 run2 → ... → rp2 run1 → rp2 run2 ..."
        schedule_note = f"调度说明:本轮使用 {schedule} 串行调度,先跑完一个 RP 的全部轮次再跑下一个 RP。"
    aspa_text = "启用 --enable-aspa" if aspa_enabled else "未启用 --enable-aspa"
    aspa_item = "--enable-aspa" if aspa_enabled else "未启用 --enable-aspa"

    html_text = f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<title>三 RP Round-Robin 性能与产物对比报告</title>
<style>{_REPORT_CSS}</style>
</head>
<body>
<h1>三 RP Round-Robin 性能与产物对比报告</h1>
<p class="lead">{h(','.join(report['rirs']))} live {h(final_run)} 轮对比,调度顺序为 {h(order_text)};ours RP 使用 {h(scope)},Routinator {h(aspa_text)},rpki-client 使用 official 9.8 正确构建。</p>

<h2>执行摘要</h2>
<div class="cards">
<div class="card"><div class="card-title">最快 Snapshot</div>
<div class="card-value">{h(snapshot_name)}</div>
<div class="card-note">{h(snapshot_time)}</div></div>
<div class="card"><div class="card-title">最快 Delta 均值</div>
<div class="card-value">{h(delta_name)}</div>
<div class="card-note">{h(delta_time)}</div></div>
<div class="card"><div class="card-title">ASPA 对齐</div>
<div class="card-value">{h(aspa_alignment)}</div>
<div class="card-note">ours vs Routinator VAP Jaccard</div></div>
<div class="card"><div class="card-title">Run 状态</div>
<div class="card-value">{'PASS' if all_ok else 'FAIL'}</div>
<div class="card-note">{h(final_run)} run × {len(report['rpResults'])} RP</div></div>
</div>
<p class="note">{h(schedule_note)}</p>

<h2>实验配置</h2>
<div class="columns">
<div>
<h3>运行参数</h3>
<ul>
<li>远端机器:{h(report['sshTarget'])}</li>
<li>远端目录:{h(report['remoteRoot'])}</li>
<li>RIR:{h(','.join(report['rirs']))}</li>
<li>调度:{h(report.get('schedule', ''))}</li>
</ul>
</div>
<div>
<h3>关键变量</h3>
<ul>
<li>ours RP:--rsync-scope {h(scope)}</li>
<li>Routinator:{h(aspa_item)}</li>
<li>rpki-client:official 9.8,本地 fixture TAL/TA</li>
<li>首轮 snapshot,后续 delta</li>
</ul>
</div>
</div>
<pre>{h(command)}</pre>

<h2>总览:运行时间与内存</h2>
{html_summary_table(report)}

<h2>每轮 wall clock 趋势</h2>
{polyline_chart(report)}

<h2>每轮明细</h2>
{''.join(run_detail_table(report, rp) for rp in report['rpResults'])}

<h2>产物重合度</h2>
<h3>run1 snapshot</h3>
{overlap_table(report, 1)}
<h3>run{h(final_run)} final delta</h3>
{overlap_table(report, final_run)}

<h2>发布点 / Manifest 维度</h2>
<h3>run1 snapshot</h3>
{publication_table(report, 1)}
<h3>run{h(final_run)} final delta</h3>
{publication_table(report, final_run)}
<p class="note">Routinator 当前 jsonext 产物没有完整发布点或 CCR manifest state,因此发布点维度只能对 ours RP 与 rpki-client 做计数级参考,不能做全集合重合度。</p>

<h2>验证与产物</h2>
<ul>
<li>JSON/XML/HTML 均由同一份 benchmark JSON 生成。</li>
<li>本地 JSON:{h(path.parent / 'three-rp-performance-comparison.json')}</li>
<li>本地 XML:{h(path.parent / 'three-rp-performance-comparison.xml')}</li>
<li>远端完整产物:{h(report['remoteRoot'])}</li>
</ul>
</body>
</html>
"""
    path.write_text(html_text, encoding="utf-8")


def write_xml(path: Path, report: dict[str, Any]) -> None:
    """Serialise *report* as an indented XML document at *path*.

    The parent directory is created if needed.  Every value is stored as a
    string attribute; of an RP's version record only a fixed set of keys is
    kept, and the ``path`` entry of each artifact is omitted.
    """
    def as_attrs(mapping: dict[str, Any]) -> dict[str, str]:
        """Stringify all values of *mapping* for use as XML attributes."""
        return {key: str(value) for key, value in mapping.items()}

    version_keys = {"commit", "selectedTag", "tag", "binarySha256", "binarySize", "sourceDir", "binaryPath"}

    root = ET.Element("rpPerformanceComparison", {
        "generatedAtUtc": report["generatedAtUtc"],
        "remoteHost": report["sshTarget"],
        "remoteRoot": report["remoteRoot"],
        "runsPerRp": str(report["runsPerRp"]),
        "rirs": ",".join(report["rirs"]),
        "schedule": str(report.get("schedule", "")),
        "oursRsyncScope": str(report.get("oursRsyncScope", "")),
        "routinatorEnableAspa": str(report.get("routinatorEnableAspa", "")).lower(),
    })
    env = ET.SubElement(root, "environment")
    ET.SubElement(env, "artifactRoot").text = str(path.parent)

    for rp, data in report["rpResults"].items():
        rp_el = ET.SubElement(root, "rp", {"name": rp})
        version = data["version"] if isinstance(data["version"], dict) else {}
        ET.SubElement(rp_el, "version", {k: str(v) for k, v in version.items() if k in version_keys})
        runs_el = ET.SubElement(rp_el, "runs")
        for run in data["runs"]:
            run_el = ET.SubElement(runs_el, "run", {
                "index": str(run["runIndex"]),
                "mode": run["syncMode"],
                "exitCode": str(run["exitCode"]),
            })
            ET.SubElement(run_el, "timing", {
                "startUtc": run.get("startUtc", ""),
                "endUtc": run.get("endUtc", ""),
                "wallMs": str(run.get("wallMs", 0)),
            })
            ET.SubElement(run_el, "resource", {
                "maxRssKb": str(run.get("maxRssKb", 0)),
                "userSeconds": str(run.get("userSeconds", 0.0)),
                "systemSeconds": str(run.get("systemSeconds", 0.0)),
                "cpuPercent": str(run.get("cpuPercent", "")),
            })
            ET.SubElement(run_el, "products", {
                "vrps": str(run["productCounts"]["vrps"]),
                "vaps": str(run["productCounts"]["vaps"]),
            })
            delta = run.get("deltaFromPrevious", {"available": False})
            delta_el = ET.SubElement(
                run_el, "deltaFromPrevious", {"available": str(delta.get("available", False)).lower()}
            )
            if delta.get("available"):
                ET.SubElement(delta_el, "vrp", as_attrs(delta["vrp"]))
                ET.SubElement(delta_el, "vap", as_attrs(delta["vap"]))
            artifacts_el = ET.SubElement(run_el, "artifacts")
            for art in run.get("artifacts", []):
                ET.SubElement(artifacts_el, "artifact", {k: str(v) for k, v in art.items() if k != "path"})

    cross_el = ET.SubElement(root, "crossRpComparison")
    runtime_el = ET.SubElement(cross_el, "runtimeSummary")
    for rp, values in report["crossRpComparison"]["runtimeSummary"].items():
        ET.SubElement(runtime_el, "rp", {"name": rp, **as_attrs(values)})
    overlaps_el = ET.SubElement(cross_el, "productOverlapByRun")
    for run in report["crossRpComparison"]["productOverlapByRun"]:
        run_el = ET.SubElement(overlaps_el, "run", {"index": str(run["runIndex"])})
        for pair in run["pairs"]:
            pair_el = ET.SubElement(run_el, "pair", {"left": pair["left"], "right": pair["right"]})
            ET.SubElement(pair_el, "vrp", as_attrs(pair["vrp"]))
            ET.SubElement(pair_el, "vap", as_attrs(pair["vap"]))

    ET.indent(root, space="  ")
    tree = ET.ElementTree(root)
    path.parent.mkdir(parents=True, exist_ok=True)
    tree.write(path, encoding="utf-8", xml_declaration=True)
def main() -> None:
    """Command-line entry point of the three-RP benchmark.

    Parses the options, records the resolved plan in ``dry-run-plan.json``
    and, unless ``--dry-run`` is given, prepares the remote host (or reuses
    an existing remote root), runs the matrix, fetches the artifacts and
    writes the JSON/XML/HTML reports.  A one-line runtime summary per RP is
    printed at the end.

    Exits through ``argparse`` for an invalid ``--runs`` value and raises
    ``SystemExit`` when ``--reuse-remote-root`` is used without
    ``--skip-remote-run``.
    """
    parser = argparse.ArgumentParser(description="Run serial three-RP 10-run performance comparison on a remote host.")
    parser.add_argument("--run-root", type=Path, required=True)
    parser.add_argument("--remote-root", required=True)
    parser.add_argument("--ssh-target", default=os.environ.get("SSH_TARGET", "root@47.251.56.108"))
    parser.add_argument("--runs", type=int, default=10)
    parser.add_argument("--rirs", default="afrinic,apnic,arin,lacnic,ripe")
    parser.add_argument("--skip-build", action="store_true")
    parser.add_argument("--skip-remote-run", action="store_true")
    parser.add_argument("--reuse-remote-root", action="store_true", help="Do not prepare or modify the remote root before fetching existing run artifacts.")
    parser.add_argument("--continue-on-failure", action="store_true")
    parser.add_argument("--rps", default=",".join(RP_ORDER), help=f"Comma-separated RP list. Allowed: {','.join(RP_ORDER)}")
    parser.add_argument("--schedule", default="round-robin", choices=["round-robin", "rp-block"], help="Execution schedule: run-index round-robin or one RP block at a time.")
    parser.add_argument("--ours-rsync-scope", default="publication-point", choices=sorted(RPKI_SCOPE_POLICIES))
    parser.add_argument("--routinator-enable-aspa", action="store_true", help="Pass --enable-aspa to Routinator so jsonext includes ASPA/VAP data.")
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()
    if args.runs < 1:
        # The reports index the first and the last run, so an empty matrix
        # would only fail later with an obscure IndexError.
        parser.error("--runs must be at least 1")

    args.run_root = args.run_root.resolve()
    args.run_root.mkdir(parents=True, exist_ok=True)
    args.active_rps = active_rps(args)
    rirs = parse_rirs(args.rirs)
    versions = resolve_versions()
    plan = {
        "runRoot": str(args.run_root),
        "remoteRoot": args.remote_root,
        "sshTarget": args.ssh_target,
        "runs": args.runs,
        "rirs": rirs,
        "versions": versions,
        "rpOrder": args.active_rps,
        "schedule": args.schedule,
        "oursRsyncScope": args.ours_rsync_scope,
        "routinatorEnableAspa": args.routinator_enable_aspa,
    }
    write_json(args.run_root / "dry-run-plan.json", plan)
    if args.dry_run:
        print(json.dumps(plan, indent=2, ensure_ascii=False))
        return

    if args.reuse_remote_root:
        if not args.skip_remote_run:
            raise SystemExit("--reuse-remote-root requires --skip-remote-run")
        binary_meta = fetch_only_binary_metadata(args)
    else:
        binary_meta = build_release_binaries(versions, args.skip_build, args.active_rps)
        fixture_local = args.run_root / "fixtures"
        build_fixture_tree(fixture_local, rirs)
        prepare_remote(args, binary_meta, rirs, fixture_local)

    if not args.skip_remote_run:
        run_remote_matrix(args, rirs)
    fetch_remote_outputs(args)
    report = process_outputs(args, binary_meta, rirs)

    print(f"xml_report={args.run_root / 'three-rp-performance-comparison.xml'}")
    print(f"json_report={args.run_root / 'three-rp-performance-comparison.json'}")
    for rp, summary in report["crossRpComparison"]["runtimeSummary"].items():
        print(f"{rp}: snapshot={summary['snapshotWallMs']}ms delta_avg={summary['deltaAverageWallMs']}ms max_rss={summary['maxRssKb']}KB")


if __name__ == "__main__":
    main()