#!/usr/bin/env python3
"""Feature #043 all5 sequence triage experiment driver.

Builds the local release tools, pushes binaries and TAL/TA fixtures to a
remote host over ssh/rsync, runs alternating "A"/"B" relying-party samples
there, pulls the per-run artifacts back, records one JSONL sequence item per
run, and finally invokes the local ``sequence_triage_ccr_cir`` tool on the two
sequences.
"""
from __future__ import annotations

import argparse
import hashlib
import json
import os
import shlex
import subprocess
import sys
import time
from pathlib import Path
from typing import Any


def _ancestor(path: Path, levels: int) -> Path:
    """Return ``path.parents[levels]``, clamped to the top-most ancestor.

    Gives the same result as plain ``parents[levels]`` whenever that index
    exists; on a shallower path it returns the highest available ancestor
    instead of raising ``IndexError`` at import time.
    """
    parents = path.parents
    if not parents:
        return path
    return parents[min(levels, len(parents) - 1)]


SCRIPT_DIR = Path(__file__).resolve().parent
REPO_ROOT = _ancestor(SCRIPT_DIR, 2)
DEV_ROOT = _ancestor(REPO_ROOT, 1)
FEATURE035_DIR = REPO_ROOT / "scripts" / "experiments" / "feature035"
FIXTURE_MANIFEST_PATH = FEATURE035_DIR / "fixture-manifest.json"
PORTABLE_ROOT = DEV_ROOT / "rpki-client-portable"
CACHED_CIR_RPKI_CLIENT = DEV_ROOT / ".cache" / "rpki-client-9.8-cir" / "rpki-client"
CACHED_CIR_LIBTLS = DEV_ROOT / ".cache" / "rpki-client-9.8-cir" / "libtls.so.28"
DEFAULT_RIRS = ["afrinic", "apnic", "arin", "lacnic", "ripe"]


def run_local(
    argv: list[str],
    *,
    cwd: Path | None = None,
    capture: bool = False,
    check: bool = True,
) -> subprocess.CompletedProcess[str]:
    """Run a local command in text mode and return the completed process.

    Args:
        argv: Command and arguments (no shell is involved).
        cwd: Optional working directory.
        capture: Capture stdout/stderr instead of inheriting them.
        check: Abort when the command exits non-zero.

    Raises:
        SystemExit: If ``check`` is set and the command exits non-zero. The
            message carries the quoted command line plus any captured output.
    """
    result = subprocess.run(
        argv,
        cwd=str(cwd) if cwd else None,
        text=True,
        capture_output=capture,
        check=False,
    )
    if check and result.returncode != 0:
        # stdout/stderr are None when not captured, hence the `or ''`.
        raise SystemExit(
            f"command failed ({result.returncode}): {shlex.join(argv)}\n"
            f"stdout:\n{result.stdout or ''}\nstderr:\n{result.stderr or ''}"
        )
    return result


def ssh_script(
    target: str,
    script: str,
    *,
    capture: bool = False,
    check: bool = True,
) -> subprocess.CompletedProcess[str]:
    """Feed ``script`` to ``bash -s`` on ``target`` over ssh.

    Raises:
        SystemExit: If ``check`` is set and the remote shell exits non-zero.
    """
    result = subprocess.run(
        ["ssh", target, "bash", "-s"],
        input=script,
        text=True,
        capture_output=capture,
        check=False,
    )
    if check and result.returncode != 0:
        # Without capture the streams are None; avoid printing "None" in the
        # failure message (same convention as run_local).
        raise SystemExit(
            f"remote script failed ({result.returncode}) on {target}\n"
            f"{result.stdout or ''}\n{result.stderr or ''}"
        )
    return result


def rsync_to_remote(target: str, source: Path, destination: str | Path) -> None:
    """Copy a single local path to ``target:destination`` with ``rsync -a``."""
    run_local(["rsync", "-a", str(source), f"{target}:{destination}"])


def rsync_dir_to_remote(target: str, source: Path, destination: str | Path) -> None:
    """Copy the contents of a local directory into a remote directory."""
    # Trailing slashes make rsync copy the directory contents, not the
    # directory itself.
    run_local(["rsync", "-a", f"{source}/", f"{target}:{destination}/"])


def rsync_from_remote(target: str, source: str | Path, destination: Path) -> None:
    """Copy the contents of a remote directory into a local directory."""
    destination.mkdir(parents=True, exist_ok=True)
    run_local(["rsync", "-a", f"{target}:{source}/", f"{destination}/"])


def rsync_run_artifacts_from_remote(target: str, source: str | Path, destination: Path) -> None:
    """Fetch the fixed set of per-run artifacts from a remote run directory.

    Each file is fetched with its own rsync call; a missing file aborts via
    ``run_local``'s ``SystemExit``.
    """
    destination.mkdir(parents=True, exist_ok=True)
    # NOTE(review): report.json is not in this list, so build_sequence_item's
    # report_counts() finds no local report and vrps/vaps/publicationPoints
    # end up as None. Confirm whether skipping it is intentional.
    for name in [
        "result.ccr",
        "result.cir",
        "process-time.txt",
        "remote-run-meta.json",
        "exit-code.txt",
        "started-at.txt",
        "finished-at.txt",
    ]:
        run_local(["rsync", "-a", f"{target}:{source}/{name}", f"{destination}/"])


def load_json(path: Path) -> Any:
    """Parse and return the UTF-8 JSON document stored at ``path``."""
    with path.open("r", encoding="utf-8") as handle:
        return json.load(handle)


def write_json(path: Path, value: Any) -> None:
    """Write ``value`` as indented, key-sorted JSON, creating parent dirs."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        json.dump(value, handle, indent=2, sort_keys=True, ensure_ascii=False)
        handle.write("\n")


def append_jsonl(path: Path, value: Any) -> None:
    """Append ``value`` as one key-sorted JSON line, creating parent dirs."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("a", encoding="utf-8") as handle:
        handle.write(json.dumps(value, sort_keys=True, ensure_ascii=False))
        handle.write("\n")


def utc_stamp() -> str:
    """Return the current UTC time as a compact ``YYYYmmddTHHMMSSZ`` stamp."""
    return time.strftime("%Y%m%dT%H%M%SZ", time.gmtime())


def rfc3339_now() -> str:
    """Return the current UTC time as ``YYYY-mm-ddTHH:MM:SSZ``."""
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())


def fixture_manifest() -> dict[str, Any]:
    """Load the fixture manifest from ``FIXTURE_MANIFEST_PATH``."""
    return load_json(FIXTURE_MANIFEST_PATH)


def fixture_name(rir: str, kind: str) -> str:
    """Return the file name of the manifest entry ``rirs[rir][kind]``."""
    return Path(fixture_manifest()["rirs"][rir][kind]).name


def fixture_desc(rir: str) -> str:
    """Map a RIR key to the descriptor used in rpki-client's ``-T`` argument.

    Raises:
        KeyError: If ``rir`` is not one of the five known RIR keys.
    """
    return {
        "afrinic": "afrinic",
        "apnic": "apnic-rfc7730-https",
        "arin": "arin",
        "lacnic": "lacnic",
        "ripe": "ripe-ncc",
    }[rir]


def cir_tal_uri_for_rir(rir: str) -> str:
    """Map a RIR key to the TAL URI passed as ``--cir-tal-uri``.

    Raises:
        KeyError: If ``rir`` is not one of the five known RIR keys.
    """
    return {
        "afrinic": "https://rpki.afrinic.net/tal/afrinic.tal",
        "apnic": "https://rpki.apnic.net/tal/apnic-rfc7730-https.tal",
        "arin": "https://www.arin.net/resources/manage/rpki/arin.tal",
        "lacnic": "https://www.lacnic.net/innovaportal/file/4983/1/lacnic.tal",
        "ripe": "https://tal.rpki.ripe.net/ripe-ncc.tal",
    }[rir]


def sha256_file(path: Path) -> str:
    """Return the hex SHA-256 digest of a file, read in 1 MiB chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(1024 * 1024), b""):
            digest.update(chunk)
    return digest.hexdigest()


def parse_elapsed_to_ms(raw: str) -> int:
    """Convert ``[days-][[h:]m:]s[.frac]`` into whole milliseconds.

    An empty or blank string yields 0.

    Raises:
        ValueError: If a component is not numeric.
    """
    raw = raw.strip()
    if not raw:
        return 0
    if "-" in raw:
        days, raw = raw.split("-", 1)
    else:
        days = "0"
    parts = raw.split(":")
    if len(parts) == 3:
        hours, minutes, seconds = parts
    elif len(parts) == 2:
        hours = "0"
        minutes, seconds = parts
    else:
        hours = "0"
        minutes = "0"
        seconds = parts[0]
    total_seconds = int(days) * 86400 + int(hours) * 3600 + int(minutes) * 60 + float(seconds)
    return int(round(total_seconds * 1000))


def parse_time_file(path: Path) -> dict[str, Any]:
    """Extract wall time and peak RSS from a ``/usr/bin/time -v`` report.

    Returns:
        A dict with ``wallMs`` and/or ``maxRssKb`` for each line that could be
        parsed; empty when the file is missing. Lines that match but cannot be
        parsed are skipped, so both metrics are best-effort.
    """
    data: dict[str, Any] = {}
    if not path.is_file():
        return data
    for line in path.read_text(encoding="utf-8", errors="replace").splitlines():
        if "Elapsed (wall clock) time" in line:
            # The label itself contains colons ("(h:mm:ss or m:ss):"), so
            # split on the closing "):" when present.
            separator = "):" if "):" in line else ":"
            try:
                data["wallMs"] = parse_elapsed_to_ms(line.rsplit(separator, 1)[1])
            except (IndexError, ValueError):
                continue
        elif "Maximum resident set size" in line:
            try:
                data["maxRssKb"] = int(line.rsplit(":", 1)[1].strip())
            except (IndexError, ValueError):
                continue
    return data


def rpki_client_bin_path() -> Path:
    """Return the first rpki-client binary that passes the fixture smoke test.

    Candidates are the portable build under ``PORTABLE_ROOT`` and then the
    cached binary. A candidate is accepted when running it with
    ``-T invalid`` prints the expected ``--ta-fixture`` usage message.

    Raises:
        SystemExit: If no candidate exists or none passes the smoke test.
    """
    primary = PORTABLE_ROOT / "src" / "rpki-client"
    for candidate in (primary, CACHED_CIR_RPKI_CLIENT):
        if not candidate.is_file():
            continue
        smoke = run_local([str(candidate), "-T", "invalid"], capture=True, check=False)
        if "--ta-fixture requires :" in (smoke.stderr + smoke.stdout):
            return candidate
    raise SystemExit(
        "rpki-client binary lacks CIR/TA fixture support; checkout "
        "feature/cir-output-for-rp-compare or restore .cache/rpki-client-9.8-cir/rpki-client"
    )


def detect_libtls_path(rpki_client_bin: Path) -> Path:
    """Locate ``libtls.so.28`` for the given rpki-client binary.

    Prefers the cached copy; otherwise uses the path that ``ldd`` reports for
    the binary.

    Raises:
        SystemExit: If ``ldd`` fails or no existing library file is found.
    """
    if CACHED_CIR_LIBTLS.is_file():
        return CACHED_CIR_LIBTLS
    ldd = run_local(["ldd", str(rpki_client_bin)], capture=True)
    for line in ldd.stdout.splitlines():
        if "libtls.so.28" not in line or "=>" not in line:
            continue
        # ldd lines look like "name => /path (0xaddr)"; keep only the path.
        candidate = Path(line.split("=>", 1)[1].strip().split(" ", 1)[0])
        if candidate.is_file():
            return candidate
    raise SystemExit("unable to locate libtls.so.28 for rpki-client")


def build_tool_binaries() -> None:
    """Build the release binaries with cargo and check rpki-client exists.

    Raises:
        SystemExit: If the cargo build fails or no usable rpki-client binary
            is found.
    """
    run_local(
        [
            "cargo", "build", "--release",
            "--bin", "rpki",
            "--bin", "sequence_triage_ccr_cir",
            "--bin", "cir_dump_reject_list",
        ],
        cwd=REPO_ROOT,
    )
    # Called only for its fail-fast SystemExit; the path is not needed here.
    _ = rpki_client_bin_path()


def validate_remote_disk(ssh_target: str) -> None:
    """Abort if ``/data`` or ``/`` on the remote host is at least 90% full.

    Raises:
        SystemExit: If the remote check script exits non-zero.
    """
    script = r'''
set -euo pipefail
df -h /data / || true
python3 - <<'PY'
import shutil
for path in ['/data', '/']:
    try:
        usage = shutil.disk_usage(path)
    except FileNotFoundError:
        continue
    used = usage.used / usage.total if usage.total else 0
    print(f'{path} used={used:.2%}')
    if used >= 0.90:
        raise SystemExit(f'{path} disk usage >= 90%; cleanup required before all5 sequence experiment')
PY
'''
    ssh_script(ssh_target, script)


def prepare_remote(ssh_target: str, remote_root: Path, needs_rpki_client: bool) -> None:
    """Prepare the remote host and upload fixtures and binaries.

    Checks disk usage, stops competing rpki-client/routinator processes,
    creates the directory layout under ``remote_root``, records ``df``/``free``
    snapshots, then rsyncs the TAL/TA fixtures and release binaries. The
    rpki-client binary and its libtls are uploaded only when
    ``needs_rpki_client`` is true.
    """
    validate_remote_disk(ssh_target)
    preflight = (
        "set -euo pipefail; "
        "systemctl disable --now rpki-client.timer >/dev/null 2>&1 || true; "
        "systemctl stop rpki-client.service >/dev/null 2>&1 || true; "
        "pkill -x rpki-client >/dev/null 2>&1 || true; "
        "pkill -x routinator >/dev/null 2>&1 || true; "
        f"mkdir -p {shlex.quote(str(remote_root / 'bin'))} {shlex.quote(str(remote_root / 'lib'))} "
        f"{shlex.quote(str(remote_root / 'fixtures' / 'tal'))} {shlex.quote(str(remote_root / 'fixtures' / 'ta'))} "
        f"{shlex.quote(str(remote_root / 'experiments'))}; "
        f"df -h /data / > {shlex.quote(str(remote_root / 'df-before.txt'))} 2>&1 || true; "
        f"free -h > {shlex.quote(str(remote_root / 'free-before.txt'))} 2>&1 || true"
    )
    ssh_script(ssh_target, preflight)

    local_fixtures = REPO_ROOT / "tests" / "fixtures"
    rsync_dir_to_remote(ssh_target, local_fixtures / "tal", remote_root / "fixtures" / "tal")
    rsync_dir_to_remote(ssh_target, local_fixtures / "ta", remote_root / "fixtures" / "ta")

    release_dir = REPO_ROOT / "target" / "release"
    for tool in ("rpki", "sequence_triage_ccr_cir", "cir_dump_reject_list"):
        rsync_to_remote(ssh_target, release_dir / tool, remote_root / "bin" / tool)

    if needs_rpki_client:
        rpki_client_bin = rpki_client_bin_path()
        rsync_to_remote(ssh_target, rpki_client_bin, remote_root / "bin" / "rpki-client")
        rsync_to_remote(ssh_target, detect_libtls_path(rpki_client_bin), remote_root / "lib" / "libtls.so.28")


def side_config(name: str) -> dict[str, Any]:
    """Return the configuration dict for a named experiment side.

    Raises:
        SystemExit: If ``name`` is not a known side configuration.
    """
    if name == "ours-standard":
        return {"rpKind": "ours", "mode": "standard", "protocol": "rrdp+rsync", "rsyncScope": "module-root"}
    if name == "rpki-client-standard":
        return {"rpKind": "rpki-client", "mode": "standard", "protocol": "rrdp+rsync"}
    raise SystemExit(f"unknown side config: {name}")


def build_remote_command(
    remote_root: Path,
    side_name: str,
    side: dict[str, Any],
    side_label: str,
    seq: int,
    rirs: list[str],
) -> tuple[Path, str]:
    """Build the remote bash script for one sample run.

    The script creates the run and state directories (wiping state when
    ``seq == 1``), runs the relying party under ``/usr/bin/time -v``, records
    start/finish timestamps and the exit code, normalises rpki-client output
    names to ``report.json``/``result.ccr``/``result.cir``, and writes
    ``remote-run-meta.json``.

    Returns:
        ``(run_dir, command)``: the remote run directory and the script text.
    """
    sequence_root = remote_root / "experiments" / "sequence" / side_label
    run_dir = sequence_root / f"run_{seq:04d}"
    state_dir = sequence_root / "state" / side["rpKind"]
    sync_mode = "snapshot" if seq == 1 else "delta"
    fixtures_root = remote_root / "fixtures"

    ensure = [f"mkdir -p {shlex.quote(str(run_dir))}", f"chmod 0777 {shlex.quote(str(run_dir))}"]
    if seq == 1:
        # The first sample of a sequence starts from empty state.
        ensure.append(f"rm -rf {shlex.quote(str(state_dir))}")

    if side["rpKind"] == "ours":
        ensure.extend([
            f"mkdir -p {shlex.quote(str(state_dir / 'work-db'))} {shlex.quote(str(state_dir / 'rsync-mirror'))}",
            f"chmod -R 0777 {shlex.quote(str(state_dir.parent))}",
        ])
        argv = [
            str(remote_root / "bin" / "rpki"),
            "--db", str(state_dir / "work-db"),
            "--raw-store-db", str(state_dir / "raw-store.db"),
            "--repo-bytes-db", str(state_dir / "repo-bytes.db"),
            "--rsync-scope", side.get("rsyncScope", "module-root"),
        ]
        for rir in rirs:
            argv.extend(["--tal-path", str(fixtures_root / "tal" / fixture_name(rir, "tal"))])
            argv.extend(["--ta-path", str(fixtures_root / "ta" / fixture_name(rir, "ta"))])
        argv.extend(["--report-json", str(run_dir / "report.json"), "--report-json-compact"])
        argv.extend(["--ccr-out", str(run_dir / "result.ccr"), "--cir-enable", "--cir-out", str(run_dir / "result.cir")])
        for rir in rirs:
            argv.extend(["--cir-tal-uri", cir_tal_uri_for_rir(rir)])
        argv.extend(["--vrps-csv-out", str(run_dir / "vrps.csv"), "--vaps-csv-out", str(run_dir / "vaps.csv")])
        prefix = "env RPKI_PROGRESS_LOG=1 RPKI_PROGRESS_SLOW_SECS=10 /usr/bin/time"
    else:
        cache_fixtures = state_dir / "cache" / "fixtures"
        ensure.extend([
            f"mkdir -p {shlex.quote(str(cache_fixtures))}",
            f"touch {shlex.quote(str(state_dir / 'rpki-client-skiplist'))}",
            f"chmod -R 0777 {shlex.quote(str(state_dir.parent))}",
        ])
        for rir in rirs:
            ensure.append(
                f"cp -f {shlex.quote(str(fixtures_root / 'ta' / fixture_name(rir, 'ta')))} "
                f"{shlex.quote(str(cache_fixtures / fixture_name(rir, 'ta')))}"
            )
        argv = [str(remote_root / "bin" / "rpki-client"), "-vv", "-S", str(state_dir / "rpki-client-skiplist")]
        for rir in rirs:
            argv.extend(["-t", str(fixtures_root / "tal" / fixture_name(rir, "tal"))])
            argv.extend(["-T", f"{fixture_desc(rir)}:{cache_fixtures / fixture_name(rir, 'ta')}"])
        argv.extend(["-d", str(state_dir / "cache"), str(run_dir)])
        prefix = f"env LD_LIBRARY_PATH={shlex.quote(str(remote_root / 'lib'))} /usr/bin/time"

    # `set +e` around the timed run lets a failing relying party still record
    # its exit code and finish timestamp instead of aborting the script.
    command = (
        "set -euo pipefail; "
        + "; ".join(ensure)
        + "; date -u +%Y-%m-%dT%H:%M:%SZ > "
        + shlex.quote(str(run_dir / "started-at.txt"))
        + "; set +e; "
        + prefix
        + " -v -o "
        + shlex.quote(str(run_dir / "process-time.txt"))
        + " -- "
        + shlex.join(argv)
        + " > "
        + shlex.quote(str(run_dir / "stdout.log"))
        + " 2> "
        + shlex.quote(str(run_dir / "stderr.log"))
        + "; ec=$?; set -e; printf '%s\n' \"$ec\" > "
        + shlex.quote(str(run_dir / "exit-code.txt"))
        + "; date -u +%Y-%m-%dT%H:%M:%SZ > "
        + shlex.quote(str(run_dir / "finished-at.txt"))
        + "; true"
    )
    if side["rpKind"] == "rpki-client":
        # Copy rpki-client's output files to the names the "ours" side uses.
        command += (
            f"; [ -f {shlex.quote(str(run_dir / 'json'))} ] && cp -f {shlex.quote(str(run_dir / 'json'))} {shlex.quote(str(run_dir / 'report.json'))} || true"
            f"; [ -f {shlex.quote(str(run_dir / 'rpki.ccr'))} ] && cp -f {shlex.quote(str(run_dir / 'rpki.ccr'))} {shlex.quote(str(run_dir / 'result.ccr'))} || true"
            f"; [ -f {shlex.quote(str(run_dir / 'rpki.cir'))} ] && cp -f {shlex.quote(str(run_dir / 'rpki.cir'))} {shlex.quote(str(run_dir / 'result.cir'))} || true"
        )
    command += (
        f"; python3 - <<'REMOTE_META' {shlex.quote(str(run_dir))} {shlex.quote(side_name)} {shlex.quote(side_label)} {seq} {shlex.quote(sync_mode)}\n"
        "import json, pathlib, sys\n"
        "run_dir=pathlib.Path(sys.argv[1]); side_name=sys.argv[2]; side_label=sys.argv[3]; seq=int(sys.argv[4]); sync_mode=sys.argv[5]\n"
        "def read(p):\n return p.read_text().strip() if p.exists() else None\n"
        "meta={'sideName':side_name,'sideLabel':side_label,'seq':seq,'syncMode':sync_mode,'startedAt':read(run_dir/'started-at.txt'),'finishedAt':read(run_dir/'finished-at.txt'),'exitCode':int(read(run_dir/'exit-code.txt') or '1')}\n"
        "json.dump(meta, open(run_dir/'remote-run-meta.json','w'), indent=2, sort_keys=True); print()\n"
        "REMOTE_META"
    )
    return run_dir, command


def run_remote_sample(
    ssh_target: str,
    remote_root: Path,
    side_name: str,
    side: dict[str, Any],
    side_label: str,
    seq: int,
    rirs: list[str],
) -> Path:
    """Run one sample on the remote host and return its remote run directory."""
    run_dir, command = build_remote_command(remote_root, side_name, side, side_label, seq, rirs)
    ssh_script(ssh_target, command)
    return run_dir


def cir_counts(cir_path: Path) -> dict[str, int]:
    """Return object/trust-anchor/reject counts for a CIR file.

    Runs the local ``cir_dump_reject_list`` tool with ``--limit 0`` and reads
    ``key=value`` lines from its stdout; counts not printed default to 0.
    """
    tool = REPO_ROOT / "target" / "release" / "cir_dump_reject_list"
    result = run_local([str(tool), "--cir", str(cir_path), "--limit", "0"], capture=True)
    values: dict[str, int] = {}
    for line in result.stdout.splitlines():
        if "=" not in line:
            continue
        key, value = line.split("=", 1)
        if key in {"object_count", "trust_anchor_count", "reject_count"}:
            values[key] = int(value)
    return {
        "cirObjectCount": values.get("object_count", 0),
        "cirTrustAnchorCount": values.get("trust_anchor_count", 0),
        "cirRejectCount": values.get("reject_count", 0),
    }


def report_counts(path: Path, rp_kind: str) -> dict[str, int]:
    """Summarise a relying-party JSON report into comparable counts.

    For ``rpki-client`` the counts come from the report's ``metadata`` object;
    otherwise they are the lengths of the report's top-level lists. Returns an
    empty dict when the report file does not exist.
    """
    if not path.is_file():
        return {}
    report = load_json(path)
    if rp_kind == "rpki-client":
        meta = report.get("metadata", {})
        return {
            "vrps": int(meta.get("vrps", 0)),
            "vaps": int(meta.get("vaps", 0) or meta.get("aspas", 0) or 0),
            "publicationPoints": int(meta.get("repositories", 0)),
            "warnings": 0,
        }
    pps = report.get("publication_points", [])
    tree = report.get("tree", {})
    pp_warnings = sum(len(pp.get("warnings", [])) for pp in pps if isinstance(pp, dict))
    return {
        "vrps": len(report.get("vrps", [])),
        "vaps": len(report.get("aspas", [])),
        "publicationPoints": len(pps),
        "warnings": len(tree.get("warnings", [])) + pp_warnings,
    }


def build_sequence_item(
    local_root: Path,
    side_name: str,
    side_label: str,
    side: dict[str, Any],
    seq: int,
    run_dir: Path,
) -> dict[str, Any]:
    """Assemble the sequence record for one locally mirrored run.

    Combines the remote run metadata, the timing report, report counts, CIR
    counts and SHA-256 digests of ``result.ccr``/``result.cir``. Artifact
    paths are stored relative to ``local_root``.
    """
    ccr = run_dir / "result.ccr"
    cir = run_dir / "result.cir"
    meta = load_json(run_dir / "remote-run-meta.json")
    time_info = parse_time_file(run_dir / "process-time.txt")
    counts = report_counts(run_dir / "report.json", side["rpKind"])
    counts.update(cir_counts(cir))
    return {
        "schemaVersion": 1,
        "rpId": side_name,
        "side": "left" if side_label == "A" else "right",
        "seq": seq,
        "runId": f"{side_label}-{seq:04d}",
        "syncMode": "snapshot" if seq == 1 else "delta",
        "status": "success" if meta.get("exitCode") == 0 else "failed",
        "startTime": meta.get("startedAt"),
        "finishTime": meta.get("finishedAt"),
        "validationTime": None,
        "ccrPath": ccr.relative_to(local_root).as_posix(),
        "cirPath": cir.relative_to(local_root).as_posix(),
        "ccrSha256": sha256_file(ccr),
        "cirSha256": sha256_file(cir),
        "wallMs": time_info.get("wallMs"),
        "maxRssKb": time_info.get("maxRssKb"),
        "vrps": counts.get("vrps"),
        "vaps": counts.get("vaps"),
        "publicationPoints": counts.get("publicationPoints"),
        "cirObjectCount": counts.get("cirObjectCount"),
        "cirRejectCount": counts.get("cirRejectCount"),
        "cirTrustAnchorCount": counts.get("cirTrustAnchorCount"),
    }


def run_sequence_triage(local_exp_root: Path, args: argparse.Namespace) -> None:
    """Run the local ``sequence_triage_ccr_cir`` tool on both sequences.

    Output goes to ``<local_exp_root>/sequence-triage``.
    """
    compare_dir = local_exp_root / "sequence-triage"
    run_local([
        str(REPO_ROOT / "target" / "release" / "sequence_triage_ccr_cir"),
        "--left-sequence", str(local_exp_root / "left-sequence.jsonl"),
        "--right-sequence", str(local_exp_root / "right-sequence.jsonl"),
        "--out-dir", str(compare_dir),
        "--align-window-runs", str(args.align_window_runs),
        "--align-window-secs", str(args.align_window_secs),
        "--sample-limit", str(args.sample_limit),
        "--timeline-sample-limit", str(args.timeline_sample_limit),
    ])


def run_experiment(args: argparse.Namespace) -> None:
    """Drive the full experiment described by the parsed CLI arguments.

    Raises:
        SystemExit: If the RIR list is not exactly the five default RIRs, a
            side name is unknown, or any local/remote command fails.
    """
    if not args.skip_build:
        build_tool_binaries()
    rirs = [item.strip() for item in args.rirs.split(",") if item.strip()]
    # Exactly the five default RIRs, in any order and without duplicates.
    if set(rirs) != set(DEFAULT_RIRS) or len(rirs) != len(DEFAULT_RIRS):
        raise SystemExit("#043 remote acceptance requires all5 RIRs: afrinic,apnic,arin,lacnic,ripe")
    left = side_config(args.left)
    right = side_config(args.right)
    run_root = Path(args.run_root).resolve()
    remote_root = Path(args.remote_root)
    run_root.mkdir(parents=True, exist_ok=True)
    write_json(run_root / "experiment-config.json", {
        "schemaVersion": 1,
        "generatedAtUtc": utc_stamp(),
        "left": args.left,
        "right": args.right,
        "samplesPerSide": args.samples_per_side,
        "rirs": rirs,
        "remoteRoot": str(remote_root),
        "sshTarget": args.ssh_target,
    })
    if args.dry_run:
        print(json.dumps(load_json(run_root / "experiment-config.json"), indent=2, ensure_ascii=False))
        return

    local_exp_root = run_root / "experiments" / "sequence"
    compare_dir = local_exp_root / "sequence-triage"
    if args.triage_only:
        run_sequence_triage(local_exp_root, args)
        print(json.dumps({
            "runRoot": str(run_root),
            "triage": str(compare_dir / "sequence-triage.json"),
        }, indent=2))
        return

    prepare_remote(
        args.ssh_target,
        remote_root,
        needs_rpki_client=(left["rpKind"] == "rpki-client" or right["rpKind"] == "rpki-client"),
    )
    left_seq_path = local_exp_root / "left-sequence.jsonl"
    right_seq_path = local_exp_root / "right-sequence.jsonl"
    # Sequence files are appended to below, so clear any previous run first.
    left_seq_path.unlink(missing_ok=True)
    right_seq_path.unlink(missing_ok=True)

    progress: list[dict[str, Any]] = []
    sides = [("A", args.left, left, left_seq_path), ("B", args.right, right, right_seq_path)]
    for seq in range(1, args.samples_per_side + 1):
        for side_label, side_name, side, seq_path in sides:
            print(f"[run] {side_label} {side_name} seq={seq} all5", flush=True)
            remote_run_dir = run_remote_sample(args.ssh_target, remote_root, side_name, side, side_label, seq, rirs)
            local_run_dir = local_exp_root / side_label / f"run_{seq:04d}"
            rsync_run_artifacts_from_remote(args.ssh_target, remote_run_dir, local_run_dir)
            item = build_sequence_item(local_exp_root, side_name, side_label, side, seq, local_run_dir)
            append_jsonl(seq_path, item)
            progress.append(item)
            print(
                f"[done] {side_label} seq={seq} wallMs={item.get('wallMs')} vrps={item.get('vrps')} vaps={item.get('vaps')} objects={item.get('cirObjectCount')} rejects={item.get('cirRejectCount')}",
                flush=True,
            )
            # Rewritten after every run so partial progress survives an abort.
            write_json(local_exp_root / "run-progress.json", progress)

    run_sequence_triage(local_exp_root, args)
    ssh_script(
        args.ssh_target,
        f"df -h /data / > {shlex.quote(str(remote_root / 'df-after.txt'))} 2>&1 || true; free -h > {shlex.quote(str(remote_root / 'free-after.txt'))} 2>&1 || true",
    )
    print(json.dumps({
        "runRoot": str(run_root),
        "remoteRoot": str(remote_root),
        "triage": str(compare_dir / "sequence-triage.json"),
    }, indent=2))


def main() -> None:
    """Parse command-line arguments and run the experiment.

    Raises:
        SystemExit: If ``--samples-per-side`` is below 2, or on any failure
            raised by ``run_experiment``.
    """
    parser = argparse.ArgumentParser(description="Feature #043 all5 sequence triage experiment driver")
    parser.add_argument("--run-root", required=True)
    parser.add_argument("--remote-root", required=True)
    parser.add_argument("--ssh-target", default=os.environ.get("SSH_TARGET", "root@47.251.56.108"))
    parser.add_argument("--left", default="ours-standard")
    parser.add_argument("--right", default="rpki-client-standard")
    parser.add_argument("--samples-per-side", type=int, default=3)
    parser.add_argument("--rirs", default=",".join(DEFAULT_RIRS))
    parser.add_argument("--align-window-runs", type=int, default=2)
    parser.add_argument("--align-window-secs", type=int, default=1800)
    parser.add_argument("--sample-limit", type=int, default=200)
    parser.add_argument("--timeline-sample-limit", type=int, default=0)
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--skip-build", action="store_true", help="reuse existing release binaries")
    parser.add_argument("--triage-only", action="store_true", help="only rerun local sequence triage for an existing run root")
    args = parser.parse_args()
    if args.samples_per_side < 2:
        raise SystemExit("--samples-per-side must be >= 2")
    run_experiment(args)


if __name__ == "__main__":
    main()