#!/usr/bin/env python3 from __future__ import annotations import argparse from concurrent.futures import ThreadPoolExecutor, as_completed import hashlib import json import os import shlex import subprocess import sys import time from pathlib import Path from typing import Any SCRIPT_DIR = Path(__file__).resolve().parent REPO_ROOT = SCRIPT_DIR.parents[2] DEV_ROOT = REPO_ROOT.parents[1] FEATURE035_DIR = REPO_ROOT / "scripts" / "experiments" / "feature035" FIXTURE_MANIFEST_PATH = FEATURE035_DIR / "fixture-manifest.json" PORTABLE_ROOT = DEV_ROOT / "rpki-client-portable" CACHED_CIR_RPKI_CLIENT = DEV_ROOT / ".cache" / "rpki-client-9.8-cir" / "rpki-client" CACHED_CIR_LIBTLS = DEV_ROOT / ".cache" / "rpki-client-9.8-cir" / "libtls.so.28" DEFAULT_RIRS = ["afrinic", "apnic", "arin", "lacnic", "ripe"] def run_local(argv: list[str], *, cwd: Path | None = None, capture: bool = False, check: bool = True) -> subprocess.CompletedProcess[str]: result = subprocess.run(argv, cwd=str(cwd) if cwd else None, text=True, capture_output=capture, check=False) if check and result.returncode != 0: raise SystemExit( f"command failed ({result.returncode}): {' '.join(shlex.quote(x) for x in argv)}\n" f"stdout:\n{result.stdout or ''}\nstderr:\n{result.stderr or ''}" ) return result def ssh_script(target: str, script: str, *, capture: bool = False, check: bool = True) -> subprocess.CompletedProcess[str]: result = subprocess.run(["ssh", target, "bash", "-s"], input=script, text=True, capture_output=capture, check=False) if check and result.returncode != 0: raise SystemExit(f"remote script failed ({result.returncode}) on {target}\n{result.stdout}\n{result.stderr}") return result def rsync_to_remote(target: str, source: Path, destination: str | Path) -> None: run_local(["rsync", "-a", str(source), f"{target}:{destination}"]) def rsync_dir_to_remote(target: str, source: Path, destination: str | Path) -> None: run_local(["rsync", "-a", f"{source}/", f"{target}:{destination}/"]) def 
rsync_from_remote(target: str, source: str | Path, destination: Path) -> None: destination.mkdir(parents=True, exist_ok=True) run_local(["rsync", "-a", f"{target}:{source}/", f"{destination}/"]) def rsync_run_artifacts_from_remote(target: str, source: str | Path, destination: Path) -> None: destination.mkdir(parents=True, exist_ok=True) rsync_base = ["rsync", "-az", "--ignore-missing-args", "--partial", "--partial-dir=.rsync-partial"] for name in [ "result.ccr", "result.cir", "report.json", "vrps.csv", "vaps.csv", "process-time.txt", "remote-run-meta.json", "exit-code.txt", "started-at.txt", "finished-at.txt", "stdout.log", "stderr.log", ]: run_local([*rsync_base, f"{target}:{source}/{name}", f"{destination}/"]) def rsync_remote_analysis_from_remote(target: str, remote_exp_root: str | Path, local_exp_root: Path) -> None: local_exp_root.mkdir(parents=True, exist_ok=True) rsync_base = ["rsync", "-az", "--ignore-missing-args", "--partial", "--partial-dir=.rsync-partial"] for name in [ "left-sequence.jsonl", "right-sequence.jsonl", "run-progress.json", "sequence-triage-time.txt", "sequence-triage/sequence-triage.json", ]: destination = local_exp_root / Path(name).parent destination.mkdir(parents=True, exist_ok=True) run_local([*rsync_base, f"{target}:{remote_exp_root}/{name}", f"{destination}/"]) def same_remote_location( left_target: str, left_root: str | Path, right_target: str, right_root: str | Path, ) -> bool: return left_target == right_target and str(left_root) == str(right_root) def load_json(path: Path) -> Any: with path.open("r", encoding="utf-8") as handle: return json.load(handle) def write_json(path: Path, value: Any) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("w", encoding="utf-8") as handle: json.dump(value, handle, indent=2, sort_keys=True, ensure_ascii=False) handle.write("\n") def append_jsonl(path: Path, value: Any) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("a", encoding="utf-8") as handle: 
handle.write(json.dumps(value, sort_keys=True, ensure_ascii=False)) handle.write("\n") def utc_stamp() -> str: return time.strftime("%Y%m%dT%H%M%SZ", time.gmtime()) def rfc3339_now() -> str: return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) def fixture_manifest() -> dict[str, Any]: return load_json(FIXTURE_MANIFEST_PATH) def fixture_name(rir: str, kind: str) -> str: return Path(fixture_manifest()["rirs"][rir][kind]).name def fixture_desc(rir: str) -> str: return { "afrinic": "afrinic", "apnic": "apnic-rfc7730-https", "arin": "arin", "lacnic": "lacnic", "ripe": "ripe-ncc", }[rir] def cir_tal_uri_for_rir(rir: str) -> str: return { "afrinic": "https://rpki.afrinic.net/tal/afrinic.tal", "apnic": "https://rpki.apnic.net/tal/apnic-rfc7730-https.tal", "arin": "https://www.arin.net/resources/manage/rpki/arin.tal", "lacnic": "https://www.lacnic.net/innovaportal/file/4983/1/lacnic.tal", "ripe": "https://tal.rpki.ripe.net/ripe-ncc.tal", }[rir] def sha256_file(path: Path) -> str: digest = hashlib.sha256() with path.open("rb") as handle: for chunk in iter(lambda: handle.read(1024 * 1024), b""): digest.update(chunk) return digest.hexdigest() def parse_elapsed_to_ms(raw: str) -> int: raw = raw.strip() if not raw: return 0 if "-" in raw: days, raw = raw.split("-", 1) else: days = "0" parts = raw.split(":") if len(parts) == 3: hours, minutes, seconds = parts elif len(parts) == 2: hours = "0" minutes, seconds = parts else: hours = "0" minutes = "0" seconds = parts[0] return int(round((int(days) * 86400 + int(hours) * 3600 + int(minutes) * 60 + float(seconds)) * 1000)) def parse_time_file(path: Path) -> dict[str, Any]: data: dict[str, Any] = {} if not path.is_file(): return data for line in path.read_text(encoding="utf-8", errors="replace").splitlines(): if "Elapsed (wall clock) time" in line: elapsed = line.rsplit(":", 1)[1] if "):" not in line else line.rsplit("):", 1)[1] data["wallMs"] = parse_elapsed_to_ms(elapsed) elif "Maximum resident set size" in line: try: 
data["maxRssKb"] = int(line.rsplit(":", 1)[1].strip()) except ValueError: pass return data def rpki_client_bin_path() -> Path: primary = PORTABLE_ROOT / "src" / "rpki-client" for candidate in (primary, CACHED_CIR_RPKI_CLIENT): if not candidate.is_file(): continue smoke = run_local([str(candidate), "-T", "invalid"], capture=True, check=False) if "--ta-fixture requires :" in (smoke.stderr + smoke.stdout): return candidate raise SystemExit("rpki-client binary lacks CIR/TA fixture support; checkout feature/cir-output-for-rp-compare or restore .cache/rpki-client-9.8-cir/rpki-client") def detect_libtls_path(rpki_client_bin: Path) -> Path: if CACHED_CIR_LIBTLS.is_file(): return CACHED_CIR_LIBTLS ldd = run_local(["ldd", str(rpki_client_bin)], capture=True) for line in ldd.stdout.splitlines(): if "libtls.so.28" not in line or "=>" not in line: continue candidate = Path(line.split("=>", 1)[1].strip().split(" ", 1)[0]) if candidate.is_file(): return candidate fallback = DEV_ROOT / ".cache" / "rpki-client-9.8-cir" / "libtls.so.28" if fallback.is_file(): return fallback raise SystemExit("unable to locate libtls.so.28 for rpki-client") def build_tool_binaries() -> None: run_local([ "cargo", "build", "--release", "--bin", "rpki", "--bin", "sequence_triage_ccr_cir", "--bin", "cir_dump_reject_list", ], cwd=REPO_ROOT) _ = rpki_client_bin_path() def validate_remote_disk(ssh_target: str) -> None: script = r''' set -euo pipefail df -h /data / || true python3 - <<'PY' import shutil for path in ['/data', '/']: try: usage = shutil.disk_usage(path) except FileNotFoundError: continue used = usage.used / usage.total if usage.total else 0 print(f'{path} used={used:.2%}') if used >= 0.90: raise SystemExit(f'{path} disk usage >= 90%; cleanup required before all5 sequence experiment') PY ''' ssh_script(ssh_target, script) def prepare_remote(ssh_target: str, remote_root: Path, needs_rpki_client: bool) -> None: validate_remote_disk(ssh_target) preflight = ( "set -euo pipefail; " "systemctl disable 
--now rpki-client.timer >/dev/null 2>&1 || true; " "systemctl stop rpki-client.service >/dev/null 2>&1 || true; " "pkill -x rpki-client >/dev/null 2>&1 || true; " "pkill -x routinator >/dev/null 2>&1 || true; " f"mkdir -p {shlex.quote(str(remote_root / 'bin'))} {shlex.quote(str(remote_root / 'lib'))} " f"{shlex.quote(str(remote_root / 'fixtures' / 'tal'))} {shlex.quote(str(remote_root / 'fixtures' / 'ta'))} " f"{shlex.quote(str(remote_root / 'experiments'))}; " f"df -h /data / > {shlex.quote(str(remote_root / 'df-before.txt'))} 2>&1 || true; " f"free -h > {shlex.quote(str(remote_root / 'free-before.txt'))} 2>&1 || true" ) ssh_script(ssh_target, preflight) rsync_dir_to_remote(ssh_target, REPO_ROOT / "tests" / "fixtures" / "tal", remote_root / "fixtures" / "tal") rsync_dir_to_remote(ssh_target, REPO_ROOT / "tests" / "fixtures" / "ta", remote_root / "fixtures" / "ta") rsync_to_remote(ssh_target, REPO_ROOT / "target" / "release" / "rpki", remote_root / "bin" / "rpki") rsync_to_remote(ssh_target, REPO_ROOT / "target" / "release" / "sequence_triage_ccr_cir", remote_root / "bin" / "sequence_triage_ccr_cir") rsync_to_remote(ssh_target, REPO_ROOT / "target" / "release" / "cir_dump_reject_list", remote_root / "bin" / "cir_dump_reject_list") if needs_rpki_client: rpki_client_bin = rpki_client_bin_path() rsync_to_remote(ssh_target, rpki_client_bin, remote_root / "bin" / "rpki-client") rsync_to_remote(ssh_target, detect_libtls_path(rpki_client_bin), remote_root / "lib" / "libtls.so.28") def prepare_remote_once( prepared: dict[tuple[str, str], bool], ssh_target: str, remote_root: Path, needs_rpki_client: bool, ) -> None: key = (ssh_target, str(remote_root)) already_has_rpki_client = prepared.get(key) if already_has_rpki_client is not None and (already_has_rpki_client or not needs_rpki_client): return prepare_remote(ssh_target, remote_root, needs_rpki_client) prepared[key] = bool(already_has_rpki_client or needs_rpki_client) def side_config(name: str) -> dict[str, Any]: if name 
== "ours-standard": return {"rpKind": "ours", "mode": "standard", "protocol": "rrdp+rsync", "rsyncScope": "module-root"} if name == "ours-strict-all": return { "rpKind": "ours", "mode": "strict-all", "protocol": "rrdp+rsync", "rsyncScope": "module-root", "strictPolicies": "name,cms-der,signed-attrs", } if name == "rpki-client-standard": return {"rpKind": "rpki-client", "mode": "standard", "protocol": "rrdp+rsync"} raise SystemExit(f"unknown side config: {name}") def parse_rirs(raw: str) -> list[str]: rirs = [item.strip() for item in raw.split(",") if item.strip()] if not rirs: raise SystemExit("--rirs must contain at least one RIR") seen: set[str] = set() invalid: list[str] = [] duplicate: list[str] = [] for rir in rirs: if rir not in DEFAULT_RIRS: invalid.append(rir) if rir in seen: duplicate.append(rir) seen.add(rir) if invalid: raise SystemExit(f"unsupported RIR(s): {','.join(invalid)}; valid values: {','.join(DEFAULT_RIRS)}") if duplicate: raise SystemExit(f"duplicate RIR(s): {','.join(duplicate)}") return rirs def build_remote_command(remote_root: Path, side_name: str, side: dict[str, Any], side_label: str, seq: int, rirs: list[str]) -> tuple[Path, str]: run_dir = remote_root / "experiments" / "sequence" / side_label / f"run_{seq:04d}" state_dir = remote_root / "experiments" / "sequence" / side_label / "state" / side["rpKind"] sync_mode = "snapshot" if seq == 1 else "delta" ensure = [f"mkdir -p {shlex.quote(str(run_dir))}", f"chmod 0777 {shlex.quote(str(run_dir))}"] if side["rpKind"] == "ours": if seq == 1: ensure.append(f"rm -rf {shlex.quote(str(state_dir))}") ensure.extend([ f"mkdir -p {shlex.quote(str(state_dir / 'work-db'))} {shlex.quote(str(state_dir / 'rsync-mirror'))}", f"chmod -R 0777 {shlex.quote(str(state_dir.parent))}", ]) argv = [ str(remote_root / "bin" / "rpki"), "--db", str(state_dir / "work-db"), "--raw-store-db", str(state_dir / "raw-store.db"), "--repo-bytes-db", str(state_dir / "repo-bytes.db"), "--rsync-scope", side.get("rsyncScope", 
"module-root"), ] if side.get("strictPolicies"): argv.extend(["--strict", str(side["strictPolicies"])]) for rir in rirs: argv.extend(["--tal-path", str(remote_root / "fixtures" / "tal" / fixture_name(rir, "tal"))]) argv.extend(["--ta-path", str(remote_root / "fixtures" / "ta" / fixture_name(rir, "ta"))]) argv.extend(["--report-json", str(run_dir / "report.json"), "--report-json-compact"]) argv.extend(["--ccr-out", str(run_dir / "result.ccr"), "--cir-enable", "--cir-out", str(run_dir / "result.cir")]) for rir in rirs: argv.extend(["--cir-tal-uri", cir_tal_uri_for_rir(rir)]) argv.extend(["--vrps-csv-out", str(run_dir / "vrps.csv"), "--vaps-csv-out", str(run_dir / "vaps.csv")]) prefix = "env RPKI_PROGRESS_LOG=1 RPKI_PROGRESS_SLOW_SECS=10 /usr/bin/time" else: if seq == 1: ensure.append(f"rm -rf {shlex.quote(str(state_dir))}") ensure.extend([ f"mkdir -p {shlex.quote(str(state_dir / 'cache' / 'fixtures'))}", f"touch {shlex.quote(str(state_dir / 'rpki-client-skiplist'))}", f"chmod -R 0777 {shlex.quote(str(state_dir.parent))}", ]) for rir in rirs: ensure.append( f"cp -f {shlex.quote(str(remote_root / 'fixtures' / 'ta' / fixture_name(rir, 'ta')))} " f"{shlex.quote(str(state_dir / 'cache' / 'fixtures' / fixture_name(rir, 'ta')))}" ) argv = [str(remote_root / "bin" / "rpki-client"), "-vv", "-S", str(state_dir / "rpki-client-skiplist")] for rir in rirs: argv.extend(["-t", str(remote_root / "fixtures" / "tal" / fixture_name(rir, "tal"))]) argv.extend(["-T", f"{fixture_desc(rir)}:{state_dir / 'cache' / 'fixtures' / fixture_name(rir, 'ta')}"]) argv.extend(["-d", str(state_dir / "cache"), str(run_dir)]) prefix = f"env LD_LIBRARY_PATH={shlex.quote(str(remote_root / 'lib'))} /usr/bin/time" command = ( "set -euo pipefail; " + "; ".join(ensure) + "; date -u +%Y-%m-%dT%H:%M:%SZ > " + shlex.quote(str(run_dir / "started-at.txt")) + "; set +e; " + prefix + " -v -o " + shlex.quote(str(run_dir / "process-time.txt")) + " -- " + shlex.join(argv) + " > " + shlex.quote(str(run_dir / 
"stdout.log")) + " 2> " + shlex.quote(str(run_dir / "stderr.log")) + "; ec=$?; set -e; printf '%s\n' \"$ec\" > " + shlex.quote(str(run_dir / "exit-code.txt")) + "; date -u +%Y-%m-%dT%H:%M:%SZ > " + shlex.quote(str(run_dir / "finished-at.txt")) + "; true" ) if side["rpKind"] == "rpki-client": command += ( f"; [ -f {shlex.quote(str(run_dir / 'json'))} ] && cp -f {shlex.quote(str(run_dir / 'json'))} {shlex.quote(str(run_dir / 'report.json'))} || true" f"; [ -f {shlex.quote(str(run_dir / 'rpki.ccr'))} ] && cp -f {shlex.quote(str(run_dir / 'rpki.ccr'))} {shlex.quote(str(run_dir / 'result.ccr'))} || true" f"; [ -f {shlex.quote(str(run_dir / 'rpki.cir'))} ] && cp -f {shlex.quote(str(run_dir / 'rpki.cir'))} {shlex.quote(str(run_dir / 'result.cir'))} || true" ) command += ( f"; python3 - <<'REMOTE_META' {shlex.quote(str(run_dir))} {shlex.quote(side_name)} {shlex.quote(side_label)} {seq} {shlex.quote(sync_mode)}\n" "import json, pathlib, sys\n" "run_dir=pathlib.Path(sys.argv[1]); side_name=sys.argv[2]; side_label=sys.argv[3]; seq=int(sys.argv[4]); sync_mode=sys.argv[5]\n" "def read(p):\n return p.read_text().strip() if p.exists() else None\n" "def counts_from_report():\n" " p=run_dir/'report.json'\n" " if not p.exists():\n" " return {}\n" " try:\n" " report=json.load(open(p))\n" " except Exception:\n" " return {}\n" " meta=report.get('metadata') if isinstance(report, dict) else None\n" " if isinstance(meta, dict):\n" " return {'vrps': int(meta.get('vrps') or 0), 'vaps': int(meta.get('vaps') or meta.get('aspas') or 0), 'publicationPoints': int(meta.get('repositories') or 0), 'warnings': 0}\n" " pps=report.get('publication_points', []) if isinstance(report, dict) else []\n" " tree=report.get('tree', {}) if isinstance(report, dict) else {}\n" " pp_warnings=sum(len(pp.get('warnings', [])) for pp in pps if isinstance(pp, dict)) if isinstance(pps, list) else 0\n" " return {'vrps': len(report.get('vrps', [])), 'vaps': len(report.get('aspas', [])), 'publicationPoints': len(pps) if 
isinstance(pps, list) else 0, 'warnings': len(tree.get('warnings', [])) + pp_warnings if isinstance(tree, dict) else pp_warnings}\n" "meta={'sideName':side_name,'sideLabel':side_label,'seq':seq,'syncMode':sync_mode,'startedAt':read(run_dir/'started-at.txt'),'finishedAt':read(run_dir/'finished-at.txt'),'exitCode':int(read(run_dir/'exit-code.txt') or '1'),'counts':counts_from_report()}\n" "json.dump(meta, open(run_dir/'remote-run-meta.json','w'), indent=2, sort_keys=True); print()\n" "REMOTE_META" ) return run_dir, command def run_remote_sample(ssh_target: str, remote_root: Path, side_name: str, side: dict[str, Any], side_label: str, seq: int, rirs: list[str]) -> Path: run_dir, command = build_remote_command(remote_root, side_name, side, side_label, seq, rirs) ssh_script(ssh_target, command) return run_dir def append_remote_sequence_item( ssh_target: str, remote_root: Path, side_name: str, side_label: str, seq: int, run_dir: Path, schedule_mode: str, ) -> dict[str, Any]: remote_exp_root = remote_root / "experiments" / "sequence" seq_path = remote_exp_root / ("left-sequence.jsonl" if side_label == "A" else "right-sequence.jsonl") side_value = "left" if side_label == "A" else "right" script = f""" set -euo pipefail python3 - <<'REMOTE_SEQUENCE_ITEM' {shlex.quote(str(remote_exp_root))} {shlex.quote(str(seq_path))} {shlex.quote(str(run_dir))} {shlex.quote(str(remote_root / 'bin' / 'cir_dump_reject_list'))} {shlex.quote(side_name)} {shlex.quote(side_label)} {seq} {shlex.quote(side_value)} {shlex.quote(schedule_mode)} import hashlib, json, os, pathlib, subprocess, sys exp_root=pathlib.Path(sys.argv[1]) seq_path=pathlib.Path(sys.argv[2]) run_dir=pathlib.Path(sys.argv[3]) cir_dump=pathlib.Path(sys.argv[4]) side_name=sys.argv[5] side_label=sys.argv[6] seq=int(sys.argv[7]) side_value=sys.argv[8] schedule_mode=sys.argv[9] def read(p): return p.read_text().strip() if p.exists() else None def sha256_file(p): h=hashlib.sha256() with open(p, 'rb') as f: for chunk in iter(lambda: 
f.read(1024 * 1024), b''): h.update(chunk) return h.hexdigest() def parse_elapsed_to_ms(raw): raw=raw.strip() if not raw: return 0 if '-' in raw: days, raw=raw.split('-', 1) else: days='0' parts=raw.split(':') if len(parts) == 3: hours, minutes, seconds=parts elif len(parts) == 2: hours='0'; minutes, seconds=parts else: hours='0'; minutes='0'; seconds=parts[0] return int(round((int(days)*86400 + int(hours)*3600 + int(minutes)*60 + float(seconds))*1000)) def parse_time_file(p): data={{}} if not p.exists(): return data for line in p.read_text(errors='replace').splitlines(): if 'Elapsed (wall clock) time' in line: elapsed=line.rsplit('):', 1)[1] if '):' in line else line.rsplit(':', 1)[1] data['wallMs']=parse_elapsed_to_ms(elapsed) elif 'Maximum resident set size' in line: try: data['maxRssKb']=int(line.rsplit(':', 1)[1].strip()) except ValueError: pass return data def cir_counts(cir): values={{}} if not cir.exists(): return {{}} result=subprocess.run([str(cir_dump), '--cir', str(cir), '--limit', '0'], text=True, capture_output=True, check=True) for line in result.stdout.splitlines(): if '=' not in line: continue key, value=line.split('=', 1) if key in ('object_count', 'trust_anchor_count', 'reject_count'): values[key]=int(value) return {{'cirObjectCount': values.get('object_count', 0), 'cirTrustAnchorCount': values.get('trust_anchor_count', 0), 'cirRejectCount': values.get('reject_count', 0)}} meta=json.load(open(run_dir/'remote-run-meta.json')) time_info=parse_time_file(run_dir/'process-time.txt') counts=dict(meta.get('counts') or {{}}) ccr=run_dir/'result.ccr' cir=run_dir/'result.cir' if meta.get('exitCode') != 0: raise SystemExit(f"remote run failed: exitCode={{meta.get('exitCode')}} runDir={{run_dir}}") missing=[str(path) for path in (ccr, cir) if not path.exists()] if missing: raise SystemExit(f"remote run missing required artifact(s): {{missing}}") counts.update(cir_counts(cir)) item={{ 'schemaVersion': 1, 'rpId': side_name, 'side': side_value, 'seq': seq, 
'runId': f'{{side_label}}-{{seq:04d}}', 'syncMode': 'snapshot' if seq == 1 else 'delta', 'status': 'success' if meta.get('exitCode') == 0 else 'failed', 'startTime': meta.get('startedAt'), 'finishTime': meta.get('finishedAt'), 'validationTime': None, 'ccrPath': os.path.relpath(ccr, exp_root), 'cirPath': os.path.relpath(cir, exp_root), 'ccrSha256': sha256_file(ccr) if ccr.exists() else None, 'cirSha256': sha256_file(cir) if cir.exists() else None, 'wallMs': time_info.get('wallMs'), 'maxRssKb': time_info.get('maxRssKb'), 'vrps': counts.get('vrps'), 'vaps': counts.get('vaps'), 'publicationPoints': counts.get('publicationPoints'), 'cirObjectCount': counts.get('cirObjectCount'), 'cirRejectCount': counts.get('cirRejectCount'), 'cirTrustAnchorCount': counts.get('cirTrustAnchorCount'), 'scheduleMode': schedule_mode, }} seq_path.parent.mkdir(parents=True, exist_ok=True) with open(seq_path, 'a', encoding='utf-8') as handle: handle.write(json.dumps(item, sort_keys=True, ensure_ascii=False) + '\\n') print(json.dumps(item, sort_keys=True, ensure_ascii=False)) REMOTE_SEQUENCE_ITEM """ result = ssh_script(ssh_target, script, capture=True) lines = [line for line in result.stdout.splitlines() if line.strip()] if not lines: raise SystemExit("remote sequence item append produced no output") return json.loads(lines[-1]) def cleanup_remote_run_nonessential(ssh_target: str, run_dir: Path) -> None: keep = { "result.ccr", "result.cir", "process-time.txt", "remote-run-meta.json", "exit-code.txt", "started-at.txt", "finished-at.txt", "stdout.log", "stderr.log", } keep_json = json.dumps(sorted(keep), ensure_ascii=False) script = f""" set -euo pipefail python3 - <<'REMOTE_CLEAN' {shlex.quote(str(run_dir))} {shlex.quote(keep_json)} import json, pathlib, sys run_dir=pathlib.Path(sys.argv[1]) keep=set(json.loads(sys.argv[2])) removed=0 if run_dir.exists(): for path in run_dir.iterdir(): if path.name in keep or path.is_dir(): continue try: removed += path.stat().st_size path.unlink() except 
FileNotFoundError: pass print(f'cleaned_nonessential_bytes={{removed}}') REMOTE_CLEAN """ ssh_script(ssh_target, script) def cir_counts(cir_path: Path) -> dict[str, int]: result = run_local([str(REPO_ROOT / "target" / "release" / "cir_dump_reject_list"), "--cir", str(cir_path), "--limit", "0"], capture=True) values: dict[str, int] = {} for line in result.stdout.splitlines(): if "=" not in line: continue key, value = line.split("=", 1) if key in {"object_count", "trust_anchor_count", "reject_count"}: values[key] = int(value) return { "cirObjectCount": values.get("object_count", 0), "cirTrustAnchorCount": values.get("trust_anchor_count", 0), "cirRejectCount": values.get("reject_count", 0), } def report_counts(path: Path, rp_kind: str) -> dict[str, int]: if not path.is_file(): return {} report = load_json(path) if rp_kind == "rpki-client": meta = report.get("metadata", {}) return { "vrps": int(meta.get("vrps", 0)), "vaps": int(meta.get("vaps", 0) or meta.get("aspas", 0) or 0), "publicationPoints": int(meta.get("repositories", 0)), "warnings": 0, } pps = report.get("publication_points", []) tree = report.get("tree", {}) return { "vrps": len(report.get("vrps", [])), "vaps": len(report.get("aspas", [])), "publicationPoints": len(pps), "warnings": len(tree.get("warnings", [])) + sum(len(pp.get("warnings", [])) for pp in pps if isinstance(pp, dict)), } def build_sequence_item(local_root: Path, side_name: str, side_label: str, side: dict[str, Any], seq: int, run_dir: Path) -> dict[str, Any]: ccr = run_dir / "result.ccr" cir = run_dir / "result.cir" meta = load_json(run_dir / "remote-run-meta.json") time_info = parse_time_file(run_dir / "process-time.txt") counts = dict(meta.get("counts") or {}) if not counts: counts = report_counts(run_dir / "report.json", side["rpKind"]) counts.update(cir_counts(cir)) return { "schemaVersion": 1, "rpId": side_name, "side": "left" if side_label == "A" else "right", "seq": seq, "runId": f"{side_label}-{seq:04d}", "syncMode": "snapshot" if 
seq == 1 else "delta", "status": "success" if meta.get("exitCode") == 0 else "failed", "startTime": meta.get("startedAt"), "finishTime": meta.get("finishedAt"), "validationTime": None, "ccrPath": ccr.relative_to(local_root).as_posix(), "cirPath": cir.relative_to(local_root).as_posix(), "ccrSha256": sha256_file(ccr), "cirSha256": sha256_file(cir), "wallMs": time_info.get("wallMs"), "maxRssKb": time_info.get("maxRssKb"), "vrps": counts.get("vrps"), "vaps": counts.get("vaps"), "publicationPoints": counts.get("publicationPoints"), "cirObjectCount": counts.get("cirObjectCount"), "cirRejectCount": counts.get("cirRejectCount"), "cirTrustAnchorCount": counts.get("cirTrustAnchorCount"), } def run_sequence_triage(local_exp_root: Path, args: argparse.Namespace) -> None: compare_dir = local_exp_root / "sequence-triage" run_local([ str(REPO_ROOT / "target" / "release" / "sequence_triage_ccr_cir"), "--left-sequence", str(local_exp_root / "left-sequence.jsonl"), "--right-sequence", str(local_exp_root / "right-sequence.jsonl"), "--out-dir", str(compare_dir), "--align-window-runs", str(args.align_window_runs), "--align-window-secs", str(args.align_window_secs), "--sample-limit", str(args.sample_limit), "--timeline-sample-limit", str(args.timeline_sample_limit), ]) def run_sequence_triage_remote(ssh_target: str, remote_root: Path, args: argparse.Namespace) -> None: remote_exp_root = remote_root / "experiments" / "sequence" compare_dir = remote_exp_root / "sequence-triage" time_path = remote_exp_root / "sequence-triage-time.txt" command = " ".join( shlex.quote(item) for item in [ str(remote_root / "bin" / "sequence_triage_ccr_cir"), "--left-sequence", str(remote_exp_root / "left-sequence.jsonl"), "--right-sequence", str(remote_exp_root / "right-sequence.jsonl"), "--out-dir", str(compare_dir), "--align-window-runs", str(args.align_window_runs), "--align-window-secs", str(args.align_window_secs), "--sample-limit", str(args.sample_limit), "--timeline-sample-limit", 
str(args.timeline_sample_limit), ] ) ssh_script( ssh_target, "set -euo pipefail; " f"rm -rf {shlex.quote(str(compare_dir))} {shlex.quote(str(time_path))}; " f"/usr/bin/time -v -o {shlex.quote(str(time_path))} -- {command}", ) def sync_side_to_analysis_remote( source_ssh_target: str, source_remote_root: Path, analysis_ssh_target: str, analysis_remote_root: Path, side_label: str, ) -> None: source_exp_root = source_remote_root / "experiments" / "sequence" analysis_exp_root = analysis_remote_root / "experiments" / "sequence" sequence_name = "left-sequence.jsonl" if side_label == "A" else "right-sequence.jsonl" if same_remote_location(source_ssh_target, source_exp_root, analysis_ssh_target, analysis_exp_root): return side_dir = source_exp_root / side_label script = ( "set -euo pipefail; " f"ssh -o BatchMode=yes -o StrictHostKeyChecking=accept-new {shlex.quote(analysis_ssh_target)} " f"{shlex.quote('mkdir -p ' + shlex.quote(str(analysis_exp_root / side_label)))}; " f"rsync -az --delete {shlex.quote(str(side_dir))}/ {shlex.quote(analysis_ssh_target)}:{shlex.quote(str(analysis_exp_root / side_label))}/; " f"rsync -az {shlex.quote(str(source_exp_root / sequence_name))} " f"{shlex.quote(analysis_ssh_target)}:{shlex.quote(str(analysis_exp_root / sequence_name))}" ) ssh_script(source_ssh_target, script) def run_side_sequence( args: argparse.Namespace, ssh_target: str, remote_root: Path, local_exp_root: Path, side_label: str, side_name: str, side: dict[str, Any], seq_path: Path, rirs: list[str], ) -> list[dict[str, Any]]: side_progress: list[dict[str, Any]] = [] for seq in range(1, args.samples_per_side + 1): side_progress.append( run_one_side_sample(args, ssh_target, remote_root, local_exp_root, side_label, side_name, side, seq_path, seq, rirs) ) return side_progress def run_one_side_sample( args: argparse.Namespace, ssh_target: str, remote_root: Path, local_exp_root: Path, side_label: str, side_name: str, side: dict[str, Any], seq_path: Path, seq: int, rirs: list[str], ) -> 
dict[str, Any]: rir_label = ",".join(rirs) print( f"[run] {side_label} {side_name} seq={seq} rirs={rir_label} schedule={args.schedule_mode}", flush=True, ) remote_run_dir = run_remote_sample(ssh_target, remote_root, side_name, side, side_label, seq, rirs) if args.remote_triage: item = append_remote_sequence_item( ssh_target, remote_root, side_name, side_label, seq, remote_run_dir, args.schedule_mode, ) if args.cleanup_run_nonessential: cleanup_remote_run_nonessential(ssh_target, remote_run_dir) else: local_run_dir = local_exp_root / side_label / f"run_{seq:04d}" rsync_run_artifacts_from_remote(ssh_target, remote_run_dir, local_run_dir) item = build_sequence_item(local_exp_root, side_name, side_label, side, seq, local_run_dir) item["scheduleMode"] = args.schedule_mode append_jsonl(seq_path, item) print( f"[done] {side_label} seq={seq} wallMs={item.get('wallMs')} vrps={item.get('vrps')} vaps={item.get('vaps')} objects={item.get('cirObjectCount')} rejects={item.get('cirRejectCount')}", flush=True, ) return item def run_experiment(args: argparse.Namespace) -> None: if not args.skip_build: build_tool_binaries() rirs = parse_rirs(args.rirs) left = side_config(args.left) right = side_config(args.right) run_root = Path(args.run_root).resolve() remote_root = Path(args.remote_root) left_ssh_target = args.left_ssh_target or args.ssh_target right_ssh_target = args.right_ssh_target or args.ssh_target analysis_ssh_target = args.analysis_ssh_target or left_ssh_target left_remote_root = Path(args.left_remote_root or args.remote_root) right_remote_root = Path(args.right_remote_root or args.remote_root) analysis_remote_root = Path(args.analysis_remote_root or args.remote_root) run_root.mkdir(parents=True, exist_ok=True) write_json(run_root / "experiment-config.json", { "schemaVersion": 1, "generatedAtUtc": utc_stamp(), "left": args.left, "right": args.right, "samplesPerSide": args.samples_per_side, "rirs": rirs, "scheduleMode": args.schedule_mode, "remoteRoot": str(remote_root), 
"leftSshTarget": left_ssh_target, "rightSshTarget": right_ssh_target, "analysisSshTarget": analysis_ssh_target, "leftRemoteRoot": str(left_remote_root), "rightRemoteRoot": str(right_remote_root), "analysisRemoteRoot": str(analysis_remote_root), "sshTarget": args.ssh_target, }) if args.dry_run: print(json.dumps(load_json(run_root / "experiment-config.json"), indent=2, ensure_ascii=False)) return if args.triage_only: run_sequence_triage(run_root / "experiments" / "sequence", args) print(json.dumps({ "runRoot": str(run_root), "triage": str(run_root / "experiments" / "sequence" / "sequence-triage" / "sequence-triage.json"), }, indent=2)) return prepared_remotes: dict[tuple[str, str], bool] = {} prepare_remote_once(prepared_remotes, left_ssh_target, left_remote_root, needs_rpki_client=(left["rpKind"] == "rpki-client")) prepare_remote_once(prepared_remotes, right_ssh_target, right_remote_root, needs_rpki_client=(right["rpKind"] == "rpki-client")) prepare_remote_once(prepared_remotes, analysis_ssh_target, analysis_remote_root, needs_rpki_client=False) local_exp_root = run_root / "experiments" / "sequence" left_seq_path = local_exp_root / "left-sequence.jsonl" right_seq_path = local_exp_root / "right-sequence.jsonl" left_seq_path.unlink(missing_ok=True) right_seq_path.unlink(missing_ok=True) progress: list[dict[str, Any]] = [] if args.schedule_mode == "interleaved": for seq in range(1, args.samples_per_side + 1): for side_label, ssh_target, side_remote_root, side_name, side, seq_path in [ ("A", left_ssh_target, left_remote_root, args.left, left, left_seq_path), ("B", right_ssh_target, right_remote_root, args.right, right, right_seq_path), ]: progress.append( run_one_side_sample( args, ssh_target, side_remote_root, local_exp_root, side_label, side_name, side, seq_path, seq, rirs, ) ) else: with ThreadPoolExecutor(max_workers=2) as executor: futures = [ executor.submit(run_side_sequence, args, left_ssh_target, left_remote_root, local_exp_root, "A", args.left, left, 
left_seq_path, rirs), executor.submit(run_side_sequence, args, right_ssh_target, right_remote_root, local_exp_root, "B", args.right, right, right_seq_path, rirs), ] for future in as_completed(futures): progress.extend(future.result()) progress.sort(key=lambda item: (str(item.get("side")), int(item.get("seq") or 0))) write_json(local_exp_root / "run-progress.json", progress) if args.remote_triage: sync_side_to_analysis_remote(left_ssh_target, left_remote_root, analysis_ssh_target, analysis_remote_root, "A") sync_side_to_analysis_remote(right_ssh_target, right_remote_root, analysis_ssh_target, analysis_remote_root, "B") remote_exp_root = analysis_remote_root / "experiments" / "sequence" remote_progress = json.dumps(progress, sort_keys=True, ensure_ascii=False) ssh_script( analysis_ssh_target, "set -euo pipefail; " f"cat > {shlex.quote(str(remote_exp_root / 'run-progress.json'))} <<'REMOTE_PROGRESS_JSON'\n" f"{remote_progress}\n" "REMOTE_PROGRESS_JSON\n", ) run_sequence_triage_remote(analysis_ssh_target, analysis_remote_root, args) if args.fetch_remote_analysis: rsync_remote_analysis_from_remote(analysis_ssh_target, remote_exp_root, local_exp_root) else: run_sequence_triage(local_exp_root, args) ssh_script(analysis_ssh_target, f"df -h /data / > {shlex.quote(str(analysis_remote_root / 'df-after.txt'))} 2>&1 || true; free -h > {shlex.quote(str(analysis_remote_root / 'free-after.txt'))} 2>&1 || true") compare_dir = local_exp_root / "sequence-triage" remote_compare_dir = analysis_remote_root / "experiments" / "sequence" / "sequence-triage" print(json.dumps({ "runRoot": str(run_root), "remoteRoot": str(remote_root), "leftRemoteRoot": str(left_remote_root), "rightRemoteRoot": str(right_remote_root), "analysisRemoteRoot": str(analysis_remote_root), "triage": str(compare_dir / "sequence-triage.json") if not args.remote_triage or args.fetch_remote_analysis else None, "remoteTriage": str(remote_compare_dir / "sequence-triage.json") if args.remote_triage else None, }, indent=2)) 
def main() -> None:
    """Parse the command line and pass the resulting namespace to ``run_experiment``.

    Raises:
        SystemExit: if ``--samples-per-side`` is smaller than 2 (argparse itself
            also exits on malformed or missing required arguments).
    """
    cli = argparse.ArgumentParser(description="Feature #043 all5 sequence triage experiment driver")
    option = cli.add_argument

    # Mandatory roots.
    option("--run-root", required=True)
    option("--remote-root", required=True)

    # SSH target; the default is taken from $SSH_TARGET when that variable is set.
    option("--ssh-target", default=os.environ.get("SSH_TARGET", "root@47.251.56.108"))

    # Optional plain-string options with no default (argparse yields None when omitted).
    for flag in (
        "--left-ssh-target",
        "--right-ssh-target",
        "--analysis-ssh-target",
        "--left-remote-root",
        "--right-remote-root",
        "--analysis-remote-root",
    ):
        option(flag)

    # Names passed for the two sides of the comparison.
    option("--left", default="ours-standard")
    option("--right", default="rpki-client-standard")

    option("--samples-per-side", type=int, default=3)
    option("--rirs", default=",".join(DEFAULT_RIRS))
    option("--schedule-mode", choices=["interleaved", "parallel"], default="interleaved")

    # Integer tuning knobs, registered in the same order as before so --help is unchanged.
    for flag, fallback in (
        ("--align-window-runs", 2),
        ("--align-window-secs", 1800),
        ("--sample-limit", 200),
        ("--timeline-sample-limit", 0),
    ):
        option(flag, type=int, default=fallback)

    # Boolean switches; a help text of None is argparse's own default (no help shown).
    for flag, description in (
        ("--dry-run", None),
        ("--skip-build", "reuse existing release binaries"),
        ("--triage-only", "only rerun local sequence triage for an existing run root"),
        (
            "--remote-triage",
            "keep CIR/CCR on remote, write sequence JSONL remotely, and run triage on remote",
        ),
        (
            "--fetch-remote-analysis",
            "when --remote-triage is set, fetch only small sequence/triage JSON outputs; never fetch CIR/CCR",
        ),
        (
            "--cleanup-run-nonessential",
            "after each successful remote sequence item, remove report/log/CSV files and keep only CIR/CCR/timing/meta",
        ),
    ):
        option(flag, action="store_true", help=description)

    args = cli.parse_args()

    # Reject sample counts below 2 before any work starts.
    if args.samples_per_side < 2:
        raise SystemExit("--samples-per-side must be >= 2")

    run_experiment(args)


if __name__ == "__main__":
    main()