rpki/scripts/compare/run_three_rp_10run_benchmark.py

1707 lines
80 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import csv
import gzip
import hashlib
import html
import json
import os
import re
import shlex
import shutil
import subprocess
import sys
import time
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from pathlib import Path
from typing import Any
# Directory that contains this script (symlinks resolved).
SCRIPT_DIR = Path(__file__).resolve().parent
# Grandparent of SCRIPT_DIR; used below as the root of this project's checkout.
REPO_ROOT = SCRIPT_DIR.parents[1]
# Grandparent of REPO_ROOT; the other RP source trees are looked up beneath it.
DEV_ROOT = REPO_ROOT.parents[1]
# Source checkouts of the two reference relying-party implementations.
ROUTINATOR_ROOT = DEV_ROOT / "routinator"
RPKI_CLIENT_ROOT = DEV_ROOT / "rpki-client-portable"
# Local fixtures; TAL files live under "tal" and TA certificates under "ta".
FIXTURE_ROOT = REPO_ROOT / "tests" / "fixtures"
# RIR name -> TAL file name (relative to FIXTURE_ROOT / "tal").
# The keys of this mapping are also the set of RIR names accepted by parse_rirs.
RIR_TAL = {
"afrinic": "afrinic.tal",
"apnic": "apnic-rfc7730-https.tal",
"arin": "arin.tal",
"lacnic": "lacnic.tal",
"ripe": "ripe-ncc.tal",
}
# RIR name -> trust-anchor certificate file name (relative to FIXTURE_ROOT / "ta").
RIR_TA = {
"afrinic": "afrinic-ta.cer",
"apnic": "apnic-ta.cer",
"arin": "arin-ta.cer",
"lacnic": "lacnic-ta.cer",
"ripe": "ripe-ncc-ta.cer",
}
# RIR name -> TAL URI handed to ours-rp through --cir-tal-uri.
RIR_CIR_TAL_URI = {
"afrinic": "https://rpki.afrinic.net/tal/afrinic.tal",
"apnic": "https://rpki.apnic.net/tal/apnic-rfc7730-https.tal",
"arin": "https://www.arin.net/resources/manage/rpki/arin.tal",
"lacnic": "https://www.lacnic.net/innovaportal/file/4983/1/lacnic.tal",
"ripe": "https://tal.rpki.ripe.net/ripe-ncc.tal",
}
# Names of the relying parties this script knows; --rps entries are validated against it.
RP_ORDER = ["ours-rp", "routinator", "rpki-client"]
# NOTE(review): not referenced in the visible part of the file; presumably the
# accepted values for the ours-rp rsync scope option -- confirm against the CLI parser.
RPKI_SCOPE_POLICIES = {"publication-point", "module-root"}
@dataclass(frozen=True)
class CommandResult:
    """Immutable outcome of a finished command: exit status plus text output."""

    # Exit status reported by the process.
    returncode: int
    # Text written to standard output ("" when nothing was captured).
    stdout: str
    # Text written to standard error ("" when nothing was captured).
    stderr: str
def run_local(argv: list[str], *, cwd: Path | None = None, check: bool = True, capture: bool = True, env: dict[str, str] | None = None) -> CommandResult:
    """Run ``argv`` on the local machine in text mode.

    Args:
        argv: Program and arguments; executed without a shell.
        cwd: Working directory for the child, or ``None`` to inherit.
        check: When true, a non-zero exit status aborts with ``SystemExit``.
        capture: When true, stdout and stderr are captured; otherwise the
            child inherits the parent's streams.
        env: Environment for the child, or ``None`` to inherit.

    Returns:
        A ``CommandResult``; streams that were not captured are reported as "".

    Raises:
        SystemExit: If ``check`` is true and the command exits non-zero.
    """
    workdir = None if not cwd else str(cwd)
    completed = subprocess.run(
        argv,
        cwd=workdir,
        text=True,
        capture_output=capture,
        env=env,
        check=False,
    )
    failed = completed.returncode != 0
    if failed and check:
        raise SystemExit(
            f"command failed ({completed.returncode}): {shlex.join(argv)}\n"
            f"cwd={cwd}\nstdout:\n{completed.stdout}\nstderr:\n{completed.stderr}"
        )
    return CommandResult(completed.returncode, completed.stdout or "", completed.stderr or "")
def ssh_script(target: str, script: str, *, check: bool = True) -> CommandResult:
    """Feed ``script`` to ``bash -s`` on ``target`` over ssh and capture its output.

    Raises:
        SystemExit: If ``check`` is true and the remote shell exits non-zero.
    """
    ssh_argv = ["ssh", target, "bash", "-s"]
    completed = subprocess.run(ssh_argv, input=script, text=True, capture_output=True, check=False)
    if completed.returncode != 0 and check:
        raise SystemExit(f"remote script failed ({completed.returncode}) on {target}\nstdout:\n{completed.stdout}\nstderr:\n{completed.stderr}")
    return CommandResult(completed.returncode, completed.stdout or "", completed.stderr or "")
def ssh_script_stream(target: str, script: str, *, check: bool = True) -> CommandResult:
    """Like ``ssh_script`` but without capturing: remote output goes to our own streams.

    The returned ``CommandResult`` therefore always carries empty stdout/stderr.

    Raises:
        SystemExit: If ``check`` is true and the remote shell exits non-zero.
    """
    ssh_argv = ["ssh", target, "bash", "-s"]
    completed = subprocess.run(ssh_argv, input=script, text=True, check=False)
    if completed.returncode != 0 and check:
        raise SystemExit(f"remote script failed ({completed.returncode}) on {target}")
    return CommandResult(completed.returncode, "", "")
def rsync_to_remote(target: str, source: Path, destination: str) -> None:
    """Copy ``source`` to ``destination`` on ``target`` using ``rsync -a``."""
    remote_spec = f"{target}:{destination}"
    run_local(["rsync", "-a", str(source), remote_spec])
def rsync_dir_to_remote(target: str, source: Path, destination: str) -> None:
    """Mirror what is inside ``source`` into ``destination`` on ``target``.

    Both paths get a trailing slash before they are handed to ``rsync -a``.
    """
    local_spec = f"{source}/"
    remote_spec = f"{target}:{destination}/"
    run_local(["rsync", "-a", local_spec, remote_spec])
def rsync_from_remote(target: str, source: str, destination: Path, files: list[str]) -> None:
    """Fetch selected files from ``source`` on ``target`` into ``destination``.

    Despite the name, the transfer uses ``scp`` and ``tar``: the de-duplicated,
    sorted file list is uploaded next to ``source``, a gzip tarball of those
    paths (relative to ``source``) is created remotely, downloaded, and
    extracted into ``destination``. The remote tarball and file list are removed
    afterwards on a best-effort basis; the local file list and the downloaded
    tarball are left in place.

    Args:
        target: ssh destination (``user@host``).
        source: Remote directory the listed paths are relative to.
        destination: Local directory to extract into; created if missing.
        files: Paths relative to ``source``.

    Raises:
        SystemExit: If any of the local or remote commands fails.
    """
    destination.mkdir(parents=True, exist_ok=True)
    files_from = destination.parent / f"{destination.name}-files-from.txt"
    files_from.write_text("\n".join(sorted(set(files))) + "\n", encoding="utf-8")
    remote_base = source.rstrip("/")
    archive_name = f".{destination.name}-remote-files.tar.gz"
    archive_remote = f"{remote_base}/{archive_name}"
    files_from_remote = f"{remote_base}/{files_from.name}"
    local_archive = destination / archive_name
    create_archive = f"""
set -euo pipefail
cd {shlex.quote(source)}
tar --ignore-failed-read -czf {shlex.quote(archive_remote)} --files-from {shlex.quote(files_from_remote)}
"""
    # Upload through run_local (like the download below) so a failure is
    # reported the same way as every other local command in this script.
    run_local(["scp", str(files_from), f"{target}:{files_from_remote}"])
    try:
        ssh_script(target, create_archive)
        run_local(["scp", f"{target}:{archive_remote}", str(local_archive)])
        run_local(["tar", "-xzf", str(local_archive), "-C", str(destination)])
    finally:
        # Best-effort cleanup of the temporary remote files.
        ssh_script(target, f"rm -f {shlex.quote(archive_remote)} {shlex.quote(files_from_remote)}", check=False)
def write_json(path: Path, data: Any) -> None:
    """Serialise ``data`` to ``path`` as indented, key-sorted UTF-8 JSON.

    Parent directories are created as needed and the file ends with a newline.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    rendered = json.dumps(data, indent=2, ensure_ascii=False, sort_keys=True)
    path.write_text(f"{rendered}\n", encoding="utf-8")
def load_json(path: Path) -> Any:
    """Read ``path`` as UTF-8 text and return the decoded JSON value."""
    text = path.read_text(encoding="utf-8")
    return json.loads(text)
def sha256_file(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at ``path``.

    The file is read in 1 MiB chunks, so it is never loaded whole.
    """
    digest = hashlib.sha256()
    chunk_size = 1024 * 1024
    with path.open("rb") as stream:
        while True:
            block = stream.read(chunk_size)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
def git_output(repo: Path, args: list[str], *, check: bool = True) -> str:
    """Run ``git -C repo <args>`` and return its stdout with surrounding whitespace removed."""
    command = ["git", "-C", str(repo)]
    command.extend(args)
    completed = run_local(command, check=check)
    return completed.stdout.strip()
def git_status(repo: Path) -> str:
    """Return ``git status --short`` for ``repo``; a git failure is not treated as fatal."""
    status_args = ["status", "--short"]
    return git_output(repo, status_args, check=False)
def choose_latest_semver_tag(repo: Path, prefix_v: bool) -> str:
    """Return the highest version-like tag in ``repo``.

    Tags must look like ``[v]MAJOR.MINOR[.PATCH][pN]``; absent components count
    as 0. Only tags starting with "v" are considered when ``prefix_v`` is true,
    and only tags not starting with "v" otherwise. On equal versions the tag
    listed first wins.

    Raises:
        SystemExit: If no tag qualifies.
    """
    version_re = re.compile(r"^v?(\d+)\.(\d+)(?:\.(\d+))?(?:p(\d+))?$")
    want_v = bool(prefix_v)
    winner = ""
    winner_key = None
    for tag in git_output(repo, ["tag", "--list"], check=False).splitlines():
        if tag.startswith("v") != want_v:
            continue
        parsed = version_re.match(tag)
        if parsed is None:
            continue
        key = tuple(0 if group is None else int(group) for group in parsed.groups())
        if winner_key is None or key > winner_key:
            winner, winner_key = tag, key
    if not winner:
        raise SystemExit(f"cannot resolve latest {'v-prefixed' if prefix_v else 'numeric'} semver tag in {repo}")
    return winner
def parse_rirs(raw: str) -> list[str]:
    """Split a comma-separated RIR list into lower-cased, validated names.

    Blank entries are ignored; order and duplicates are kept as given.

    Raises:
        SystemExit: On a name that is not a key of ``RIR_TAL``, or when no name remains.
    """
    selected = []
    for rir in (piece.strip().lower() for piece in raw.split(",")):
        if not rir:
            continue
        if rir not in RIR_TAL:
            raise SystemExit(f"invalid RIR: {rir}; allowed: {','.join(RIR_TAL)}")
        selected.append(rir)
    if not selected:
        raise SystemExit("at least one RIR is required")
    return selected
def parse_elapsed_to_ms(raw: str) -> int:
    """Convert an elapsed-time string to whole milliseconds.

    Accepted shapes are ``[D-][[H:]M:]S`` where ``S`` may carry a fraction,
    for example ``"0:01.50"``, ``"1:02:03"``, ``"2-01:00:00"`` or ``"7.25"``.

    Args:
        raw: The elapsed-time text; surrounding whitespace is ignored.

    Returns:
        The duration rounded to the nearest millisecond, or 0 when the text is
        empty or cannot be parsed (including a malformed day component or more
        than three ``:``-separated fields).
    """
    text = raw.strip()
    if not text:
        return 0
    try:
        days = 0
        if "-" in text:
            # The day count is parsed inside the try block so that a malformed
            # value yields 0 like every other parse failure.
            day_text, text = text.split("-", 1)
            days = int(day_text)
        fields = text.split(":")
        if len(fields) > 3:
            return 0
        # Left-pad with zeros so the fields always read hours, minutes, seconds.
        hours, minutes, seconds = ["0"] * (3 - len(fields)) + fields
        total = days * 86400 + int(hours) * 3600 + int(minutes) * 60 + float(seconds)
        return int(round(total * 1000))
    except (ValueError, OverflowError):
        return 0
def parse_time_file(path: Path) -> dict[str, Any]:
    """Extract resource figures from a ``/usr/bin/time -v`` report.

    Args:
        path: Report file; a missing file yields an empty result.

    Returns:
        A dict holding whichever of these keys had a matching line:
        ``wallMs`` (int), ``maxRssKb`` (int), ``userSeconds`` (float),
        ``systemSeconds`` (float) and ``cpuPercent`` (str, as printed).
        Numeric values that cannot be parsed are reported as 0.
    """
    values: dict[str, Any] = {}
    if not path.is_file():
        return values
    for line in path.read_text(encoding="utf-8", errors="replace").splitlines():
        if "Elapsed (wall clock) time" in line:
            # The label itself contains colons ("(h:mm:ss or m:ss): 0:01.23"),
            # so the value starts after the last "):"; fall back to the first
            # ":" when the label has no such suffix.
            _, marker, elapsed = line.rpartition("):")
            if not marker:
                _, _, elapsed = line.partition(":")
            values["wallMs"] = parse_elapsed_to_ms(elapsed)
        elif "Maximum resident set size" in line:
            values["maxRssKb"] = _last_int(line)
        elif "User time (seconds)" in line:
            values["userSeconds"] = _last_float(line)
        elif "System time (seconds)" in line:
            values["systemSeconds"] = _last_float(line)
        elif "Percent of CPU" in line:
            _, colon, percent = line.rpartition(":")
            if colon:
                values["cpuPercent"] = percent.strip()
    return values


def _last_int(line: str) -> int:
    """Return the integer after the last ":" in ``line``, or 0 if there is none."""
    _, colon, tail = line.rpartition(":")
    if not colon:
        return 0
    try:
        return int(tail.strip())
    except ValueError:
        return 0


def _last_float(line: str) -> float:
    """Return the float after the last ":" in ``line``, or 0.0 if there is none."""
    _, colon, tail = line.rpartition(":")
    if not colon:
        return 0.0
    try:
        return float(tail.strip())
    except ValueError:
        return 0.0
def normalize_asn(value: Any) -> str:
    """Render an AS number as ``AS<n>``.

    Accepts integers or text, with or without surrounding double quotes and an
    ``AS`` prefix in any letter case; leading zeros are dropped.

    Raises:
        ValueError: If the numeric part is not an integer.
    """
    digits = str(value).strip().strip('"')
    has_prefix = digits.upper().startswith("AS")
    number = int(digits[2:] if has_prefix else digits)
    return f"AS{number}"
def normalize_prefix(prefix: Any) -> str:
    """Return ``prefix`` as text with surrounding whitespace and double quotes removed."""
    return str(prefix).strip().strip('"')
def normalize_vrps_from_csv(path: Path) -> set[str]:
    """Read a VRP CSV file and return its rows as ``"AS<n>,<prefix>,<maxLength>"``.

    Several header spellings are accepted for each column. Rows lacking any of
    the three values are skipped; a missing file gives an empty set.
    """
    if not path.is_file():
        return set()
    normalized: set[str] = set()
    with path.open("r", encoding="utf-8", errors="replace", newline="") as stream:
        for record in csv.DictReader(stream):
            asn = record.get("ASN") or record.get("asn") or record.get("AS")
            prefix = record.get("IP Prefix") or record.get("prefix") or record.get("Prefix")
            max_len = record.get("Max Length") or record.get("maxLength") or record.get("maxlength")
            if not (asn and prefix and max_len):
                continue
            length = int(str(max_len).strip())
            normalized.add(f"{normalize_asn(asn)},{normalize_prefix(prefix)},{length}")
    return normalized
def normalize_vaps_from_csv(path: Path) -> set[str]:
    """Read an ASPA CSV file and return rows as ``"AS<customer>|AS<p1>,AS<p2>,..."``.

    Providers may be separated by ``;``, ``,`` or whitespace; they are
    de-duplicated and sorted numerically. Rows without a customer are skipped;
    a missing file gives an empty set.
    """
    if not path.is_file():
        return set()
    normalized: set[str] = set()
    with path.open("r", encoding="utf-8", errors="replace", newline="") as stream:
        for record in csv.DictReader(stream):
            customer = record.get("Customer ASN") or record.get("customer") or record.get("customer_asid")
            providers_raw = record.get("Providers") or record.get("providers") or ""
            if not customer:
                continue
            tokens = re.split(r"[;,\s]+", providers_raw.strip())
            provider_asns = {normalize_asn(token) for token in tokens if token}
            ordered = sorted(provider_asns, key=lambda asn: int(asn[2:]))
            normalized.add(f"{normalize_asn(customer)}|{','.join(ordered)}")
    return normalized
def normalize_vrps_from_json(path: Path) -> set[str]:
    """Read a VRP JSON document and return rows as ``"AS<n>,<prefix>,<maxLength>"``.

    The top-level object is searched for a list under ``roas``,
    ``routeOrigins`` or ``valid_roas``; each list entry may name its fields in
    any of the spellings tried below. Entries lacking an ASN, prefix or maximum
    length are skipped. A missing file or a non-object document yields an
    empty set.
    """

    def first_present(item: dict[str, Any], keys: tuple[str, ...]) -> Any:
        # Only None and "" count as absent. A plain `a or b` chain would also
        # discard integer 0, silently dropping AS0 VRPs and maxLength 0.
        for key in keys:
            value = item.get(key)
            if value is not None and value != "":
                return value
        return None

    rows: set[str] = set()
    if not path.is_file():
        return rows
    data = load_json(path)
    if not isinstance(data, dict):
        return rows
    for key in ("roas", "routeOrigins", "valid_roas"):
        items = data.get(key)
        if not isinstance(items, list):
            continue
        for item in items:
            if not isinstance(item, dict):
                continue
            asn = first_present(item, ("asn", "origin", "origin_as"))
            prefix = item.get("prefix")
            max_len = first_present(item, ("maxLength", "max_length", "maxlen"))
            if asn is None or prefix is None or max_len is None:
                continue
            rows.add(f"{normalize_asn(asn)},{normalize_prefix(prefix)},{int(max_len)}")
    return rows
def normalize_vaps_from_json(path: Path) -> set[str]:
    """Read an ASPA JSON document and return rows as ``"AS<customer>|AS<p1>,AS<p2>,..."``.

    The top-level object is searched for a list under ``aspas``,
    ``aspaAssertions`` or ``vaps``. Providers may be a list or a string
    separated by ``;``, ``,`` or whitespace; they are de-duplicated and sorted
    numerically. Entries without a customer are skipped. A missing file or a
    non-object document yields an empty set.
    """
    if not path.is_file():
        return set()
    document = load_json(path)
    collected: set[str] = set()
    if not isinstance(document, dict):
        return collected
    for section in ("aspas", "aspaAssertions", "vaps"):
        entries = document.get(section)
        if not isinstance(entries, list):
            continue
        for entry in entries:
            if not isinstance(entry, dict):
                continue
            customer = entry.get("customer") or entry.get("customer_asid") or entry.get("customerAsid") or entry.get("customerAsn")
            providers = entry.get("providers") or entry.get("providerAsns") or entry.get("provider_asns") or []
            if customer is None:
                continue
            if isinstance(providers, str):
                provider_items = [token for token in re.split(r"[;,\s]+", providers) if token]
            else:
                provider_items = providers
            provider_asns = {normalize_asn(provider) for provider in provider_items}
            ordered = sorted(provider_asns, key=lambda asn: int(asn[2:]))
            collected.add(f"{normalize_asn(customer)}|{','.join(ordered)}")
    return collected
def write_set(path: Path, values: set[str]) -> None:
    """Write ``values`` to ``path`` sorted, one per line (empty file for an empty set).

    Parent directories are created as needed.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    body = "".join(f"{value}\n" for value in sorted(values))
    path.write_text(body, encoding="utf-8")
def write_gzip_set(path: Path, values: set[str]) -> None:
    """Write ``values`` to a gzip file at ``path`` sorted, one per line.

    Parent directories are created as needed; an empty set gives empty content.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    body = "".join(f"{value}\n" for value in sorted(values))
    with gzip.open(path, "wt", encoding="utf-8") as stream:
        stream.write(body)
def read_set(path: Path) -> set[str]:
    """Load the non-blank, stripped lines of a text file as a set.

    If ``<path>.gz`` exists it is read in preference to ``path``; a file whose
    name ends in ``.gz`` is read through gzip. Returns an empty set when
    neither file exists.
    """
    compressed = Path(f"{path}.gz")
    chosen = compressed if compressed.is_file() else path
    if not chosen.is_file():
        return set()
    if chosen.suffix == ".gz":
        with gzip.open(chosen, "rt", encoding="utf-8", errors="replace") as stream:
            stripped = [line.strip() for line in stream]
    else:
        text = chosen.read_text(encoding="utf-8", errors="replace")
        stripped = [line.strip() for line in text.splitlines()]
    return {entry for entry in stripped if entry}
def set_delta(previous: set[str], current: set[str]) -> dict[str, Any]:
    """Summarise how ``current`` differs from ``previous``.

    Percentages are relative to the larger of the two set sizes (at least 1)
    and rounded to six decimals.
    """
    gained = len(current - previous)
    lost = len(previous - current)
    kept = len(current & previous)
    before, after = len(previous), len(current)
    scale = max(before, after, 1)
    return {
        "added": gained,
        "removed": lost,
        "unchanged": kept,
        "previous": before,
        "current": after,
        "changedPercent": round((gained + lost) / scale * 100, 6),
        "unchangedPercent": round(kept / scale * 100, 6),
    }
def overlap(left: set[str], right: set[str]) -> dict[str, Any]:
    """Describe how two sets overlap, including their Jaccard index (8 decimals).

    Two empty sets give a Jaccard index of 0.
    """
    shared = len(left & right)
    combined = len(left | right)
    return {
        "intersection": shared,
        "onlyLeft": len(left - right),
        "onlyRight": len(right - left),
        "leftCount": len(left),
        "rightCount": len(right),
        "jaccard": round(shared / max(combined, 1), 8),
    }
def artifact(path: Path, run_root: Path, type_name: str) -> dict[str, Any]:
    """Describe a produced file or directory for inclusion in a report.

    Args:
        path: Location of the artifact.
        run_root: Directory that ``relativePath`` is expressed against.
        type_name: Free-form label stored under ``type``.

    Returns:
        A dict with ``type``, ``path``, ``relativePath``, ``exists``, ``size``
        and ``sha256``. ``relativePath`` is POSIX-style when ``path`` lies
        under ``run_root`` and the full path otherwise. ``sha256`` is empty for
        directories and for missing paths. The same keys are present whether or
        not the path exists, so consumers can index them unconditionally.
    """
    relative = path.relative_to(run_root).as_posix() if path.is_relative_to(run_root) else str(path)
    if not path.exists():
        return {
            "type": type_name,
            "path": str(path),
            "relativePath": relative,
            "exists": False,
            "size": 0,
            "sha256": "",
        }
    return {
        "type": type_name,
        "path": str(path),
        "relativePath": relative,
        "exists": True,
        "size": path.stat().st_size,
        "sha256": sha256_file(path) if path.is_file() else "",
    }
def resolve_versions() -> dict[str, Any]:
    """Collect source/version facts for the three relying parties.

    For routinator and rpki-client the latest matching tag is selected
    (v-prefixed and numeric respectively) alongside the checkout's current
    ref, commit and ``git status``; for ours-rp only commit and status are
    recorded.
    """
    routinator_tag = choose_latest_semver_tag(ROUTINATOR_ROOT, prefix_v=True)
    rpki_client_tag = choose_latest_semver_tag(RPKI_CLIENT_ROOT, prefix_v=False)

    def external_checkout(root: Path, tag: str) -> dict[str, Any]:
        # Facts recorded for a third-party checkout.
        return {
            "sourceDir": str(root),
            "selectedTag": tag,
            "currentRef": git_output(root, ["rev-parse", "--abbrev-ref", "HEAD"], check=False),
            "currentCommit": git_output(root, ["rev-parse", "HEAD"], check=False),
            "status": git_status(root),
        }

    ours = {
        "sourceDir": str(REPO_ROOT),
        "commit": git_output(REPO_ROOT, ["rev-parse", "HEAD"], check=False),
        "status": git_status(REPO_ROOT),
        "tag": "",
    }
    return {
        "ours-rp": ours,
        "routinator": external_checkout(ROUTINATOR_ROOT, routinator_tag),
        "rpki-client": external_checkout(RPKI_CLIENT_ROOT, rpki_client_tag),
    }
def prepare_rpki_client_official_tree(tag: str) -> None:
    """Checkout official portable tag and rebuild generated 9.8 sources.

    Steps, in order:
      1. Refuse to continue when the rpki-client worktree is dirty.
      2. ``git checkout <tag>`` in the portable tree.
      3. Restore the sources for ``rpki-client-<tag>``: from the local
         ``openbsd`` checkout when that ref exists there, otherwise through
         ``./update.sh``, falling back to the local checkout if that fails.
      4. Configure, ``make clean`` (failure ignored) and build.
      5. Verify the built binary: reported version, presence of the
         ``unveil("/", "r")`` line in ``src/http.c``, absence of the local
         ``--ta-fixture`` option, and presence of the ``_rpki-client`` user name.

    Args:
        tag: Portable release tag to check out.

    Raises:
        SystemExit: On a dirty worktree, a failed command, or a failed check.
    """
    # Query the status once so the reported text is the state that was tested.
    dirty = git_status(RPKI_CLIENT_ROOT)
    if dirty:
        raise SystemExit(f"rpki-client worktree dirty; refusing checkout:\n{dirty}")
    run_local(["git", "checkout", tag], cwd=RPKI_CLIENT_ROOT)
    update_branch = f"rpki-client-{tag}"
    local_ref = run_local(["git", "rev-parse", "--verify", update_branch], cwd=RPKI_CLIENT_ROOT / "openbsd", check=False)
    if local_ref.returncode == 0:
        restore_rpki_client_sources_from_local_openbsd(update_branch)
    else:
        update_result = run_local(["./update.sh", update_branch], cwd=RPKI_CLIENT_ROOT, check=False)
        if update_result.returncode != 0:
            print(
                f"warning: ./update.sh {update_branch} failed; falling back to local openbsd checkout",
                file=sys.stderr,
            )
            print(update_result.stdout, file=sys.stderr)
            print(update_result.stderr, file=sys.stderr)
            restore_rpki_client_sources_from_local_openbsd(update_branch)
    configure_env = rpki_client_configure_env()
    prefix = DEV_ROOT / ".cache" / "rpki-client-9.8-cir"
    run_local(
        [
            "./configure",
            f"--prefix={prefix}",
            "--with-rsync=rsync",
            "--with-user=_rpki-client",
        ],
        cwd=RPKI_CLIENT_ROOT,
        env=configure_env,
    )
    run_local(["make", "clean"], cwd=RPKI_CLIENT_ROOT, check=False, capture=False)
    # Parallel build with between 2 and 8 jobs, depending on the CPU count.
    run_local(["make", f"-j{max(2, min(os.cpu_count() or 2, 8))}"], cwd=RPKI_CLIENT_ROOT, env=configure_env, capture=False)
    binary = RPKI_CLIENT_ROOT / "src" / "rpki-client"
    version_result = run_local([str(binary), "-V"])
    # The version may be printed on either stream; take the last non-blank line.
    version_lines = [line.strip() for text in (version_result.stdout, version_result.stderr) for line in text.splitlines() if line.strip()]
    version = version_lines[-1] if version_lines else ""
    if version != f"rpki-client-portable {tag}":
        raise SystemExit(f"unexpected rpki-client version after build: {version}")
    source_text = (RPKI_CLIENT_ROOT / "src" / "http.c").read_text(encoding="utf-8", errors="replace")
    if 'unveil("/", "r")' not in source_text:
        raise SystemExit("official rpki-client 9.8 source missing landlock unveil(\"/\", \"r\") patch")
    binary_strings = run_local(["strings", str(binary)]).stdout
    if "--ta-fixture" in binary_strings:
        raise SystemExit("official rpki-client binary still contains local ta-fixture patch")
    if "_rpki-client" not in binary_strings:
        raise SystemExit("rpki-client binary was not configured with --with-user=_rpki-client")
def restore_rpki_client_sources_from_local_openbsd(openbsd_ref: str) -> None:
    """Rebuild ``src/`` of the portable tree from the local ``openbsd`` checkout.

    Checks out ``openbsd_ref`` in the nested ``openbsd`` repository, copies the
    listed upstream sources (and the man page, if present) over ``src/`` with
    fresh modification times, rewrites ``VERSION`` from ``version.h``, applies
    every ``patches/*.patch``, renames the man page to ``rpki-client.8.in``,
    verifies that no local CIR / ta-fixture code is left in the main sources,
    removes stale build products and finally sanitises the generated Makefiles.

    Raises:
        SystemExit: If a prerequisite is missing, the checkout or a patch
            fails, or local residue is still present afterwards.
    """
    src_dir = RPKI_CLIENT_ROOT / "src"
    openbsd_repo = RPKI_CLIENT_ROOT / "openbsd"
    if not (openbsd_repo / ".git").is_dir():
        raise SystemExit("rpki-client local openbsd checkout missing; cannot restore official source")
    checkout = run_local(["git", "checkout", openbsd_ref], cwd=openbsd_repo, check=False)
    if checkout.returncode != 0:
        raise SystemExit(
            f"cannot checkout local openbsd ref {openbsd_ref}\n"
            f"stdout:\n{checkout.stdout}\nstderr:\n{checkout.stderr}"
        )
    required = (src_dir / "main.c", src_dir / "output.c", src_dir / "version.h")
    if not all(item.is_file() for item in required):
        raise SystemExit("rpki-client generated source tree missing; automake/autoconf unavailable to regenerate it")
    upstream = openbsd_repo / "src" / "usr.sbin" / "rpki-client"
    if not (upstream / "main.c").is_file():
        raise SystemExit("rpki-client official OpenBSD source mirror missing; cannot restore official source")

    def refresh(name: str) -> None:
        # Copy one upstream file into src/ and stamp it with the current time.
        target = src_dir / name
        shutil.copy2(upstream / name, target)
        os.utime(target, None)

    source_names = [
        "as.c", "aspa.c", "ccr.c", "cert.c", "cms.c", "constraints.c", "crl.c", "encoding.c",
        "extern.h", "filemode.c", "http.c", "io.c", "ip.c", "json.c", "json.h",
        "main.c", "mft.c", "mkdir.c", "ometric.c", "ometric.h", "output-bgpd.c", "output-bird.c",
        "output-csv.c", "output-json.c", "output-ometric.c", "output.c", "parser.c",
        "print.c", "repo.c", "rfc3779.c", "roa.c", "rpki-asn1.h", "rrdp.c",
        "rrdp.h", "rrdp_delta.c", "rrdp_notification.c", "rrdp_snapshot.c", "rrdp_util.c",
        "rsc.c", "rsync.c", "spl.c", "tak.c", "tal.c", "validate.c", "version.h", "x509.c",
    ]
    for name in source_names:
        refresh(name)
    if (upstream / "rpki-client.8").is_file():
        refresh("rpki-client.8")

    version_text = (upstream / "version.h").read_text(encoding="utf-8", errors="replace")
    version_match = re.search(r'RPKI_VERSION\s+"([^"]+)"', version_text)
    if version_match:
        (RPKI_CLIENT_ROOT / "VERSION").write_text(version_match.group(1) + "\n", encoding="utf-8")

    for patch_file in sorted((RPKI_CLIENT_ROOT / "patches").glob("*.patch")):
        run_local(["patch", "-s", "-p3", "-i", str(patch_file)], cwd=src_dir)

    man_page = src_dir / "rpki-client.8"
    if man_page.is_file():
        man_page.replace(src_dir / "rpki-client.8.in")

    checked_text = "\n".join(
        (src_dir / name).read_text(encoding="utf-8", errors="replace")
        for name in ("main.c", "output.c", "extern.h")
    )
    if any(marker in checked_text for marker in ("FORMAT_CIR", "ta_fixture")):
        raise SystemExit("failed to restore official rpki-client generated source")

    # Drop the old binary and object files.
    stale_products = [src_dir / "rpki-client", *src_dir.glob("rpki_client-*.o")]
    for product in stale_products:
        if product.exists():
            product.unlink()
    deps_dir = src_dir / ".deps"
    if deps_dir.is_dir():
        for dep_file in deps_dir.glob("rpki_client-*.Po"):
            dep_file.unlink()

    for rel in ("src/Makefile", "src/Makefile.in"):
        makefile = RPKI_CLIENT_ROOT / rel
        if makefile.exists():
            sanitize_rpki_client_makefile(makefile)
def rpki_client_configure_env() -> dict[str, str]:
    """Return a copy of the environment with CFLAGS/CPPFLAGS/LDFLAGS for rpki-client.

    Include and library directories from the cached sysroot and the cached
    libtls tree under ``DEV_ROOT/.cache`` are added only when they exist. An
    rpath pointing at the install prefix is always appended to ``LDFLAGS``.
    """
    cache_root = DEV_ROOT / ".cache"
    sysroot_usr = cache_root / "rpki-client-9.7-build" / "sysroot" / "usr"
    libtls_root = cache_root / "libtls-dev-3.8.1"
    install_prefix = cache_root / "rpki-client-9.8-cir"

    sysroot_include = sysroot_usr / "include"
    sysroot_lib = sysroot_usr / "lib" / "x86_64-linux-gnu"
    libtls_include = libtls_root / "include"
    libtls_lib = libtls_root / "lib"

    c_flags = ["-D_DEFAULT_SOURCE", "-D_BSD_SOURCE", "-D_GNU_SOURCE"]
    preprocessor_flags = ["-DOPENSSL_SUPPRESS_DEPRECATED"]
    linker_flags: list[str] = []

    if sysroot_include.is_dir():
        # Sysroot headers are placed ahead of everything else.
        c_flags.insert(0, f"-I{sysroot_include}")
    if sysroot_lib.is_dir():
        linker_flags.append(f"-L{sysroot_lib}")
    if libtls_include.is_dir():
        c_flags.append(f"-I{libtls_include}")
        preprocessor_flags.insert(0, f"-I{libtls_include}")
    if libtls_lib.is_dir():
        linker_flags.append(f"-L{libtls_lib}")
    linker_flags.append(f"-Wl,-rpath,{install_prefix}")

    env = os.environ.copy()
    env.update(
        CFLAGS=" ".join(["-g", "-O2", *c_flags]),
        CPPFLAGS=" ".join(preprocessor_flags),
        LDFLAGS=" ".join(linker_flags),
    )
    return env
def sanitize_rpki_client_makefile(path: Path) -> None:
    """Remove local CIR source residue from generated automake files.

    The file is rewritten in place:
      * the ``rpki_client-cir.o:`` / ``rpki_client-cir.obj:`` rules are dropped,
        including every following line up to the next line that starts with
        ``rpki_client-``;
      * references to the ``cir`` object, source and dependency file are
        removed from the remaining lines;
      * lines that end up empty (including blank lines) are omitted.

    Args:
        path: Makefile or Makefile.in to rewrite.
    """
    cir_rule_heads = ("rpki_client-cir.o:", "rpki_client-cir.obj:")
    # Whole-line residue has to be removed before the shorter fragments.
    # Otherwise " ./$(DEPDIR)/rpki_client-cir.Po" is stripped first and leaves
    # a dangling "include # am--include-marker" or "\t-rm -f" line behind.
    removals = (
        "@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/rpki_client-cir.Po@am__quote@ # am--include-marker",
        "include ./$(DEPDIR)/rpki_client-cir.Po # am--include-marker",
        "\t-rm -f ./$(DEPDIR)/rpki_client-cir.Po",
        " rpki_client-cir.$(OBJEXT)",
        " rpki_client-cir.Po",
        " ./$(DEPDIR)/rpki_client-cir.Po",
        "rpki_client-cir.Po ",
    )
    text = path.read_text(encoding="utf-8", errors="replace")
    sanitized: list[str] = []
    skip_rule = False
    for line in text.splitlines():
        if line.startswith(cir_rule_heads):
            skip_rule = True
            continue
        if skip_rule:
            if not line.startswith("rpki_client-"):
                # Still inside the recipe of the removed rule.
                continue
            skip_rule = False
        current = line
        for fragment in removals:
            current = current.replace(fragment, "")
        current = current.replace(" cert.c cir.c cms.c", " cert.c cms.c")
        if current:
            sanitized.append(current)
    path.write_text("\n".join(sanitized) + "\n", encoding="utf-8")
def build_release_binaries(versions: dict[str, Any], skip_build: bool, active_rp_names: list[str]) -> dict[str, Any]:
    """Build the requested relying parties (unless skipped) and describe their binaries.

    Args:
        versions: Per-RP version facts; each active RP's entry is merged into
            its result.
        skip_build: When true, nothing is built and existing binaries are used.
        active_rp_names: Names of the RPs to report on.

    Returns:
        For every active RP: its version facts plus binary path, SHA-256, size
        and build timestamp. For RPs other than ours-rp, the checkout's commit
        and status after the build are added as well.

    Raises:
        SystemExit: On a dirty routinator worktree, a failed build command, or
            a missing binary.
    """
    wants_routinator = "routinator" in active_rp_names
    wants_rpki_client = "rpki-client" in active_rp_names
    if not skip_build:
        run_local(["cargo", "build", "--release", "--bin", "rpki"], cwd=REPO_ROOT)
        if wants_routinator:
            if versions["routinator"].get("status"):
                raise SystemExit(f"routinator worktree dirty; refusing checkout:\n{versions['routinator']['status']}")
            run_local(["git", "checkout", versions["routinator"]["selectedTag"]], cwd=ROUTINATOR_ROOT)
            run_local(["cargo", "build", "--release"], cwd=ROUTINATOR_ROOT)
        if wants_rpki_client:
            prepare_rpki_client_official_tree(versions["rpki-client"]["selectedTag"])

    binary_paths = {
        "ours-rp": REPO_ROOT / "target" / "release" / "rpki",
        "routinator": ROUTINATOR_ROOT / "target" / "release" / "routinator",
        "rpki-client": RPKI_CLIENT_ROOT / "src" / "rpki-client",
    }
    metadata: dict[str, Any] = {}
    for name, binary in binary_paths.items():
        if name not in active_rp_names:
            continue
        if not binary.is_file():
            raise SystemExit(f"missing binary for {name}: {binary}")
        entry = {
            **versions[name],
            "binaryPath": str(binary),
            "binarySha256": sha256_file(binary),
            "binarySize": binary.stat().st_size,
            "builtAtUtc": utc_iso(),
        }
        metadata[name] = entry
        if name == "ours-rp":
            continue
        source = Path(entry["sourceDir"])
        entry["commit"] = git_output(source, ["rev-parse", "HEAD"], check=False)
        entry["statusAfterBuild"] = git_status(source)
    return metadata
def utc_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``."""
    now_utc = time.gmtime()
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", now_utc)
def build_fixture_tree(local_root: Path, rirs: list[str]) -> None:
    """Populate ``local_root/tal`` and ``local_root/ta`` with the fixtures of ``rirs``.

    Both directories are created if needed; each RIR's TAL and TA certificate
    are copied from ``FIXTURE_ROOT``.
    """
    tal_dir = local_root / "tal"
    ta_dir = local_root / "ta"
    for directory in (tal_dir, ta_dir):
        directory.mkdir(parents=True, exist_ok=True)
    for rir in rirs:
        tal_name = RIR_TAL[rir]
        shutil.copy2(FIXTURE_ROOT / "tal" / tal_name, tal_dir / tal_name)
        ta_name = RIR_TA[rir]
        shutil.copy2(FIXTURE_ROOT / "ta" / ta_name, ta_dir / ta_name)
def active_rps(args: argparse.Namespace) -> list[str]:
    """Parse the comma-separated ``args.rps`` option into a list of RP names.

    Blank entries are ignored; order is kept as given.

    Raises:
        SystemExit: If an entry is not in ``RP_ORDER`` or no entry remains.
    """
    requested = [piece.strip() for piece in args.rps.split(",")]
    chosen = [name for name in requested if name]
    unknown = [name for name in chosen if name not in RP_ORDER]
    if unknown:
        raise SystemExit(f"invalid --rps entries: {','.join(unknown)}; allowed: {','.join(RP_ORDER)}")
    if not chosen:
        raise SystemExit("--rps must include at least one RP")
    return chosen
def prepare_remote(args: argparse.Namespace, binary_meta: dict[str, Any], rirs: list[str], local_fixture_root: Path) -> None:
    """Stage binaries, fixtures and metadata locally, reset the remote root, and upload.

    A fresh ``<run_root>/remote`` staging tree is built first. On the remote
    host any running rpki-client / routinator / rpki processes and the
    rpki-client systemd units are stopped, the ``_rpki-client`` user is created
    if missing, ``args.remote_root`` is wiped and recreated, and a snapshot of
    the machine state is written to ``env/before.txt``. Finally the staging
    tree is synced to the remote root.

    ``rirs`` is accepted but not used by this function.
    """
    remote = args.remote_root
    staging = args.run_root / "remote"
    if staging.exists():
        shutil.rmtree(staging)
    for sub in ("bin", "fixtures", "env"):
        (staging / sub).mkdir(parents=True, exist_ok=True)
    shutil.copytree(local_fixture_root, staging / "fixtures", dirs_exist_ok=True)
    write_json(staging / "sources-version-metadata.json", binary_meta)

    remote_binary_names = {"ours-rp": "rpki", "routinator": "routinator", "rpki-client": "rpki-client"}
    for rp, remote_name in remote_binary_names.items():
        if rp in args.active_rps:
            shutil.copy2(Path(binary_meta[rp]["binaryPath"]), staging / "bin" / remote_name)

    root_q = shlex.quote(remote)
    reset_script = f"""
set -euo pipefail
systemctl disable --now rpki-client.timer >/dev/null 2>&1 || true
systemctl stop rpki-client.service >/dev/null 2>&1 || true
pkill -x rpki-client >/dev/null 2>&1 || true
pkill -x routinator >/dev/null 2>&1 || true
pkill -x rpki >/dev/null 2>&1 || true
id -u _rpki-client >/dev/null 2>&1 || useradd -r -M -s /usr/sbin/nologin _rpki-client || true
rm -rf {root_q}
mkdir -p {root_q}/bin {root_q}/fixtures {root_q}/env {root_q}/reports {root_q}/rp-runs
(df -h; echo; free -h; echo; uptime; echo; uname -a; echo; nproc; echo; lscpu | head -n 40) > {root_q}/env/before.txt 2>&1 || true
"""
    ssh_script(args.ssh_target, reset_script)
    write_json(args.run_root / "version-metadata.json", binary_meta)
    rsync_dir_to_remote(args.ssh_target, staging, remote)
def fetch_only_binary_metadata(args: argparse.Namespace) -> dict[str, Any]:
    """Download ``sources-version-metadata.json`` from the remote root and return its content.

    The local ``<run_root>/remote`` directory is recreated from scratch first.

    Raises:
        SystemExit: If the metadata file is not present after the download.
    """
    staging = args.run_root / "remote"
    if staging.exists():
        shutil.rmtree(staging)
    staging.mkdir(parents=True, exist_ok=True)
    metadata_name = "sources-version-metadata.json"
    rsync_from_remote(args.ssh_target, args.remote_root, staging, [metadata_name])
    metadata_path = staging / metadata_name
    if not metadata_path.is_file():
        raise SystemExit(f"missing remote sources-version-metadata.json under {args.remote_root}")
    return load_json(metadata_path)
def remote_shell_for_rp(rp: str, run_index: int, mode: str, rirs: list[str], remote_root: str, args: argparse.Namespace) -> str:
    """Build the remote shell script for one run of one relying party.

    Args:
        rp: ``"ours-rp"``, ``"routinator"`` or ``"rpki-client"``.
        run_index: Run number; used for the ``run_NNNN`` output directory.
        mode: ``"snapshot"`` wipes the RP's state directory before the run;
            any other value keeps the existing state.
        rirs: RIR names whose TAL / TA fixtures are passed to the RP.
        remote_root: Benchmark root directory on the remote host.
        args: Parsed options; reads ``ours_rsync_scope``,
            ``routinator_enable_aspa`` and ``rpki_client_parser_workers``.

    Returns:
        The script text produced by ``timed_script``.

    Raises:
        ValueError: If ``rp`` is not one of the known names.
    """
    quote = shlex.quote
    run_dir = f"{remote_root}/rp-runs/{rp}/runs/run_{run_index:04d}"
    state_dir = f"{remote_root}/rp-runs/{rp}/state"
    fixture_dir = f"{remote_root}/fixtures"
    state_q = quote(state_dir)
    fresh_state = mode == "snapshot"

    if rp == "ours-rp":
        trust_anchor_view = rirs[0] if len(rirs) <= 1 else "all5"
        argv = [
            f"{remote_root}/bin/rpki",
            "--db", f"{state_dir}/work-db",
            "--repo-bytes-db", f"{state_dir}/repo-bytes.db",
            "--rsync-mirror-root", f"{state_dir}/rsync-mirror",
            "--rsync-scope", args.ours_rsync_scope,
            "--report-json", f"{run_dir}/report.json",
            "--report-json-compact",
            "--ccr-out", f"{run_dir}/result.ccr",
            "--cir-enable", "--cir-out", f"{run_dir}/input.cir",
            "--vrps-csv-out", f"{run_dir}/vrps.csv",
            "--vaps-csv-out", f"{run_dir}/vaps.csv",
            "--compare-view-trust-anchor", trust_anchor_view,
        ]
        for rir in rirs:
            argv += ["--tal-path", f"{fixture_dir}/tal/{RIR_TAL[rir]}", "--ta-path", f"{fixture_dir}/ta/{RIR_TA[rir]}"]
        for rir in rirs:
            argv += ["--cir-tal-uri", RIR_CIR_TAL_URI[rir]]
        make_dirs = f"mkdir -p {state_q}/work-db {state_q}/rsync-mirror"
        reset = f"rm -rf {state_q} && {make_dirs}" if fresh_state else make_dirs
        return timed_script(run_dir, reset, argv)

    if rp == "routinator":
        extra_tals = f"{state_dir}/extra-tals"
        argv = [
            f"{remote_root}/bin/routinator",
            "-r", f"{state_dir}/repository",
            "--no-rir-tals",
            "--extra-tals-dir", extra_tals,
        ]
        if args.routinator_enable_aspa:
            argv.append("--enable-aspa")
        if fresh_state:
            argv.append("--fresh")
        argv += ["vrps", "-f", "jsonext", "-o", f"{run_dir}/routinator.json"]
        copy_commands = [
            f"cp {quote(fixture_dir + '/tal/' + RIR_TAL[rir])} {quote(extra_tals + '/' + RIR_TAL[rir])}"
            for rir in rirs
        ]
        tal_copy = " && ".join(copy_commands)
        make_dirs = f"mkdir -p {state_q}/repository {quote(extra_tals)} && {tal_copy}"
        reset = f"rm -rf {state_q} && {make_dirs}" if fresh_state else make_dirs
        return timed_script(run_dir, reset, argv)

    if rp == "rpki-client":
        skiplist = f"{state_dir}/skiplist"
        cache_dir = state_dir + "/cache"
        argv = [f"{remote_root}/bin/rpki-client"]
        if args.rpki_client_parser_workers:
            argv += ["-p", str(args.rpki_client_parser_workers)]
        argv += ["-j", "-c", "-S", skiplist, "-d", f"{state_dir}/cache"]
        for rir in rirs:
            argv += ["-t", f"{fixture_dir}/tal/{RIR_TAL[rir]}"]
        argv.append(run_dir)

        # Pre-seed the cache's .ta/<tal-name>/ directory with each TA certificate.
        prime_commands = []
        for rir in rirs:
            ta_cache_dir = state_dir + "/cache/.ta/" + RIR_TAL[rir].removesuffix(".tal")
            prime_commands.append(
                f"mkdir -p {quote(ta_cache_dir)} "
                f"&& cp {quote(fixture_dir + '/ta/' + RIR_TA[rir])} "
                f"{quote(ta_cache_dir + '/' + tal_certificate_filename(rir))}"
            )
        ta_prime = " && ".join(prime_commands)

        skiplist_q = quote(skiplist)
        run_q = quote(run_dir)
        if fresh_state:
            steps = [
                f"rm -rf {state_q}",
                f"mkdir -p {state_q}/cache",
                ta_prime,
                f"touch {skiplist_q}",
                f"chown -R _rpki-client:_rpki-client {state_q} {run_q}",
                f"chmod -R u+rwX,go+rX {state_q} {run_q}",
            ]
        else:
            owned = f"{state_q} {quote(cache_dir)} {skiplist_q} {run_q}"
            steps = [
                f"mkdir -p {state_q}/cache",
                f"touch {skiplist_q}",
                f"chown _rpki-client:_rpki-client {owned}",
                f"chmod u+rwX,go+rX {owned}",
            ]
        reset = " && ".join(steps)
        return timed_script(run_dir, reset, argv)

    raise ValueError(rp)
def tal_certificate_filename(rir: str) -> str:
    """Return the certificate file name from the first ``.cer`` URI in the RIR's TAL fixture.

    Only lines starting with ``rsync://`` or ``https://`` are considered.

    Raises:
        SystemExit: If the TAL contains no such URI.
    """
    tal_path = FIXTURE_ROOT / "tal" / RIR_TAL[rir]
    tal_text = tal_path.read_text(encoding="utf-8", errors="replace")
    for raw_line in tal_text.splitlines():
        uri = raw_line.strip()
        if not uri.endswith(".cer"):
            continue
        if uri.startswith(("rsync://", "https://")):
            return uri.rpartition("/")[2]
    raise SystemExit(f"cannot resolve TA certificate filename from {tal_path}")
def timed_script(run_dir: str, prefix: str, argv: list[str]) -> str:
    """Wrap ``argv`` in a shell script that times it and records the outcome in ``run_dir``.

    The script creates ``run_dir``, runs ``prefix`` (shell text inserted
    verbatim), then executes ``argv`` under ``/usr/bin/time -v``. It writes
    ``start-utc.txt``, ``end-utc.txt``, ``process-time.txt``, ``stdout.log``,
    ``stderr.log`` and ``exit-code.txt``, deletes top-level dot-files in
    ``run_dir``, and exits with the command's status.
    """
    out = shlex.quote(run_dir)
    command = shlex.join(argv)
    timestamp = "date -u +%Y-%m-%dT%H:%M:%SZ"
    steps = [
        "",
        f"mkdir -p {out}",
        prefix,
        f"{timestamp} > {out}/start-utc.txt",
        "set +e",
        "set +m",
        f"/usr/bin/time -v -o {out}/process-time.txt -- {command} > {out}/stdout.log 2> {out}/stderr.log",
        "code=$?",
        "set -e",
        f"find {out} -maxdepth 1 -type f -name '.*' -delete 2>/dev/null || true",
        f"{timestamp} > {out}/end-utc.txt",
        # The printf format deliberately holds a real newline, not a backslash-n.
        f"printf '%s\n' \"$code\" > {out}/exit-code.txt",
        'exit "$code"',
        "",
    ]
    return "\n".join(steps)
def run_remote_matrix(args: argparse.Namespace, rirs: list[str]) -> None:
    """Execute every (RP, run) combination on the remote host.

    With ``args.schedule == "rp-block"`` all runs of one RP finish before the
    next RP starts; otherwise the RPs are interleaved run by run. Run 1 uses
    ``snapshot`` mode and later runs use ``delta``. A failing run is reported
    on stderr; unless ``args.continue_on_failure`` is set the function then
    returns immediately. After the last run a machine-state snapshot is
    written to ``env/after.txt`` on the remote.
    """
    run_numbers = range(1, args.runs + 1)
    if args.schedule == "rp-block":
        steps = [(rp, number) for rp in args.active_rps for number in run_numbers]
    else:
        steps = [(rp, number) for number in run_numbers for rp in args.active_rps]

    for rp, run_index in steps:
        mode = "delta" if run_index != 1 else "snapshot"
        print(f"remote_run_start rp={rp} run={run_index} mode={mode} at={utc_iso()}", flush=True)
        body = remote_shell_for_rp(rp, run_index, mode, rirs, args.remote_root, args)
        result = ssh_script(args.ssh_target, "set -euo pipefail\n" + body, check=False)
        if result.returncode != 0:
            print(f"remote run failed: {rp} run {run_index} exit={result.returncode}", file=sys.stderr)
            for captured in (result.stdout, result.stderr):
                print(captured, file=sys.stderr)
            if not args.continue_on_failure:
                return
        print(f"remote_run_done rp={rp} run={run_index} exit={result.returncode} at={utc_iso()}", flush=True)

    snapshot_command = f"(df -h; echo; free -h; echo; uptime) > {shlex.quote(args.remote_root)}/env/after.txt 2>&1 || true"
    ssh_script(args.ssh_target, snapshot_command, check=False)
def fetch_remote_outputs(args: argparse.Namespace) -> None:
    """Collect metadata on the remote, then download the expected result files.

    The local ``<run_root>/remote`` directory is deleted first, so it ends up
    containing only what this download delivers.
    """
    collect_remote_metadata(args)
    local_copy = args.run_root / "remote"
    if local_copy.exists():
        shutil.rmtree(local_copy)
    wanted = expected_remote_files(args.runs, args.active_rps)
    rsync_from_remote(args.ssh_target, args.remote_root, local_copy, wanted)
# Python program executed on the remote host once per (rp, run directory).
# It is fed to ``python3 -`` through a quoted heredoc, so neither the shell nor
# this module interpolates anything into it; rp and run_dir arrive via argv.
# Kept as a raw string so "\n" and "\s" reach the remote interpreter unchanged.
_REMOTE_NORMALIZER_SOURCE = r"""
import csv, gzip, json, os, re, sys

rp, run_dir = sys.argv[1:]


def in_run(name):
    return os.path.join(run_dir, name)


def asn(v):
    s = str(v).strip().strip('"')
    if s.upper().startswith('AS'):
        s = s[2:]
    return 'AS' + str(int(s))


def provider_list(items):
    return ','.join(sorted({asn(p) for p in items}, key=lambda x: int(x[2:])))


def split_providers(text):
    return [p for p in re.split(r'[;,\s]+', text.strip()) if p]


def vrps_from_csv(path):
    rows = set()
    try:
        with open(path, encoding='utf-8', errors='replace', newline='') as f:
            for row in csv.DictReader(f):
                a = row.get('ASN') or row.get('asn') or row.get('AS')
                p = row.get('IP Prefix') or row.get('prefix') or row.get('Prefix')
                m = row.get('Max Length') or row.get('maxLength') or row.get('maxlength')
                if a and p and m:
                    rows.add(f"{asn(a)},{str(p).strip().strip(chr(34))},{int(str(m).strip())}")
    except FileNotFoundError:
        pass
    return rows


def vaps_from_csv(path):
    rows = set()
    try:
        with open(path, encoding='utf-8', errors='replace', newline='') as f:
            for row in csv.DictReader(f):
                c = row.get('Customer ASN') or row.get('customer') or row.get('customer_asid')
                ps = row.get('Providers') or row.get('providers') or ''
                if c:
                    rows.add(f"{asn(c)}|{provider_list(split_providers(ps))}")
    except FileNotFoundError:
        pass
    return rows


def load_json(path):
    try:
        with open(path, encoding='utf-8', errors='replace') as f:
            return json.load(f)
    except FileNotFoundError:
        return {}


def dict_items(d, keys):
    if not isinstance(d, dict):
        return
    for key in keys:
        for item in d.get(key, []):
            if isinstance(item, dict):
                yield item


def vrps_from_json(d):
    rows = set()
    for item in dict_items(d, ('roas', 'routeOrigins', 'valid_roas')):
        a = item.get('asn') or item.get('origin') or item.get('origin_as')
        p = item.get('prefix')
        m = item.get('maxLength') or item.get('max_length') or item.get('maxlen')
        if a is not None and p is not None and m is not None:
            rows.add(f"{asn(a)},{str(p).strip().strip(chr(34))},{int(m)}")
    return rows


def vaps_from_json(d):
    rows = set()
    for item in dict_items(d, ('aspas', 'aspaAssertions', 'vaps')):
        c = item.get('customer') or item.get('customer_asid') or item.get('customerAsid') or item.get('customerAsn')
        ps = item.get('providers') or item.get('providerAsns') or item.get('provider_asns') or []
        if c is None:
            continue
        pitems = split_providers(ps) if isinstance(ps, str) else ps
        rows.add(f"{asn(c)}|{provider_list(pitems)}")
    return rows


def any_file(*names):
    return any(os.path.isfile(in_run(name)) for name in names)


if rp == 'ours-rp' and any_file('vrps.csv', 'vaps.csv'):
    vrps = vrps_from_csv(in_run('vrps.csv'))
    vaps = vaps_from_csv(in_run('vaps.csv'))
elif rp == 'routinator' and any_file('routinator.json'):
    data = load_json(in_run('routinator.json'))
    vrps = vrps_from_json(data)
    vaps = vaps_from_json(data)
elif rp == 'rpki-client' and any_file('csv', 'json'):
    vrps = vrps_from_csv(in_run('csv'))
    vaps = vaps_from_json(load_json(in_run('json')))
else:
    sys.exit(0)

for name, rows in (('normalized-vrps.txt.gz', vrps), ('normalized-vaps.txt.gz', vaps)):
    with gzip.open(in_run(name), 'wt', encoding='utf-8') as f:
        for row in sorted(rows):
            f.write(row + '\n')

counts = {'counts': {'vrps': len(vrps), 'vaps': len(vaps)}}
for name in ('normalized-products.json', rp + '-counts.json'):
    with open(in_run(name), 'w', encoding='utf-8') as f:
        json.dump(counts, f, indent=2, sort_keys=True)
"""


def collect_remote_metadata(args: argparse.Namespace) -> None:
    """Normalize each RP's raw products on the remote host.

    For every ``run_*`` directory under ``<remote_root>/rp-runs/<rp>/runs``
    the remote program reads the RP's raw output:

    - ``ours-rp``: ``vrps.csv`` / ``vaps.csv``
    - ``routinator``: ``routinator.json``
    - ``rpki-client``: ``csv`` / ``json``

    It writes ``normalized-vrps.txt.gz``, ``normalized-vaps.txt.gz``,
    ``normalized-products.json`` and ``<rp>-counts.json`` beside the input.
    Directories without the RP's raw output, and unknown RP names, are
    skipped. The script is sent with ``check=False``, so a remote failure
    does not raise here.
    """
    # Quote each RP name: it becomes a word in the remote shell's ``for`` list.
    rp_words = " ".join(shlex.quote(rp) for rp in args.active_rps)
    remote_script = "\n".join([
        "set -euo pipefail",
        f"ROOT={shlex.quote(args.remote_root)}",
        f"for rp in {rp_words}; do",
        '  for run_dir in "$ROOT/rp-runs/$rp/runs"/run_*; do',
        # An unmatched glob stays literal; the directory test filters it out.
        '    [ -d "$run_dir" ] || continue',
        "    python3 - \"$rp\" \"$run_dir\" <<'PY'",
        _REMOTE_NORMALIZER_SOURCE.strip(),
        "PY",
        "  done",
        "done",
        "",
    ])
    ssh_script_stream(args.ssh_target, remote_script, check=False)
def expected_remote_files(runs: int, active_rp_names: list[str]) -> list[str]:
    """List the remote-root-relative files expected for a benchmark.

    The list starts with the three global files, followed by the per-run
    files of each RP for run indexes ``1..runs``. ``ours-rp`` additionally
    gets ``stage-timing.json``. Any name other than ``ours-rp`` or
    ``routinator`` is given the rpki-client counts file.
    """
    per_run_common = (
        "start-utc.txt",
        "end-utc.txt",
        "exit-code.txt",
        "process-time.txt",
        "stdout.log",
        "stderr.log",
        "normalized-products.json",
        "normalized-vrps.txt.gz",
        "normalized-vaps.txt.gz",
    )
    counts_file = {"ours-rp": "ours-rp-counts.json", "routinator": "routinator-counts.json"}

    expected = ["env/before.txt", "env/after.txt", "sources-version-metadata.json"]
    for rp in active_rp_names:
        rp_specific = [counts_file.get(rp, "rpki-client-counts.json")]
        if rp == "ours-rp":
            rp_specific.append("stage-timing.json")
        for run_index in range(1, runs + 1):
            base = f"rp-runs/{rp}/runs/run_{run_index:04d}"
            expected += [f"{base}/{name}" for name in (*per_run_common, *rp_specific)]
    return expected
def process_outputs(args: argparse.Namespace, binary_meta: dict[str, Any], rirs: list[str]) -> dict[str, Any]:
    """Turn the fetched run directories into the JSON, XML and HTML reports.

    Every run of every active RP is collected from ``<run_root>/remote``,
    per-RP deltas and the cross-RP comparison are added, and the resulting
    report dict is written in all three formats and returned.
    """
    remote_copy = args.run_root / "remote"
    runs_root = remote_copy / "rp-runs"

    rp_results: dict[str, Any] = {}
    for rp in args.active_rps:
        collected = [
            collect_run(
                rp,
                index,
                "snapshot" if index == 1 else "delta",
                runs_root / rp / "runs" / f"run_{index:04d}",
                remote_copy,
            )
            for index in range(1, args.runs + 1)
        ]
        rp_results[rp] = {"version": binary_meta[rp], "runs": collected}

    add_within_changes(rp_results)
    cross = build_cross_comparison(rp_results, args.runs, remote_copy)

    report = {
        "generatedAtUtc": utc_iso(),
        "rirs": rirs,
        "runsPerRp": args.runs,
        "schedule": args.schedule,
        "oursRsyncScope": args.ours_rsync_scope,
        "routinatorEnableAspa": args.routinator_enable_aspa,
        "rpkiClientParserWorkers": args.rpki_client_parser_workers,
        "remoteRoot": args.remote_root,
        "sshTarget": args.ssh_target,
        "rpResults": rp_results,
        "crossRpComparison": cross,
    }

    writers = ((write_json, "json"), (write_xml, "xml"), (write_html_report, "html"))
    for write, suffix in writers:
        write(args.run_root / f"three-rp-performance-comparison.{suffix}", report)
    return report
def _read_exit_code(path: Path) -> int:
    """Return the integer stored in ``path``, or -1 when it is missing or not an integer."""
    if not path.is_file():
        return -1
    try:
        return int(path.read_text().strip())
    except ValueError:
        # An empty or truncated file means "unknown", not a reason to abort the report.
        return -1


def collect_run(rp: str, run_index: int, mode: str, run_dir: Path, root: Path) -> dict[str, Any]:
    """Collect timing, product sets and artifacts for one fetched run directory.

    Side effects in ``run_dir``: writes ``normalized-vrps.txt.gz``,
    ``normalized-vaps.txt.gz``, ``normalized-products.json``,
    ``run-meta.json`` and ``artifacts.json``.

    Args:
        rp: RP name; ``ours-rp`` and ``routinator`` have their own product
            sources, any other value is handled as rpki-client.
        run_index: Run number, stored in the metadata.
        mode: Sync mode label, stored in the metadata.
        run_dir: Local directory holding this run's fetched files.
        root: Base directory passed to ``artifact`` for each artifact entry.

    Returns:
        The run metadata dict that is also written to ``run-meta.json``.
        ``exitCode`` is -1 when ``exit-code.txt`` is missing or unparsable.
    """
    exit_code = _read_exit_code(run_dir / "exit-code.txt")
    time_info = parse_time_file(run_dir / "process-time.txt")
    start = (run_dir / "start-utc.txt").read_text().strip() if (run_dir / "start-utc.txt").is_file() else ""
    end = (run_dir / "end-utc.txt").read_text().strip() if (run_dir / "end-utc.txt").is_file() else ""

    # Prefer the already-normalized sets; fall back to the RP's raw products.
    if rp == "ours-rp":
        vrps = read_set(run_dir / "normalized-vrps.txt") or normalize_vrps_from_csv(run_dir / "vrps.csv")
        vaps = read_set(run_dir / "normalized-vaps.txt") or normalize_vaps_from_csv(run_dir / "vaps.csv")
    elif rp == "routinator":
        vrps = read_set(run_dir / "normalized-vrps.txt") or normalize_vrps_from_json(run_dir / "routinator.json")
        vaps = read_set(run_dir / "normalized-vaps.txt") or normalize_vaps_from_json(run_dir / "routinator.json")
        if not vrps and not vaps:
            # Only counts are available: synthesize sets of the right size.
            counts = remote_counts(run_dir / "routinator-counts.json")
            vrps = placeholder_set("vrp", counts.get("vrps", 0))
            vaps = placeholder_set("vap", counts.get("vaps", 0))
    else:
        vrps = (
            read_set(run_dir / "normalized-vrps.txt")
            or normalize_vrps_from_csv(run_dir / "csv")
            or normalize_vrps_from_json(run_dir / "json")
        )
        vaps = read_set(run_dir / "normalized-vaps.txt") or normalize_vaps_from_json(run_dir / "json")
        if not vrps and not vaps:
            counts = remote_counts(run_dir / "rpki-client-counts.json")
            vrps = placeholder_set("vrp", counts.get("vrps", 0))
            vaps = placeholder_set("vap", counts.get("vaps", 0))

    write_gzip_set(run_dir / "normalized-vrps.txt.gz", vrps)
    write_gzip_set(run_dir / "normalized-vaps.txt.gz", vaps)
    products = {"vrps": len(vrps), "vaps": len(vaps)}
    normalized = {"vrps": sorted(vrps), "vaps": sorted(vaps), "counts": products}
    write_json(run_dir / "normalized-products.json", normalized)

    artifacts = [
        artifact(run_dir / "process-time.txt", root, "process-time"),
        artifact(run_dir / "stdout.log", root, "stdout"),
        artifact(run_dir / "stderr.log", root, "stderr"),
        artifact(run_dir / "normalized-vrps.txt.gz", root, "normalized-vrps-gzip"),
        artifact(run_dir / "normalized-vaps.txt.gz", root, "normalized-vaps-gzip"),
    ]
    # Optional files: listed only when present in this run directory.
    for candidate, type_name in [
        ("report.json", "ours-report"),
        ("routinator.json", "routinator-json"),
        ("json", "rpki-client-json"),
        ("csv", "rpki-client-csv"),
        ("result.ccr", "ccr"),
        ("input.cir", "cir"),
        ("vrps.csv", "ours-vrps-csv"),
        ("vaps.csv", "ours-vaps-csv"),
    ]:
        path = run_dir / candidate
        if path.exists():
            artifacts.append(artifact(path, root, type_name))

    run_meta = {
        "rp": rp,
        "runIndex": run_index,
        "syncMode": mode,
        "startUtc": start,
        "endUtc": end,
        "exitCode": exit_code,
        "wallMs": time_info.get("wallMs", 0),
        "maxRssKb": time_info.get("maxRssKb", 0),
        "userSeconds": time_info.get("userSeconds", 0.0),
        "systemSeconds": time_info.get("systemSeconds", 0.0),
        "cpuPercent": time_info.get("cpuPercent", ""),
        "productCounts": products,
        "publicationStats": collect_publication_stats(rp, run_dir),
        "runDir": str(run_dir),
        "artifacts": artifacts,
    }
    write_json(run_dir / "run-meta.json", run_meta)
    write_json(run_dir / "artifacts.json", artifacts)
    return run_meta
def collect_publication_stats(rp: str, run_dir: Path) -> dict[str, Any]:
    """Return publication-point / manifest statistics for one run.

    ``ours-rp`` values come from ``stage-timing.json``. ``rpki-client``
    values are parsed from summary lines in ``stdout.log``. Every other RP
    gets a fixed "not available" record. A missing source file yields
    ``{"available": False, "source": <file name>}``.
    """
    if rp == "ours-rp":
        source = "stage-timing.json"
        stage_file = run_dir / source
        if not stage_file.is_file():
            return {"available": False, "source": source}
        stage = load_json(stage_file)
        # (report key, stage-timing key); missing or null values become 0.
        fields = (
            ("publicationPoints", "publication_points"),
            ("repoSyncMsTotal", "repo_sync_ms_total"),
            ("publicationPointRepoSyncMsTotal", "publication_point_repo_sync_ms_total"),
            ("rrdpDownloadMsTotal", "rrdp_download_ms_total"),
            ("rsyncDownloadMsTotal", "rsync_download_ms_total"),
            ("downloadEventCount", "download_event_count"),
            ("downloadBytesTotal", "download_bytes_total"),
        )
        stats: dict[str, Any] = {"available": True, "source": source}
        for report_key, stage_key in fields:
            stats[report_key] = int(stage.get(stage_key, 0) or 0)
        return stats

    if rp == "rpki-client":
        source = "stdout.log"
        log_file = run_dir / source
        if not log_file.is_file():
            return {"available": False, "source": source}
        text = log_file.read_text(encoding="utf-8", errors="replace")
        manifests = re.search(r"Manifests:\s+(\d+)\s+\((\d+)\s+failed parse,\s+(\d+)\s+seqnum gaps\)", text)
        repos = re.search(r"Repositories:\s+(\d+)", text)
        mfts_hash = re.search(r"CCR manifest state hash:\s+([A-Za-z0-9+/=]+)", text)
        if manifests:
            total, failed, gaps = (int(group) for group in manifests.groups())
        else:
            total = failed = gaps = 0
        return {
            "available": bool(manifests or repos or mfts_hash),
            "source": source,
            "manifests": total,
            "manifestFailedParse": failed,
            "manifestSeqnumGaps": gaps,
            "repositories": int(repos.group(1)) if repos else 0,
            "ccrManifestStateHash": mfts_hash.group(1) if mfts_hash else "",
        }

    return {
        "available": False,
        "source": "jsonext/stdout",
        "reason": "Routinator jsonext output used by this benchmark does not expose full publication point or manifest-state set.",
    }
def remote_counts(path: Path) -> dict[str, int]:
    """Read the ``counts`` object of a counts JSON file.

    Returns an empty dict when ``path`` is not a file. Otherwise returns
    ``vrps`` and ``vaps`` as ints, with missing or null values read as 0.
    """
    if not path.is_file():
        return {}
    payload = load_json(path)
    counts = payload.get("counts", {}) if isinstance(payload, dict) else {}
    return {kind: int(counts.get(kind, 0) or 0) for kind in ("vrps", "vaps")}
def placeholder_set(prefix: str, count: int) -> set[str]:
    """Return ``count`` distinct strings of the form ``"<prefix>:<index>"``."""
    members: set[str] = set()
    for index in range(count):
        members.add(f"{prefix}:{index}")
    return members
def add_within_changes(rp_results: dict[str, Any]) -> None:
    """Attach a ``deltaFromPrevious`` entry to every run, in place.

    The first run of each RP is marked ``{"available": False}``. Each later
    run gets the VRP and VAP set deltas against the run before it.
    """
    for data in rp_results.values():
        last_vrps: set[str] | None = None
        last_vaps: set[str] | None = None
        for run in data["runs"]:
            run_dir = Path(run["runDir"])
            vrps_now = read_set(run_dir / "normalized-vrps.txt")
            vaps_now = read_set(run_dir / "normalized-vaps.txt")
            if last_vrps is not None:
                run["deltaFromPrevious"] = {
                    "available": True,
                    "vrp": set_delta(last_vrps, vrps_now),
                    "vap": set_delta(last_vaps or set(), vaps_now),
                }
            else:
                run["deltaFromPrevious"] = {"available": False}
            last_vrps, last_vaps = vrps_now, vaps_now
def build_cross_comparison(rp_results: dict[str, Any], runs: int, root: Path) -> dict[str, Any]:
    """Build the per-run product overlap and the per-RP runtime summary.

    Overlap is computed for each RP pair present in ``rp_results`` and each
    run index ``1..runs``. In the runtime summary the first run counts as
    the snapshot and all later runs as deltas. ``root`` is accepted but not
    used by this function.
    """
    candidate_pairs = [("ours-rp", "routinator"), ("ours-rp", "rpki-client"), ("routinator", "rpki-client")]
    pairs = [pair for pair in candidate_pairs if pair[0] in rp_results and pair[1] in rp_results]

    by_run = []
    for run_index in range(1, runs + 1):
        compared = []
        for left, right in pairs:
            left_dir = Path(rp_results[left]["runs"][run_index - 1]["runDir"])
            right_dir = Path(rp_results[right]["runs"][run_index - 1]["runDir"])
            compared.append({
                "left": left,
                "right": right,
                "vrp": overlap(read_set(left_dir / "normalized-vrps.txt"), read_set(right_dir / "normalized-vrps.txt")),
                "vap": overlap(read_set(left_dir / "normalized-vaps.txt"), read_set(right_dir / "normalized-vaps.txt")),
            })
        by_run.append({"runIndex": run_index, "pairs": compared})

    runtime = {}
    for rp, data in rp_results.items():
        walls = [run.get("wallMs", 0) for run in data["runs"]]
        rss = [run.get("maxRssKb", 0) for run in data["runs"]]
        deltas = walls[1:]
        summary = {
            "snapshotWallMs": walls[0] if walls else 0,
            "deltaAverageWallMs": 0,
            "deltaMinWallMs": 0,
            "deltaMaxWallMs": 0,
            "maxRssKb": max(rss) if rss else 0,
        }
        if deltas:
            summary["deltaAverageWallMs"] = round(sum(deltas) / len(deltas), 3)
            summary["deltaMinWallMs"] = min(deltas)
            summary["deltaMaxWallMs"] = max(deltas)
        runtime[rp] = summary
    return {"runtimeSummary": runtime, "productOverlapByRun": by_run}
def ms_to_s(value: float | int) -> str:
    """Format milliseconds as seconds with three decimals, e.g. ``"1.500s"``."""
    seconds = float(value) / 1000
    return format(seconds, ".3f") + "s"
def kb_to_mb(value: float | int) -> str:
    """Format a kilobyte count as megabytes with one decimal, e.g. ``"2.0 MB"``."""
    megabytes = float(value) / 1024
    return "{:.1f} MB".format(megabytes)
def comma(value: Any) -> str:
    """Format ``value`` as an int with thousands separators.

    Anything ``int()`` cannot convert is returned as ``str(value)``.
    """
    try:
        number = int(value)
    except Exception:
        return str(value)
    return format(number, ",")
def pct(value: float) -> str:
    """Format ``value`` with six decimal places (no scaling is applied)."""
    return format(value, ".6f")
def h(text: Any) -> str:
    """HTML-escape ``str(text)``, including both quote characters."""
    rendered = str(text)
    return html.escape(rendered)
def rp_label(rp: str) -> str:
    """Return the display label of a known RP name; unknown names pass through."""
    labels = {
        "ours-rp": "ours RP",
        "routinator": "Routinator",
        "rpki-client": "rpki-client",
    }
    return labels[rp] if rp in labels else rp
def min_class(value: Any, values: list[Any]) -> str:
    """Return ``' class="best"'`` when ``value`` is the smallest positive number in ``values``.

    Non-numeric and non-positive entries are ignored. An empty string is
    returned when ``value`` is not the minimum or nothing positive remains.
    """
    positives = [v for v in values if isinstance(v, (int, float)) and v > 0]
    if positives and value == min(positives):
        return ' class="best"'
    return ""
def total_wall_ms(report: dict[str, Any], rp: str) -> int:
    """Sum ``wallMs`` over all runs of ``rp`` and truncate the result to an int."""
    total = 0
    for run in report["rpResults"][rp]["runs"]:
        total += run.get("wallMs", 0)
    return int(total)
def total_wall_chart(report: dict[str, Any]) -> str:
    """Render an SVG bar chart of each RP's total wall-clock time.

    The title takes its run count from ``report["runsPerRp"]`` (10 when the
    key is absent). Only RPs present in ``report["rpResults"]`` get a legend
    entry and a bar, and they are packed without empty slots. Bars are
    scaled against the largest total and are at least 8 units wide.
    """
    series = {
        "ours-rp": ("#2563eb", "ours-rp"),
        "routinator": ("#16a34a", "routinator"),
        "rpki-client": ("#dc2626", "rpki-client"),
    }
    totals = {rp: total_wall_ms(report, rp) for rp in report["rpResults"]}
    # max_v is only a divisor; fall back to 1 when no RP has a positive total.
    max_v = max((value for value in totals.values() if value > 0), default=1)
    present = [(rp, color, label) for rp, (color, label) in series.items() if rp in totals]
    run_count = report.get("runsPerRp", 10)

    left = 188
    top = 46
    bar_h = 28
    gap = 18
    width = 560
    height = top + len(present) * (bar_h + gap) + 24
    chunks = [
        f'<svg class="chart" viewBox="0 0 840 {height}" role="img" aria-label="total wall clock by RP">',
        f'<text x="18" y="24" class="svg-label">{run_count}-run total wall clock</text>',
    ]

    legend_x = 500
    for slot, (_, color, label) in enumerate(present):
        x = legend_x + slot * 110
        chunks.append(f'<rect x="{x}" y="16" width="14" height="14" rx="3" fill="{color}"/>')
        chunks.append(f'<text x="{x + 20}" y="29" class="svg-label">{label}</text>')

    for slot, (rp, color, label) in enumerate(present):
        value = totals[rp]
        y = top + slot * (bar_h + gap)
        bar_w = max(8.0, width * (value / max_v))
        chunks.append(
            f'<text x="18" y="{y + 20}" class="svg-label">{label}</text>'
            f'<rect x="{left}" y="{y}" width="{bar_w:.1f}" height="{bar_h}" rx="8" fill="{color}"/>'
            f'<text x="{left + bar_w + 12:.1f}" y="{y + 20}" class="svg-label" style="fill:{color}">{ms_to_s(value)}</text>'
            f'<title>{label}: {ms_to_s(value)}</title>'
        )
    chunks.append("</svg>")
    return "\n".join(chunks)
def html_summary_table(report: dict[str, Any]) -> str:
    """Render the total-wall-clock chart followed by the runtime summary table.

    There is one row per RP in the runtime summary. In every column the
    smallest positive value is marked with the ``best`` CSS class. The
    "total" column header takes its run count from ``report["runsPerRp"]``
    (10 when the key is absent).
    """
    runtime = report["crossRpComparison"]["runtimeSummary"]
    run_count = report.get("runsPerRp", 10)

    # Compute each RP's total once; it is needed for both ranking and display.
    totals = {rp: total_wall_ms(report, rp) for rp in runtime}
    total_values = list(totals.values())
    wall_columns = ("snapshotWallMs", "deltaAverageWallMs", "deltaMinWallMs", "deltaMaxWallMs")
    column_values = {key: [summary[key] for summary in runtime.values()] for key in wall_columns}
    rss_values = [summary["maxRssKb"] for summary in runtime.values()]

    rows = []
    for rp, values in runtime.items():
        cells = [
            f"<td><strong>{h(rp_label(rp))}</strong></td>",
            f"<td{min_class(totals[rp], total_values)}>{ms_to_s(totals[rp])}</td>",
        ]
        cells.extend(
            f"<td{min_class(values[key], column_values[key])}>{ms_to_s(values[key])}</td>"
            for key in wall_columns
        )
        cells.append(f"<td{min_class(values['maxRssKb'], rss_values)}>{kb_to_mb(values['maxRssKb'])}</td>")
        rows.append("<tr>" + "".join(cells) + "</tr>")

    return (
        total_wall_chart(report)
        + '<div class="table-wrap"><table><thead><tr>'
        f"<th>RP</th><th>{h(run_count)}-run total</th><th>snapshot</th><th>delta avg</th><th>delta min</th><th>delta max</th><th>max RSS</th>"
        "</tr></thead><tbody>"
        + "\n".join(rows)
        + "</tbody></table></div>"
    )
def polyline_chart(report: dict[str, Any]) -> str:
    """Render an SVG line chart of wall-clock seconds per run, one line per RP.

    The y axis runs from 0 to the largest positive ``wallMs`` in the report
    (1 when there is none). Run indexes are spread evenly along the x axis.
    """
    palette = {
        "ours-rp": ("#2563eb", "ours-rp"),
        "routinator": ("#16a34a", "routinator"),
        "rpki-client": ("#dc2626", "rpki-client"),
    }
    run_count = report["runsPerRp"]
    results = report["rpResults"]

    seconds = []
    for rp_data in results.values():
        for run in rp_data["runs"]:
            if run.get("wallMs", 0) > 0:
                seconds.append(run["wallMs"] / 1000)
    floor = 0
    ceiling = max(seconds) if seconds else 1
    if ceiling == floor:
        ceiling += 1

    left, top, width, height = 62, 42, 690, 208
    bottom = top + height
    step = width / max(run_count - 1, 1)

    def point(index: int, value_ms: int) -> tuple[float, float]:
        # Map (run index, wall ms) to SVG coordinates; larger values sit higher.
        x = left + (index - 1) * step
        value_s = value_ms / 1000
        y = top + (ceiling - value_s) / (ceiling - floor) * height
        return x, y

    chunks = [
        '<svg class="chart" viewBox="0 0 840 300" role="img" aria-label="combined per-run wall time">',
        f'<line x1="{left}" y1="{top}" x2="{left}" y2="{bottom}" class="axis"/>',
        f'<line x1="{left}" y1="{bottom}" x2="{left + width}" y2="{bottom}" class="axis"/>',
        f'<text x="16" y="{top + 4}" class="svg-small">{ceiling:.1f}s</text>',
        f'<text x="16" y="{bottom + 4}" class="svg-small">{floor:.1f}s</text>',
        '<text x="18" y="24" class="svg-label">Wall clock / run</text>',
    ]

    # Legend: the slot is fixed by the palette order, even when an RP is absent.
    legend_x = 480
    for slot, (rp, (color, label)) in enumerate(palette.items()):
        if rp in results:
            x = legend_x + slot * 116
            chunks.extend([
                f'<rect x="{x}" y="16" width="14" height="14" rx="3" fill="{color}"/>',
                f'<text x="{x + 20}" y="29" class="svg-label">{label}</text>',
            ])

    for rp, (color, label) in palette.items():
        if rp not in results:
            continue
        rp_runs = results[rp]["runs"]
        points = [point(run["runIndex"], run.get("wallMs", 0)) for run in rp_runs]
        coords = " ".join(f"{x:.1f},{y:.1f}" for x, y in points)
        chunks.append(f'<polyline fill="none" stroke="{color}" stroke-width="3" points="' + coords + '"/>')
        chunks.append(f'<g fill="{color}">')
        chunks.extend(
            f'<circle cx="{x:.1f}" cy="{y:.1f}" r="4">'
            f'<title>{label} run {run["runIndex"]}: {ms_to_s(run.get("wallMs", 0))}</title></circle>'
            for run, (x, y) in zip(rp_runs, points)
        )
        chunks.append("</g>")
        if points:
            # Name the line next to its last data point.
            x, y = points[-1]
            chunks.append(f'<text x="{x + 10:.1f}" y="{y + 4:.1f}" class="svg-label" style="fill:{color}">{label}</text>')

    for idx in range(1, run_count + 1):
        x = left + (idx - 1) * step
        chunks.append(f'<text x="{x:.1f}" y="278" text-anchor="middle" class="svg-small">{idx}</text>')
    chunks.append("</svg>")
    return "\n".join(chunks)
def run_detail_table(report: dict[str, Any], rp: str) -> str:
    """Render the per-run detail table (heading plus one row per run) for ``rp``.

    The two delta columns are left empty for runs whose
    ``deltaFromPrevious`` is missing or marked unavailable.
    """
    body = []
    for run in report["rpResults"][rp]["runs"]:
        change = run.get("deltaFromPrevious", {})
        vrp_delta = vap_delta = ""
        if change.get("available"):
            vrp_delta = f"+{change['vrp']['added']}/-{change['vrp']['removed']}"
            vap_delta = f"+{change['vap']['added']}/-{change['vap']['removed']}"
        cells = [
            run["runIndex"],
            h(run["syncMode"]),
            ms_to_s(run.get("wallMs", 0)),
            kb_to_mb(run.get("maxRssKb", 0)),
            comma(run["productCounts"]["vrps"]),
            comma(run["productCounts"]["vaps"]),
            vrp_delta,
            vap_delta,
        ]
        body.append("<tr>" + "".join(f"<td>{cell}</td>" for cell in cells) + "</tr>")

    heading = f"<h3>{h(rp_label(rp))}</h3>"
    table_open = (
        '<div class="table-wrap"><table><thead><tr>'
        "<th>run</th><th>mode</th><th>wall</th><th>max RSS</th><th>VRPs</th><th>VAPs</th><th>VRP delta</th><th>VAP delta</th>"
        "</tr></thead><tbody>"
    )
    return heading + table_open + "\n".join(body) + "</tbody></table></div>"
def overlap_table(report: dict[str, Any], run_index: int) -> str:
    """Render the product-overlap table for the 1-based ``run_index``.

    Each RP pair becomes one row with intersection, only-left, only-right
    and Jaccard values, first for VRPs and then for VAPs.
    """
    entry = report["crossRpComparison"]["productOverlapByRun"][run_index - 1]
    body = []
    for pair in entry["pairs"]:
        cells = [f"<strong>{h(pair['left'])}</strong> vs <strong>{h(pair['right'])}</strong>"]
        for kind in ("vrp", "vap"):
            stats = pair[kind]
            cells += [
                comma(stats["intersection"]),
                comma(stats["onlyLeft"]),
                comma(stats["onlyRight"]),
                pct(stats["jaccard"]),
            ]
        body.append("<tr>" + "".join(f"<td>{cell}</td>" for cell in cells) + "</tr>")

    table_open = (
        '<div class="table-wrap"><table><thead><tr>'
        "<th>pair</th><th>VRP intersection</th><th>only left</th><th>only right</th><th>VRP Jaccard</th>"
        "<th>VAP intersection</th><th>only left</th><th>only right</th><th>VAP Jaccard</th>"
        "</tr></thead><tbody>"
    )
    return table_open + "\n".join(body) + "</tbody></table></div>"
def publication_table(report: dict[str, Any], run_index: int) -> str:
    """Render the publication-point / manifest table for the 1-based ``run_index``.

    ``ours-rp`` rows show publication points and the sync total.
    ``rpki-client`` rows show manifest and repository counts. Every other RP
    gets fixed placeholder texts.
    """
    body = []
    for rp, data in report["rpResults"].items():
        stats = data["runs"][run_index - 1].get("publicationStats", {})
        if rp == "ours-rp":
            pp = f"{comma(stats.get('publicationPoints', 0))} PP processed"
            repo = f"sync total {ms_to_s(stats.get('repoSyncMsTotal', 0))}"
            source = "stage-timing.json"
        elif rp == "rpki-client":
            pp = f"{comma(stats.get('manifests', 0))} manifests / {comma(stats.get('manifestFailedParse', 0))} failed parse"
            repo = f"{comma(stats.get('repositories', 0))} repositories"
            source = "stdout.log"
        else:
            pp = repo = "当前产物未输出"
            source = "jsonext 不含完整 PP/manifest state"
        body.append(
            f"<tr><td><strong>{h(rp_label(rp))}</strong></td>"
            f"<td>{h(pp)}</td><td>{h(repo)}</td><td>{h(source)}</td></tr>"
        )

    table_open = (
        '<div class="table-wrap"><table><thead><tr>'
        "<th>RP</th><th>PP/Manifest 计数</th><th>Repo/Sync 信息</th><th>数据来源</th>"
        "</tr></thead><tbody>"
    )
    return table_open + "\n".join(body) + "</tbody></table></div>"
def write_html_report(path: Path, report: dict[str, Any]) -> None:
    """Write the self-contained HTML benchmark report to ``path``.

    The page has summary cards, the experiment configuration with a
    reproduction command, the runtime summary and trend charts, per-run
    detail tables, product overlap and publication-point tables for the
    first and the last run, and links to the sibling JSON/XML reports.

    Args:
        path: Output HTML file; its parent directory is shown as the local
            artifact location and as ``--run-root`` in the command.
        report: Report dict as assembled by ``process_outputs``.
    """
    runtime = report["crossRpComparison"]["runtimeSummary"]
    # A wall time of 0 is falsy and therefore ranked last (10**18) instead of first.
    fastest_snapshot = min(runtime.items(), key=lambda item: item[1]["snapshotWallMs"] or 10**18)
    fastest_delta = min(runtime.items(), key=lambda item: item[1]["deltaAverageWallMs"] or 10**18)
    first_overlap = report["crossRpComparison"]["productOverlapByRun"][0]["pairs"]
    # None when the ours-rp/routinator pair was not compared; rendered as "N/A".
    ours_routinator_vap = next(
        (pair["vap"]["jaccard"] for pair in first_overlap if pair["left"] == "ours-rp" and pair["right"] == "routinator"),
        None,
    )
    all_ok = all(run["exitCode"] == 0 for rp in report["rpResults"].values() for run in rp["runs"])
    # NOTE(review): "--routinator-enable-aspa" is appended unconditionally, although the
    # report also carries "routinatorEnableAspa" - confirm this is intended.
    command = (
        "python3 rpki_2/rpki/scripts/compare/run_three_rp_10run_benchmark.py "
        f"--run-root {path.parent} --remote-root {report['remoteRoot']} --runs {report['runsPerRp']} "
        f"--rirs {','.join(report['rirs'])} --rps {','.join(report['rpResults'].keys())} "
        f"--schedule {report.get('schedule', 'round-robin')} --ours-rsync-scope {report.get('oursRsyncScope', '')} "
        "--routinator-enable-aspa"
    )
    # The worker count is shown and added to the command only when it is non-zero.
    rpki_client_parser_workers = int(report.get("rpkiClientParserWorkers", 0) or 0)
    rpki_client_p_arg = f" -p {rpki_client_parser_workers}" if rpki_client_parser_workers else ""
    rpki_client_p_cli = f" --rpki-client-parser-workers {rpki_client_parser_workers}" if rpki_client_parser_workers else ""
    command += rpki_client_p_cli
    # NOTE(review): the template text states round-robin scheduling, ten rounds and
    # rpki-client 9.8 as fixed facts; none of these is derived from the report values.
    # Literal CSS braces are doubled because the template is an f-string.
    html_text = f"""<!doctype html>
<html lang="zh-CN">
<head>
<meta charset="utf-8"/>
<meta name="viewport" content="width=device-width, initial-scale=1"/>
<title>三 RP Round-Robin 性能与产物对比报告</title>
<style>
:root {{ --bg:#f6f8fb; --panel:#ffffff; --ink:#172033; --muted:#5d6b82; --line:#d9e1ee; --blue:#2563eb; --green:#16a34a; --red:#dc2626; --orange:#ea580c; --shadow:0 12px 30px rgba(15,23,42,.08); }}
* {{ box-sizing:border-box; }} body {{ margin:0; font-family:-apple-system,BlinkMacSystemFont,"Segoe UI",Roboto,"Noto Sans SC",Arial,sans-serif; background:var(--bg); color:var(--ink); line-height:1.6; }}
header {{ padding:42px 48px 28px; background:linear-gradient(135deg,#0f172a,#1d4ed8); color:#fff; }} header h1 {{ margin:0 0 10px; font-size:34px; }} header p {{ margin:0; max-width:1100px; color:#dbeafe; }}
main {{ padding:28px 48px 64px; max-width:1380px; margin:0 auto; }} section {{ background:var(--panel); border:1px solid var(--line); border-radius:18px; padding:24px; margin:22px 0; box-shadow:var(--shadow); }}
h2 {{ margin:0 0 14px; font-size:24px; }} h3 {{ margin:22px 0 10px; font-size:18px; }}
.cards {{ display:grid; grid-template-columns:repeat(4,minmax(0,1fr)); gap:16px; margin:18px 0 4px; }} .card {{ background:#f8fbff; border:1px solid var(--line); border-radius:16px; padding:18px; }} .card-title {{ color:var(--muted); font-size:13px; }} .metric {{ font-size:30px; font-weight:800; margin:6px 0; }} .sub {{ color:var(--muted); font-size:13px; }}
.grid {{ display:grid; grid-template-columns:1fr 1fr; gap:18px; }} .note {{ background:#fff7ed; border:1px solid #fed7aa; color:#7c2d12; border-radius:14px; padding:14px 16px; }} .ok {{ background:#ecfdf5; border-color:#bbf7d0; color:#14532d; }}
.table-wrap {{ overflow:auto; border:1px solid var(--line); border-radius:14px; margin:12px 0; }} table {{ border-collapse:collapse; width:100%; min-width:920px; background:white; }} th,td {{ padding:10px 12px; border-bottom:1px solid var(--line); text-align:right; white-space:nowrap; }} th:first-child,td:first-child {{ text-align:left; }} th {{ background:#f1f5f9; color:#334155; font-weight:700; }} tr:last-child td {{ border-bottom:0; }} .best {{ font-weight:800; color:var(--green); }}
code {{ background:#eef2ff; padding:2px 6px; border-radius:6px; }} pre {{ background:#0f172a; color:#e2e8f0; padding:16px; border-radius:14px; overflow:auto; }} .chart {{ width:100%; height:auto; border:1px solid var(--line); border-radius:14px; background:#fbfdff; }} .axis {{ stroke:#94a3b8; stroke-width:1; }} .svg-label {{ font-size:14px; fill:#334155; }} .svg-small {{ font-size:12px; fill:#64748b; }}
footer {{ color:#64748b; font-size:13px; margin-top:28px; }} @media (max-width:900px) {{ main,header {{ padding-left:20px; padding-right:20px; }} .cards,.grid {{ grid-template-columns:1fr; }} }}
</style>
</head>
<body>
<header>
<h1>三 RP Round-Robin 性能与产物对比报告</h1>
<p>all5 live 十轮对比,调度顺序为 <code>rp1 run1 → rp2 run1 → rp3 run1 → rp1 run2 ...</code>ours RP 使用 <code>{h(report.get('oursRsyncScope', ''))}</code>Routinator 启用 <code>--enable-aspa</code>rpki-client 使用 official <code>9.8</code>{h(rpki_client_p_arg)} 正确构建。</p>
</header>
<main>
<section>
<h2>执行摘要</h2>
<div class="cards">
<div class="card"><div class="card-title">最快 Snapshot</div><div class="metric">{h(rp_label(fastest_snapshot[0]))}</div><div class="sub">{ms_to_s(fastest_snapshot[1]['snapshotWallMs'])}</div></div>
<div class="card"><div class="card-title">最快 Delta 均值</div><div class="metric">{h(rp_label(fastest_delta[0]))}</div><div class="sub">{ms_to_s(fastest_delta[1]['deltaAverageWallMs'])}</div></div>
<div class="card"><div class="card-title">ASPA 对齐</div><div class="metric">{'N/A' if ours_routinator_vap is None else f'{ours_routinator_vap:.6f}'}</div><div class="sub">ours vs Routinator VAP Jaccard</div></div>
<div class="card"><div class="card-title">Run 状态</div><div class="metric">{'PASS' if all_ok else 'FAIL'}</div><div class="sub">{report['runsPerRp']} run × {len(report['rpResults'])} RP</div></div>
</div>
<div class="note ok"><strong>调度变更:</strong>本轮使用 round-robin 串行调度,避免先跑完一个 RP 再跑另一个 RP 带来的时间窗口偏差。</div>
</section>
<section>
<h2>实验配置</h2>
<div class="grid"><div><h3>运行参数</h3><ul>
<li>远端机器:<code>{h(report['sshTarget'])}</code></li><li>远端目录:<code>{h(report['remoteRoot'])}</code></li>
<li>RIR<code>{h(','.join(report['rirs']))}</code></li><li>调度:<code>{h(report.get('schedule', ''))}</code></li>
</ul></div><div><h3>关键变量</h3><ul>
<li>ours RP<code>--rsync-scope {h(report.get('oursRsyncScope', ''))}</code></li>
<li>Routinator<code>--enable-aspa</code></li><li>rpki-clientofficial <code>9.8</code>{h(rpki_client_p_arg)},本地 fixture TAL/TA</li>
<li>首轮 snapshot后续 delta</li>
</ul></div></div>
<pre>{h(command)}</pre>
</section>
<section><h2>总览:运行时间与内存</h2>{html_summary_table(report)}</section>
<section><h2>每轮 wall clock 趋势</h2>{polyline_chart(report)}</section>
<section><h2>每轮明细</h2>{''.join(run_detail_table(report, rp) for rp in report['rpResults'])}</section>
<section><h2>产物重合度</h2><h3>run1 snapshot</h3>{overlap_table(report, 1)}<h3>run{report['runsPerRp']} final delta</h3>{overlap_table(report, report['runsPerRp'])}</section>
<section><h2>发布点 / Manifest 维度</h2><h3>run1 snapshot</h3>{publication_table(report, 1)}<h3>run{report['runsPerRp']} final delta</h3>{publication_table(report, report['runsPerRp'])}<div class="note">Routinator 当前 jsonext 产物没有完整发布点或 CCR manifest state因此发布点维度只能对 ours RP 与 rpki-client 做计数级参考,不能做全集合重合度。</div></section>
<section><h2>验证与产物</h2><ul><li>JSON/XML/HTML 均由同一份 benchmark JSON 生成。</li><li>本地 JSON<code>{h(path.parent / 'three-rp-performance-comparison.json')}</code></li><li>本地 XML<code>{h(path.parent / 'three-rp-performance-comparison.xml')}</code></li><li>远端完整产物:<code>{h(report['remoteRoot'])}</code></li></ul></section>
<footer>Generated at <code>{h(report['generatedAtUtc'])}</code>. No external assets.</footer>
</main>
</body>
</html>
"""
    path.write_text(html_text, encoding="utf-8")
def write_xml(path: Path, report: dict[str, Any]) -> None:
    """Write the benchmark report to *path* as an indented UTF-8 XML file.

    The root element is ``rpPerformanceComparison``.  It carries the run-level
    settings as attributes and contains, in order: an ``environment`` element,
    one ``rp`` element per entry of ``report["rpResults"]`` (version plus one
    ``run`` element per run), and a ``crossRpComparison`` element built from
    ``report["crossRpComparison"]``.  The parent directory of *path* is
    created when missing.

    Raises:
        KeyError: if a key that is read without a default is absent from
            *report*.
    """
    # Only these version fields are copied onto the <version> element.
    version_fields = {"commit", "selectedTag", "tag", "binarySha256", "binarySize", "sourceDir", "binaryPath"}

    def stringified(mapping):
        # Attribute values are handed to ElementTree as strings.
        return {name: str(value) for name, value in mapping.items()}

    root_attrs = {
        "generatedAtUtc": report["generatedAtUtc"],
        "remoteHost": report["sshTarget"],
        "remoteRoot": report["remoteRoot"],
        "runsPerRp": str(report["runsPerRp"]),
        "rirs": ",".join(report["rirs"]),
        "schedule": str(report.get("schedule", "")),
        "oursRsyncScope": str(report.get("oursRsyncScope", "")),
        "routinatorEnableAspa": str(report.get("routinatorEnableAspa", "")).lower(),
        "rpkiClientParserWorkers": str(report.get("rpkiClientParserWorkers", "")),
    }
    document = ET.Element("rpPerformanceComparison", root_attrs)

    environment = ET.SubElement(document, "environment")
    artifact_root = ET.SubElement(environment, "artifactRoot")
    artifact_root.text = str(path.parent)

    # Per-RP section: version metadata followed by every recorded run.
    for rp_name, rp_data in report["rpResults"].items():
        rp_node = ET.SubElement(document, "rp", {"name": rp_name})
        version_attrs = {
            name: str(value)
            for name, value in rp_data["version"].items()
            if name in version_fields
        }
        ET.SubElement(rp_node, "version", version_attrs)
        runs_node = ET.SubElement(rp_node, "runs")

        for run in rp_data["runs"]:
            run_attrs = {
                "index": str(run["runIndex"]),
                "mode": run["syncMode"],
                "exitCode": str(run["exitCode"]),
            }
            run_node = ET.SubElement(runs_node, "run", run_attrs)

            timing_attrs = {
                "startUtc": run.get("startUtc", ""),
                "endUtc": run.get("endUtc", ""),
                "wallMs": str(run.get("wallMs", 0)),
            }
            ET.SubElement(run_node, "timing", timing_attrs)

            resource_attrs = {
                "maxRssKb": str(run.get("maxRssKb", 0)),
                "userSeconds": str(run.get("userSeconds", 0.0)),
                "systemSeconds": str(run.get("systemSeconds", 0.0)),
                "cpuPercent": str(run.get("cpuPercent", "")),
            }
            ET.SubElement(run_node, "resource", resource_attrs)

            counts = run["productCounts"]
            product_attrs = {"vrps": str(counts["vrps"]), "vaps": str(counts["vaps"])}
            ET.SubElement(run_node, "products", product_attrs)

            # A run without delta information still gets an (empty) element
            # flagged available="false".
            delta = run.get("deltaFromPrevious", {"available": False})
            available_text = str(delta.get("available", False)).lower()
            delta_node = ET.SubElement(run_node, "deltaFromPrevious", {"available": available_text})
            if delta.get("available"):
                for product in ("vrp", "vap"):
                    ET.SubElement(delta_node, product, stringified(delta[product]))

            artifacts_node = ET.SubElement(run_node, "artifacts")
            for artifact in run.get("artifacts", []):
                # The "path" entry is deliberately left out of the XML.
                artifact_attrs = {
                    name: str(value)
                    for name, value in artifact.items()
                    if name != "path"
                }
                ET.SubElement(artifacts_node, "artifact", artifact_attrs)

    # Cross-RP section: runtime summary per RP, then pairwise overlaps per run.
    cross_node = ET.SubElement(document, "crossRpComparison")
    runtime_node = ET.SubElement(cross_node, "runtimeSummary")
    comparison = report["crossRpComparison"]
    for rp_name, summary in comparison["runtimeSummary"].items():
        ET.SubElement(runtime_node, "rp", {"name": rp_name, **stringified(summary)})

    overlaps_node = ET.SubElement(cross_node, "productOverlapByRun")
    for overlap in comparison["productOverlapByRun"]:
        overlap_node = ET.SubElement(overlaps_node, "run", {"index": str(overlap["runIndex"])})
        for pair in overlap["pairs"]:
            pair_node = ET.SubElement(overlap_node, "pair", {"left": pair["left"], "right": pair["right"]})
            for product in ("vrp", "vap"):
                ET.SubElement(pair_node, product, stringified(pair[product]))

    ET.indent(document, space=" ")
    path.parent.mkdir(parents=True, exist_ok=True)
    ET.ElementTree(document).write(path, encoding="utf-8", xml_declaration=True)
def main() -> None:
    """Command-line entry point for the serial three-RP benchmark.

    Steps, in order:

    1. Parse and validate the command-line arguments.
    2. Resolve and create ``--run-root``, then write ``dry-run-plan.json``
       there; with ``--dry-run`` the plan is printed and the function returns.
    3. With ``--reuse-remote-root`` only binary metadata is fetched;
       otherwise binaries are built, the fixture tree is created under
       ``<run-root>/fixtures`` and the remote root is prepared.
    4. Unless ``--skip-remote-run`` is given, the remote run matrix is executed.
    5. Remote outputs are fetched and processed, and the report paths plus a
       per-RP runtime summary are printed.

    Raises:
        SystemExit: on invalid argument values or flag combinations.
    """
    parser = argparse.ArgumentParser(description="Run serial three-RP 10-run performance comparison on a remote host.")
    parser.add_argument("--run-root", type=Path, required=True)
    parser.add_argument("--remote-root", required=True)
    # NOTE(review): the fallback target is a hard-coded host; set SSH_TARGET or
    # pass --ssh-target to point at a different machine.
    parser.add_argument("--ssh-target", default=os.environ.get("SSH_TARGET", "root@47.251.56.108"))
    parser.add_argument("--runs", type=int, default=10)
    parser.add_argument("--rirs", default="afrinic,apnic,arin,lacnic,ripe")
    parser.add_argument("--skip-build", action="store_true")
    parser.add_argument("--skip-remote-run", action="store_true")
    parser.add_argument(
        "--reuse-remote-root",
        action="store_true",
        help="Do not prepare or modify the remote root before fetching existing run artifacts.",
    )
    parser.add_argument("--continue-on-failure", action="store_true")
    parser.add_argument(
        "--rps",
        default=",".join(RP_ORDER),
        help=f"Comma-separated RP list. Allowed: {','.join(RP_ORDER)}",
    )
    parser.add_argument(
        "--schedule",
        default="round-robin",
        choices=["round-robin", "rp-block"],
        help="Execution schedule: run-index round-robin or one RP block at a time.",
    )
    parser.add_argument("--ours-rsync-scope", default="publication-point", choices=sorted(RPKI_SCOPE_POLICIES))
    parser.add_argument(
        "--routinator-enable-aspa",
        action="store_true",
        help="Pass --enable-aspa to Routinator so jsonext includes ASPA/VAP data.",
    )
    parser.add_argument(
        "--rpki-client-parser-workers",
        type=int,
        default=0,
        help="Pass -p N to rpki-client. 0 keeps rpki-client default.",
    )
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    # Reject nonsensical numeric values before touching the filesystem or the
    # remote host: a benchmark needs at least one run per RP.
    if args.runs < 1:
        raise SystemExit("--runs must be >= 1")
    if args.rpki_client_parser_workers < 0:
        raise SystemExit("--rpki-client-parser-workers must be >= 0")

    args.run_root = args.run_root.resolve()
    args.run_root.mkdir(parents=True, exist_ok=True)
    # Stored on the namespace so the helpers that receive `args` can read it.
    args.active_rps = active_rps(args)
    rirs = parse_rirs(args.rirs)
    versions = resolve_versions()

    plan = {
        "runRoot": str(args.run_root),
        "remoteRoot": args.remote_root,
        "sshTarget": args.ssh_target,
        "runs": args.runs,
        "rirs": rirs,
        "versions": versions,
        "rpOrder": args.active_rps,
        "schedule": args.schedule,
        "oursRsyncScope": args.ours_rsync_scope,
        "routinatorEnableAspa": args.routinator_enable_aspa,
        "rpkiClientParserWorkers": args.rpki_client_parser_workers,
    }
    # The plan is written on every invocation, not only for --dry-run.
    write_json(args.run_root / "dry-run-plan.json", plan)
    if args.dry_run:
        print(json.dumps(plan, indent=2, ensure_ascii=False))
        return

    if args.reuse_remote_root:
        # Reusing an existing remote root only makes sense when nothing is
        # going to be executed there again.
        if not args.skip_remote_run:
            raise SystemExit("--reuse-remote-root requires --skip-remote-run")
        binary_meta = fetch_only_binary_metadata(args)
    else:
        binary_meta = build_release_binaries(versions, args.skip_build, args.active_rps)
        fixture_local = args.run_root / "fixtures"
        build_fixture_tree(fixture_local, rirs)
        prepare_remote(args, binary_meta, rirs, fixture_local)

    if not args.skip_remote_run:
        run_remote_matrix(args, rirs)
    fetch_remote_outputs(args)
    report = process_outputs(args, binary_meta, rirs)

    print(f"xml_report={args.run_root / 'three-rp-performance-comparison.xml'}")
    print(f"json_report={args.run_root / 'three-rp-performance-comparison.json'}")
    for rp, summary in report["crossRpComparison"]["runtimeSummary"].items():
        print(f"{rp}: snapshot={summary['snapshotWallMs']}ms delta_avg={summary['deltaAverageWallMs']}ms max_rss={summary['maxRssKb']}KB")
# Run the benchmark only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == "__main__":
    main()