diff --git a/scripts/experiments/feature043/run_sequence_triage_experiment.py b/scripts/experiments/feature043/run_sequence_triage_experiment.py new file mode 100755 index 0000000..c582325 --- /dev/null +++ b/scripts/experiments/feature043/run_sequence_triage_experiment.py @@ -0,0 +1,513 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import hashlib +import json +import os +import shlex +import subprocess +import sys +import time +from pathlib import Path +from typing import Any + +SCRIPT_DIR = Path(__file__).resolve().parent +REPO_ROOT = SCRIPT_DIR.parents[2] +DEV_ROOT = REPO_ROOT.parents[1] +FEATURE035_DIR = REPO_ROOT / "scripts" / "experiments" / "feature035" +FIXTURE_MANIFEST_PATH = FEATURE035_DIR / "fixture-manifest.json" +PORTABLE_ROOT = DEV_ROOT / "rpki-client-portable" +CACHED_CIR_RPKI_CLIENT = DEV_ROOT / ".cache" / "rpki-client-9.8-cir" / "rpki-client" +CACHED_CIR_LIBTLS = DEV_ROOT / ".cache" / "rpki-client-9.8-cir" / "libtls.so.28" + +DEFAULT_RIRS = ["afrinic", "apnic", "arin", "lacnic", "ripe"] + + +def run_local(argv: list[str], *, cwd: Path | None = None, capture: bool = False, check: bool = True) -> subprocess.CompletedProcess[str]: + result = subprocess.run(argv, cwd=str(cwd) if cwd else None, text=True, capture_output=capture, check=False) + if check and result.returncode != 0: + raise SystemExit( + f"command failed ({result.returncode}): {' '.join(shlex.quote(x) for x in argv)}\n" + f"stdout:\n{result.stdout or ''}\nstderr:\n{result.stderr or ''}" + ) + return result + + +def ssh_script(target: str, script: str, *, capture: bool = False, check: bool = True) -> subprocess.CompletedProcess[str]: + result = subprocess.run(["ssh", target, "bash", "-s"], input=script, text=True, capture_output=capture, check=False) + if check and result.returncode != 0: + raise SystemExit(f"remote script failed ({result.returncode}) on {target}\n{result.stdout}\n{result.stderr}") + return result + + +def rsync_to_remote(target: str, source: Path, destination: str | Path) -> None: + run_local(["rsync", "-a", str(source), f"{target}:{destination}"]) + + +def rsync_dir_to_remote(target: str, source: Path, destination: str | Path) -> None: + run_local(["rsync", "-a", f"{source}/", f"{target}:{destination}/"]) + + +def rsync_from_remote(target: str, source: str | Path, destination: Path) -> None: + destination.mkdir(parents=True, exist_ok=True) + run_local(["rsync", "-a", f"{target}:{source}/", f"{destination}/"]) + + +def rsync_run_artifacts_from_remote(target: str, source: str | Path, destination: Path) -> None: + destination.mkdir(parents=True, exist_ok=True) + include_args = [ + "--include", "result.ccr", + "--include", "result.cir", + "--include", "process-time.txt", + "--include", "remote-run-meta.json", + "--include", "exit-code.txt", + "--include", "started-at.txt", + "--include", "finished-at.txt", + "--exclude", "*", + ] + run_local(["rsync", "-a", *include_args, f"{target}:{source}/", f"{destination}/"]) + + +def load_json(path: Path) -> Any: + with path.open("r", encoding="utf-8") as handle: + return json.load(handle) + + +def write_json(path: Path, value: Any) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w", encoding="utf-8") as handle: + json.dump(value, handle, indent=2, sort_keys=True, ensure_ascii=False) + handle.write("\n") + + +def append_jsonl(path: Path, value: Any) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("a", encoding="utf-8") as handle: + handle.write(json.dumps(value, sort_keys=True, ensure_ascii=False)) + handle.write("\n") + + +def utc_stamp() -> str: + return time.strftime("%Y%m%dT%H%M%SZ", time.gmtime()) + + +def rfc3339_now() -> str: + return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + + +def fixture_manifest() -> dict[str, Any]: + return load_json(FIXTURE_MANIFEST_PATH) + + +def fixture_name(rir: str, kind: str) -> str: + return Path(fixture_manifest()["rirs"][rir][kind]).name + + +def fixture_desc(rir: str) -> str: + return { + "afrinic": "afrinic", + "apnic": "apnic-rfc7730-https", + "arin": "arin", + "lacnic": "lacnic", + "ripe": "ripe-ncc", + }[rir] + + +def cir_tal_uri_for_rir(rir: str) -> str: + return { + "afrinic": "https://rpki.afrinic.net/tal/afrinic.tal", + "apnic": "https://rpki.apnic.net/tal/apnic-rfc7730-https.tal", + "arin": "https://www.arin.net/resources/manage/rpki/arin.tal", + "lacnic": "https://www.lacnic.net/innovaportal/file/4983/1/lacnic.tal", + "ripe": "https://tal.rpki.ripe.net/ripe-ncc.tal", + }[rir] + + +def sha256_file(path: Path) -> str: + digest = hashlib.sha256() + with path.open("rb") as handle: + for chunk in iter(lambda: handle.read(1024 * 1024), b""): + digest.update(chunk) + return digest.hexdigest() + + +def parse_elapsed_to_ms(raw: str) -> int: + raw = raw.strip() + if not raw: + return 0 + if "-" in raw: + days, raw = raw.split("-", 1) + else: + days = "0" + parts = raw.split(":") + if len(parts) == 3: + hours, minutes, seconds = parts + elif len(parts) == 2: + hours = "0" + minutes, seconds = parts + else: + hours = "0" + minutes = "0" + seconds = parts[0] + return int(round((int(days) * 86400 + int(hours) * 3600 + int(minutes) * 60 + float(seconds)) * 1000)) + + +def parse_time_file(path: Path) -> dict[str, Any]: + data: dict[str, Any] = {} + if not path.is_file(): + return data + for line in path.read_text(encoding="utf-8", errors="replace").splitlines(): + if "Elapsed (wall clock) time" in line: + elapsed = line.rsplit(":", 1)[1] if "):" not in line else line.rsplit("):", 1)[1] + data["wallMs"] = parse_elapsed_to_ms(elapsed) + elif "Maximum resident set size" in line: + try: + data["maxRssKb"] = int(line.rsplit(":", 1)[1].strip()) + except ValueError: + pass + return data + + +def rpki_client_bin_path() -> Path: + primary = PORTABLE_ROOT / "src" / "rpki-client" + for candidate in (primary, CACHED_CIR_RPKI_CLIENT): + if not candidate.is_file(): + continue + smoke = run_local([str(candidate), "-T", "invalid"], capture=True, check=False) + if "--ta-fixture requires :" in (smoke.stderr + smoke.stdout): + return candidate + raise SystemExit("rpki-client binary lacks CIR/TA fixture support; checkout feature/cir-output-for-rp-compare or restore .cache/rpki-client-9.8-cir/rpki-client") + + +def detect_libtls_path(rpki_client_bin: Path) -> Path: + if CACHED_CIR_LIBTLS.is_file(): + return CACHED_CIR_LIBTLS + ldd = run_local(["ldd", str(rpki_client_bin)], capture=True) + for line in ldd.stdout.splitlines(): + if "libtls.so.28" not in line or "=>" not in line: + continue + candidate = Path(line.split("=>", 1)[1].strip().split(" ", 1)[0]) + if candidate.is_file(): + return candidate + fallback = DEV_ROOT / ".cache" / "rpki-client-9.8-cir" / "libtls.so.28" + if fallback.is_file(): + return fallback + raise SystemExit("unable to locate libtls.so.28 for rpki-client") + + +def build_tool_binaries() -> None: + run_local([ + "cargo", "build", "--release", + "--bin", "rpki", + "--bin", "sequence_triage_ccr_cir", + "--bin", "cir_dump_reject_list", + ], cwd=REPO_ROOT) + _ = rpki_client_bin_path() + + +def validate_remote_disk(ssh_target: str) -> None: + script = r''' +set -euo pipefail +df -h /data / || true +python3 - <<'PY' +import shutil +for path in ['/data', '/']: + try: + usage = shutil.disk_usage(path) + except FileNotFoundError: + continue + used = usage.used / usage.total if usage.total else 0 + print(f'{path} used={used:.2%}') + if used >= 0.90: + raise SystemExit(f'{path} disk usage >= 90%; cleanup required before all5 sequence experiment') +PY +''' + ssh_script(ssh_target, script) + + +def prepare_remote(ssh_target: str, remote_root: Path, needs_rpki_client: bool) -> None: + validate_remote_disk(ssh_target) + preflight = ( + "set -euo pipefail; " + "systemctl disable --now rpki-client.timer >/dev/null 2>&1 || true; " + "systemctl stop rpki-client.service >/dev/null 2>&1 || true; " + "pkill -x rpki-client >/dev/null 2>&1 || true; " + "pkill -x routinator >/dev/null 2>&1 || true; " + f"mkdir -p {shlex.quote(str(remote_root / 'bin'))} {shlex.quote(str(remote_root / 'lib'))} " + f"{shlex.quote(str(remote_root / 'fixtures' / 'tal'))} {shlex.quote(str(remote_root / 'fixtures' / 'ta'))} " + f"{shlex.quote(str(remote_root / 'experiments'))}; " + f"df -h /data / > {shlex.quote(str(remote_root / 'df-before.txt'))} 2>&1 || true; " + f"free -h > {shlex.quote(str(remote_root / 'free-before.txt'))} 2>&1 || true" + ) + ssh_script(ssh_target, preflight) + rsync_dir_to_remote(ssh_target, REPO_ROOT / "tests" / "fixtures" / "tal", remote_root / "fixtures" / "tal") + rsync_dir_to_remote(ssh_target, REPO_ROOT / "tests" / "fixtures" / "ta", remote_root / "fixtures" / "ta") + rsync_to_remote(ssh_target, REPO_ROOT / "target" / "release" / "rpki", remote_root / "bin" / "rpki") + rsync_to_remote(ssh_target, REPO_ROOT / "target" / "release" / "sequence_triage_ccr_cir", remote_root / "bin" / "sequence_triage_ccr_cir") + rsync_to_remote(ssh_target, REPO_ROOT / "target" / "release" / "cir_dump_reject_list", remote_root / "bin" / "cir_dump_reject_list") + if needs_rpki_client: + rpki_client_bin = rpki_client_bin_path() + rsync_to_remote(ssh_target, rpki_client_bin, remote_root / "bin" / "rpki-client") + rsync_to_remote(ssh_target, detect_libtls_path(rpki_client_bin), remote_root / "lib" / "libtls.so.28") + + +def side_config(name: str) -> dict[str, Any]: + if name == "ours-standard": + return {"rpKind": "ours", "mode": "standard", "protocol": "rrdp+rsync", "rsyncScope": "module-root"} + if name == "rpki-client-standard": + return {"rpKind": "rpki-client", "mode": "standard", "protocol": "rrdp+rsync"} + raise SystemExit(f"unknown side config: {name}") + + +def build_remote_command(remote_root: Path, side_name: str, side: dict[str, Any], side_label: str, seq: int, rirs: list[str]) -> tuple[Path, str]: + run_dir = remote_root / "experiments" / "sequence" / side_label / f"run_{seq:04d}" + state_dir = remote_root / "experiments" / "sequence" / side_label / "state" / side["rpKind"] + sync_mode = "snapshot" if seq == 1 else "delta" + ensure = [f"mkdir -p {shlex.quote(str(run_dir))}", f"chmod 0777 {shlex.quote(str(run_dir))}"] + if side["rpKind"] == "ours": + if seq == 1: + ensure.append(f"rm -rf {shlex.quote(str(state_dir))}") + ensure.extend([ + f"mkdir -p {shlex.quote(str(state_dir / 'work-db'))} {shlex.quote(str(state_dir / 'rsync-mirror'))}", + f"chmod -R 0777 {shlex.quote(str(state_dir.parent))}", + ]) + argv = [ + str(remote_root / "bin" / "rpki"), + "--db", str(state_dir / "work-db"), + "--raw-store-db", str(state_dir / "raw-store.db"), + "--repo-bytes-db", str(state_dir / "repo-bytes.db"), + "--rsync-scope", side.get("rsyncScope", "module-root"), + ] + for rir in rirs: + argv.extend(["--tal-path", str(remote_root / "fixtures" / "tal" / fixture_name(rir, "tal"))]) + argv.extend(["--ta-path", str(remote_root / "fixtures" / "ta" / fixture_name(rir, "ta"))]) + argv.extend(["--report-json", str(run_dir / "report.json"), "--report-json-compact"]) + argv.extend(["--ccr-out", str(run_dir / "result.ccr"), "--cir-enable", "--cir-out", str(run_dir / "result.cir")]) + for rir in rirs: + argv.extend(["--cir-tal-uri", cir_tal_uri_for_rir(rir)]) + argv.extend(["--vrps-csv-out", str(run_dir / "vrps.csv"), "--vaps-csv-out", str(run_dir / "vaps.csv")]) + prefix = "env RPKI_PROGRESS_LOG=1 RPKI_PROGRESS_SLOW_SECS=10 /usr/bin/time" + else: + if seq == 1: + ensure.append(f"rm -rf {shlex.quote(str(state_dir))}") + ensure.extend([ + f"mkdir -p {shlex.quote(str(state_dir / 'cache' / 'fixtures'))}", + f"touch {shlex.quote(str(state_dir / 'rpki-client-skiplist'))}", + f"chmod -R 0777 {shlex.quote(str(state_dir.parent))}", + ]) + for rir in rirs: + ensure.append( + f"cp -f {shlex.quote(str(remote_root / 'fixtures' / 'ta' / fixture_name(rir, 'ta')))} " + f"{shlex.quote(str(state_dir / 'cache' / 'fixtures' / fixture_name(rir, 'ta')))}" + ) + argv = [str(remote_root / "bin" / "rpki-client"), "-vv", "-S", str(state_dir / "rpki-client-skiplist")] + for rir in rirs: + argv.extend(["-t", str(remote_root / "fixtures" / "tal" / fixture_name(rir, "tal"))]) + argv.extend(["-T", f"{fixture_desc(rir)}:{state_dir / 'cache' / 'fixtures' / fixture_name(rir, 'ta')}"]) + argv.extend(["-d", str(state_dir / "cache"), str(run_dir)]) + prefix = f"env LD_LIBRARY_PATH={shlex.quote(str(remote_root / 'lib'))} /usr/bin/time" + command = ( + "set -euo pipefail; " + + "; ".join(ensure) + + "; date -u +%Y-%m-%dT%H:%M:%SZ > " + shlex.quote(str(run_dir / "started-at.txt")) + + "; set +e; " + + prefix + " -v -o " + shlex.quote(str(run_dir / "process-time.txt")) + + " -- " + shlex.join(argv) + + " > " + shlex.quote(str(run_dir / "stdout.log")) + + " 2> " + shlex.quote(str(run_dir / "stderr.log")) + + "; ec=$?; set -e; printf '%s\n' \"$ec\" > " + shlex.quote(str(run_dir / "exit-code.txt")) + + "; date -u +%Y-%m-%dT%H:%M:%SZ > " + shlex.quote(str(run_dir / "finished-at.txt")) + + "; true" + ) + if side["rpKind"] == "rpki-client": + command += ( + f"; [ -f {shlex.quote(str(run_dir / 'json'))} ] && cp -f {shlex.quote(str(run_dir / 'json'))} {shlex.quote(str(run_dir / 'report.json'))} || true" + f"; [ -f {shlex.quote(str(run_dir / 'rpki.ccr'))} ] && cp -f {shlex.quote(str(run_dir / 'rpki.ccr'))} {shlex.quote(str(run_dir / 'result.ccr'))} || true" + f"; [ -f {shlex.quote(str(run_dir / 'rpki.cir'))} ] && cp -f {shlex.quote(str(run_dir / 'rpki.cir'))} {shlex.quote(str(run_dir / 'result.cir'))} || true" + ) + command += ( + f"; python3 - <<'REMOTE_META' {shlex.quote(str(run_dir))} {shlex.quote(side_name)} {shlex.quote(side_label)} {seq} {shlex.quote(sync_mode)}\n" + "import json, pathlib, sys\n" + "run_dir=pathlib.Path(sys.argv[1]); side_name=sys.argv[2]; side_label=sys.argv[3]; seq=int(sys.argv[4]); sync_mode=sys.argv[5]\n" + "def read(p):\n return p.read_text().strip() if p.exists() else None\n" + "meta={'sideName':side_name,'sideLabel':side_label,'seq':seq,'syncMode':sync_mode,'startedAt':read(run_dir/'started-at.txt'),'finishedAt':read(run_dir/'finished-at.txt'),'exitCode':int(read(run_dir/'exit-code.txt') or '1')}\n" + "json.dump(meta, open(run_dir/'remote-run-meta.json','w'), indent=2, sort_keys=True); print()\n" + "REMOTE_META" + ) + return run_dir, command + + +def run_remote_sample(ssh_target: str, remote_root: Path, side_name: str, side: dict[str, Any], side_label: str, seq: int, rirs: list[str]) -> Path: + run_dir, command = build_remote_command(remote_root, side_name, side, side_label, seq, rirs) + ssh_script(ssh_target, command) + return run_dir + + +def cir_counts(cir_path: Path) -> dict[str, int]: + result = run_local([str(REPO_ROOT / "target" / "release" / "cir_dump_reject_list"), "--cir", str(cir_path), "--limit", "0"], capture=True) + values: dict[str, int] = {} + for line in result.stdout.splitlines(): + if "=" not in line: + continue + key, value = line.split("=", 1) + if key in {"object_count", "trust_anchor_count", "reject_count"}: + values[key] = int(value) + return { + "cirObjectCount": values.get("object_count", 0), + "cirTrustAnchorCount": values.get("trust_anchor_count", 0), + "cirRejectCount": values.get("reject_count", 0), + } + + +def report_counts(path: Path, rp_kind: str) -> dict[str, int]: + if not path.is_file(): + return {} + report = load_json(path) + if rp_kind == "rpki-client": + meta = report.get("metadata", {}) + return { + "vrps": int(meta.get("vrps", 0)), + "vaps": int(meta.get("vaps", 0) or meta.get("aspas", 0) or 0), + "publicationPoints": int(meta.get("repositories", 0)), + "warnings": 0, + } + pps = report.get("publication_points", []) + tree = report.get("tree", {}) + return { + "vrps": len(report.get("vrps", [])), + "vaps": len(report.get("aspas", [])), + "publicationPoints": len(pps), + "warnings": len(tree.get("warnings", [])) + sum(len(pp.get("warnings", [])) for pp in pps if isinstance(pp, dict)), + } + + +def build_sequence_item(local_root: Path, side_name: str, side_label: str, side: dict[str, Any], seq: int, run_dir: Path) -> dict[str, Any]: + ccr = run_dir / "result.ccr" + cir = run_dir / "result.cir" + meta = load_json(run_dir / "remote-run-meta.json") + time_info = parse_time_file(run_dir / "process-time.txt") + counts = report_counts(run_dir / "report.json", side["rpKind"]) + counts.update(cir_counts(cir)) + return { + "schemaVersion": 1, + "rpId": side_name, + "side": "left" if side_label == "A" else "right", + "seq": seq, + "runId": f"{side_label}-{seq:04d}", + "syncMode": "snapshot" if seq == 1 else "delta", + "status": "success" if meta.get("exitCode") == 0 else "failed", + "startTime": meta.get("startedAt"), + "finishTime": meta.get("finishedAt"), + "validationTime": None, + "ccrPath": ccr.relative_to(local_root).as_posix(), + "cirPath": cir.relative_to(local_root).as_posix(), + "ccrSha256": sha256_file(ccr), + "cirSha256": sha256_file(cir), + "wallMs": time_info.get("wallMs"), + "maxRssKb": time_info.get("maxRssKb"), + "vrps": counts.get("vrps"), + "vaps": counts.get("vaps"), + "publicationPoints": counts.get("publicationPoints"), + "cirObjectCount": counts.get("cirObjectCount"), + "cirRejectCount": counts.get("cirRejectCount"), + "cirTrustAnchorCount": counts.get("cirTrustAnchorCount"), + } + + +def run_sequence_triage(local_exp_root: Path, args: argparse.Namespace) -> None: + compare_dir = local_exp_root / "sequence-triage" + run_local([ + str(REPO_ROOT / "target" / "release" / "sequence_triage_ccr_cir"), + "--left-sequence", str(local_exp_root / "left-sequence.jsonl"), + "--right-sequence", str(local_exp_root / "right-sequence.jsonl"), + "--out-dir", str(compare_dir), + "--align-window-runs", str(args.align_window_runs), + "--align-window-secs", str(args.align_window_secs), + "--sample-limit", str(args.sample_limit), + "--timeline-sample-limit", str(args.timeline_sample_limit), + ]) + + +def run_experiment(args: argparse.Namespace) -> None: + if not args.skip_build: + build_tool_binaries() + rirs = [item.strip() for item in args.rirs.split(",") if item.strip()] + if set(rirs) != set(DEFAULT_RIRS) or len(rirs) != len(DEFAULT_RIRS): + raise SystemExit("#043 remote acceptance requires all5 RIRs: afrinic,apnic,arin,lacnic,ripe") + left = side_config(args.left) + right = side_config(args.right) + run_root = Path(args.run_root).resolve() + remote_root = Path(args.remote_root) + run_root.mkdir(parents=True, exist_ok=True) + write_json(run_root / "experiment-config.json", { + "schemaVersion": 1, + "generatedAtUtc": utc_stamp(), + "left": args.left, + "right": args.right, + "samplesPerSide": args.samples_per_side, + "rirs": rirs, + "remoteRoot": str(remote_root), + "sshTarget": args.ssh_target, + }) + if args.dry_run: + print(json.dumps(load_json(run_root / "experiment-config.json"), indent=2, ensure_ascii=False)) + return + if args.triage_only: + run_sequence_triage(run_root / "experiments" / "sequence", args) + print(json.dumps({ + "runRoot": str(run_root), + "triage": str(run_root / "experiments" / "sequence" / "sequence-triage" / "sequence-triage.json"), + }, indent=2)) + return + prepare_remote(args.ssh_target, remote_root, needs_rpki_client=(left["rpKind"] == "rpki-client" or right["rpKind"] == "rpki-client")) + local_exp_root = run_root / "experiments" / "sequence" + left_seq_path = local_exp_root / "left-sequence.jsonl" + right_seq_path = local_exp_root / "right-sequence.jsonl" + left_seq_path.unlink(missing_ok=True) + right_seq_path.unlink(missing_ok=True) + progress: list[dict[str, Any]] = [] + for seq in range(1, args.samples_per_side + 1): + for side_label, side_name, side, seq_path in [("A", args.left, left, left_seq_path), ("B", args.right, right, right_seq_path)]: + print(f"[run] {side_label} {side_name} seq={seq} all5", flush=True) + remote_run_dir = run_remote_sample(args.ssh_target, remote_root, side_name, side, side_label, seq, rirs) + local_run_dir = local_exp_root / side_label / f"run_{seq:04d}" + rsync_run_artifacts_from_remote(args.ssh_target, remote_run_dir, local_run_dir) + item = build_sequence_item(local_exp_root, side_name, side_label, side, seq, local_run_dir) + append_jsonl(seq_path, item) + progress.append(item) + print( + f"[done] {side_label} seq={seq} wallMs={item.get('wallMs')} vrps={item.get('vrps')} vaps={item.get('vaps')} objects={item.get('cirObjectCount')} rejects={item.get('cirRejectCount')}", + flush=True, + ) + write_json(local_exp_root / "run-progress.json", progress) + run_sequence_triage(local_exp_root, args) + ssh_script(args.ssh_target, f"df -h /data / > {shlex.quote(str(remote_root / 'df-after.txt'))} 2>&1 || true; free -h > {shlex.quote(str(remote_root / 'free-after.txt'))} 2>&1 || true") + compare_dir = local_exp_root / "sequence-triage" + print(json.dumps({"runRoot": str(run_root), "remoteRoot": str(remote_root), "triage": str(compare_dir / "sequence-triage.json")}, indent=2)) + + +def main() -> None: + parser = argparse.ArgumentParser(description="Feature #043 all5 sequence triage experiment driver") + parser.add_argument("--run-root", required=True) + parser.add_argument("--remote-root", required=True) + parser.add_argument("--ssh-target", default=os.environ.get("SSH_TARGET", "root@47.251.56.108")) + parser.add_argument("--left", default="ours-standard") + parser.add_argument("--right", default="rpki-client-standard") + parser.add_argument("--samples-per-side", type=int, default=3) + parser.add_argument("--rirs", default=",".join(DEFAULT_RIRS)) + parser.add_argument("--align-window-runs", type=int, default=2) + parser.add_argument("--align-window-secs", type=int, default=1800) + parser.add_argument("--sample-limit", type=int, default=200) + parser.add_argument("--timeline-sample-limit", type=int, default=0) + parser.add_argument("--dry-run", action="store_true") + parser.add_argument("--skip-build", action="store_true", help="reuse existing release binaries") + parser.add_argument("--triage-only", action="store_true", help="only rerun local sequence triage for an existing run root") + args = parser.parse_args() + if args.samples_per_side < 2: + raise SystemExit("--samples-per-side must be >= 2") + run_experiment(args) + + +if __name__ == "__main__": + main() diff --git a/src/bin/sequence_triage_ccr_cir.rs b/src/bin/sequence_triage_ccr_cir.rs new file mode 100644 index 0000000..7b76a99 --- /dev/null +++ b/src/bin/sequence_triage_ccr_cir.rs @@ -0,0 +1,2714 @@ +use std::collections::{BTreeMap, BTreeSet}; +use std::path::{Path, PathBuf}; + +use rpki::ccr::{decode_ccr_compare_views, decode_content_info}; +use rpki::cir::decode_cir; +use serde::Deserialize; +use serde_json::{Value, json}; +use time::OffsetDateTime; +use time::format_description::well_known::Rfc3339; + +#[derive(Debug, PartialEq, Eq)] +struct Args { + left_sequence: PathBuf, + right_sequence: PathBuf, + out_dir: PathBuf, + align_window_runs: u32, + align_window_secs: i64, + sample_limit: usize, + warmup_samples: usize, + cooldown_samples: usize, + timeline_sample_limit: usize, +} + +#[derive(Clone, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct SequenceItemRaw { + schema_version: Option, + rp_id: String, + side: Option, + seq: u32, + run_id: String, + sync_mode: Option, + status: Option, + start_time: Option, + finish_time: Option, + validation_time: Option, + ccr_path: PathBuf, + cir_path: PathBuf, + ccr_sha256: Option, + cir_sha256: Option, + wall_ms: Option, + max_rss_kb: Option, + vrps: Option, + vaps: Option, +} + +#[derive(Clone, Debug)] +struct SequenceSample { + raw: SequenceItemRaw, + validation_time: OffsetDateTime, + ccr_path: PathBuf, + cir_path: PathBuf, + objects: BTreeMap, + object_uris: BTreeSet, + object_hashes: BTreeSet, + rejects: BTreeSet, + trust_anchors: BTreeSet, + vrps: BTreeSet, + vaps: BTreeSet, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum Side { + Left, + Right, +} + +impl Side { + fn as_str(self) -> &'static str { + match self { + Side::Left => "left", + Side::Right => "right", + } + } +} + +#[derive(Clone, Debug)] +struct EventOccurrence { + side: Side, + seq: u32, + run_id: String, +} + +#[derive(Clone, Debug)] +struct SampleRecord { + classification: &'static str, + event_type: &'static str, + key: String, + source_side: Side, + source_seq: u32, + source_run_id: String, + matched_seq: Option, + matched_run_id: Option, + note: String, +} + +#[derive(Clone, Debug, Default)] +struct ClassStats { + total: usize, + samples: Vec, +} + +#[derive(Clone, Debug, Default)] +struct AnalysisResult { + stats: BTreeMap<&'static str, ClassStats>, +} + +#[derive(Clone, Debug)] +struct DiffEvent { + event_type: &'static str, + raw_class: &'static str, + key: String, + source_side: Side, + source_seq: u32, + source_run_id: String, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum EdgePosition { + Leading, + Stable, + Trailing, +} + +#[derive(Clone, Debug)] +struct AdjustedRecord { + classification: &'static str, + event_type: &'static str, + key: String, + source_side: Side, + source_seq: u32, + source_run_id: String, + note: String, +} + +#[derive(Clone, Debug, Default)] +struct AdjustedClassStats { + total: usize, + unique_keys: BTreeSet, + samples: Vec, +} + +#[derive(Clone, Debug, Default)] +struct AdjustedAnalysis { + raw_persistent_occurrences: usize, + raw_persistent_unique_keys: usize, + edge_filtered_occurrences: usize, + edge_filtered_unique_keys: usize, + adjusted_stable_occurrences: usize, + adjusted_stable_unique_keys: usize, + stats: BTreeMap<&'static str, AdjustedClassStats>, + uri_timeline_samples: Vec, + stable_object_groups: Vec, +} + +#[derive(Clone, Debug)] +struct SandwichRecord { + classification: &'static str, + set_type: &'static str, + key: String, + source_side: Side, + source_start_seq: u32, + source_start_run_id: String, + source_end_seq: u32, + source_end_run_id: String, + peer_seq: u32, + peer_run_id: String, + source_value: Option, + peer_value: Option, + source_start_time: String, + peer_time: String, + source_end_time: String, + note: String, +} + +#[derive(Clone, Debug, Default)] +struct SandwichClassStats { + total: usize, + unique_keys: BTreeSet, + samples: Vec, +} + +#[derive(Clone, Debug, Default)] +struct SandwichAnalysis { + total_occurrences: usize, + unique_keys: BTreeSet, + by_set_type: BTreeMap<&'static str, usize>, + stats: BTreeMap<&'static str, SandwichClassStats>, +} + +fn usage() -> &'static str { + "Usage: sequence_triage_ccr_cir --left-sequence --right-sequence --out-dir [--align-window-runs ] [--align-window-secs ] [--sample-limit ] [--warmup-samples ] [--cooldown-samples ] [--timeline-sample-limit ]" +} + +fn main() { + if let Err(err) = real_main() { + eprintln!("{err}"); + std::process::exit(1); + } +} + +fn real_main() -> Result<(), String> { + let args = parse_args(&std::env::args().collect::>())?; + run(args) +} + +fn parse_args(argv: &[String]) -> Result { + let mut left_sequence = None; + let mut right_sequence = None; + let mut out_dir = None; + let mut align_window_runs = 2u32; + let mut align_window_secs = 1800i64; + let mut sample_limit = 200usize; + let mut warmup_samples = 1usize; + let mut cooldown_samples = 1usize; + let mut timeline_sample_limit = 0usize; + let mut index = 1usize; + while index < argv.len() { + match argv[index].as_str() { + "--left-sequence" => { + index += 1; + left_sequence = Some(PathBuf::from( + argv.get(index).ok_or("--left-sequence requires a value")?, + )); + } + "--right-sequence" => { + index += 1; + right_sequence = Some(PathBuf::from( + argv.get(index).ok_or("--right-sequence requires a value")?, + )); + } + "--out-dir" => { + index += 1; + out_dir = Some(PathBuf::from( + argv.get(index).ok_or("--out-dir requires a value")?, + )); + } + "--align-window-runs" => { + index += 1; + let value = argv + .get(index) + .ok_or("--align-window-runs requires a value")?; + align_window_runs = value + .parse::() + .map_err(|_| format!("invalid --align-window-runs: {value}"))?; + } + "--align-window-secs" => { + index += 1; + let value = argv + .get(index) + .ok_or("--align-window-secs requires a value")?; + align_window_secs = value + .parse::() + .map_err(|_| format!("invalid --align-window-secs: {value}"))?; + } + "--sample-limit" => { + index += 1; + let value = argv.get(index).ok_or("--sample-limit requires a value")?; + sample_limit = value + .parse::() + .map_err(|_| format!("invalid --sample-limit: {value}"))?; + } + "--warmup-samples" => { + index += 1; + let value = argv.get(index).ok_or("--warmup-samples requires a value")?; + warmup_samples = value + .parse::() + .map_err(|_| format!("invalid --warmup-samples: {value}"))?; + } + "--cooldown-samples" => { + index += 1; + let value = argv + .get(index) + .ok_or("--cooldown-samples requires a value")?; + cooldown_samples = value + .parse::() + .map_err(|_| format!("invalid --cooldown-samples: {value}"))?; + } + "--timeline-sample-limit" => { + index += 1; + let value = argv + .get(index) + .ok_or("--timeline-sample-limit requires a value")?; + timeline_sample_limit = value + .parse::() + .map_err(|_| format!("invalid --timeline-sample-limit: {value}"))?; + } + "-h" | "--help" => return Err(usage().to_string()), + other => return Err(format!("unknown argument: {other}\n{}", usage())), + } + index += 1; + } + Ok(Args { + left_sequence: left_sequence + .ok_or_else(|| format!("--left-sequence is required\n{}", usage()))?, + right_sequence: right_sequence + .ok_or_else(|| format!("--right-sequence is required\n{}", usage()))?, + out_dir: out_dir.ok_or_else(|| format!("--out-dir is required\n{}", usage()))?, + align_window_runs, + align_window_secs, + sample_limit, + warmup_samples, + cooldown_samples, + timeline_sample_limit, + }) +} + +fn run(args: Args) -> Result<(), String> { + std::fs::create_dir_all(&args.out_dir) + .map_err(|e| format!("create out-dir failed: {}: {e}", args.out_dir.display()))?; + let left = load_sequence(&args.left_sequence, Side::Left)?; + let right = load_sequence(&args.right_sequence, Side::Right)?; + if left.is_empty() || right.is_empty() { + return Err("left and right sequences must both contain at least one sample".into()); + } + + let mut result = AnalysisResult::default(); + analyze_set( + &mut result, + "object_uri", + &left, + &right, + |sample| &sample.object_uris, + "TEMPORAL_LAG_RESOLVED", + "PERSISTENT_OBJECT_SET_DIVERGENCE", + &args, + ); + analyze_set( + &mut result, + "object_hash", + &left, + &right, + |sample| &sample.object_hashes, + "TEMPORAL_LAG_RESOLVED", + "PERSISTENT_CONTENT_DIVERGENCE", + &args, + ); + analyze_hash_rollover(&mut result, &left, &right, &args); + analyze_set( + &mut result, + "reject_uri", + &left, + &right, + |sample| &sample.rejects, + "TEMPORAL_LAG_RESOLVED", + "PERSISTENT_REJECT_DIVERGENCE", + &args, + ); + analyze_set( + &mut result, + "trust_anchor", + &left, + &right, + |sample| &sample.trust_anchors, + "TEMPORAL_LAG_RESOLVED", + "PERSISTENT_TA_DIFFERENCE", + &args, + ); + analyze_set( + &mut result, + "vrp_output", + &left, + &right, + |sample| &sample.vrps, + "TEMPORAL_LAG_RESOLVED", + "PERSISTENT_OUTPUT_DIVERGENCE", + &args, + ); + analyze_set( + &mut result, + "vap_output", + &left, + &right, + |sample| &sample.vaps, + "TEMPORAL_LAG_RESOLVED", + "PERSISTENT_OUTPUT_DIVERGENCE", + &args, + ); + + let persistent_events = collect_persistent_events(&left, &right, &args); + let adjusted = build_adjusted_analysis(&args, &left, &right, &persistent_events); + let sandwich = build_sandwich_analysis(&args, &left, &right); + let output = build_output(&args, &left, &right, &result, &adjusted, &sandwich); + write_json(&args.out_dir.join("sequence-triage.json"), &output)?; + write_markdown(&args.out_dir.join("sequence-triage.md"), &output)?; + write_samples_jsonl(&args.out_dir.join("sequence-diff-samples.jsonl"), &result)?; + println!("{}", args.out_dir.display()); + Ok(()) +} + +fn load_sequence(path: &Path, side: Side) -> Result, String> { + let base_dir = path.parent().unwrap_or_else(|| Path::new(".")); + let text = std::fs::read_to_string(path) + .map_err(|e| format!("read sequence failed: {}: {e}", path.display()))?; + let mut samples = Vec::new(); + let mut seen_seq = BTreeSet::new(); + for (line_index, line) in text.lines().enumerate() { + let line = line.trim(); + if line.is_empty() { + continue; + } + let raw: SequenceItemRaw = serde_json::from_str(line).map_err(|e| { + format!( + "parse sequence JSONL failed: {}:{}: {e}", + path.display(), + line_index + 1 + ) + })?; + if raw.schema_version.unwrap_or(1) != 1 { + return Err(format!( + "unsupported sequence item schemaVersion in {}:{}", + path.display(), + line_index + 1 + )); + } + if !seen_seq.insert(raw.seq) { + return Err(format!("duplicate seq {} in {}", raw.seq, path.display())); + } + if let Some(status) = &raw.status + && status != "success" + { + return Err(format!( + "sequence sample {} has non-success status: {status}", + raw.run_id + )); + } + let cir_path = resolve_path(base_dir, &raw.cir_path); + let ccr_path = resolve_path(base_dir, &raw.ccr_path); + let cir = decode_cir(&read_file(&cir_path)?).map_err(|e| { + format!( + "decode CIR failed for sample {} ({}): {e}", + raw.run_id, + cir_path.display() + ) + })?; + let ccr = decode_content_info(&read_file(&ccr_path)?).map_err(|e| { + format!( + "decode CCR failed for sample {} ({}): {e}", + raw.run_id, + ccr_path.display() + ) + })?; + let validation_time = raw + .validation_time + .as_deref() + .map(parse_rfc3339) + .transpose()? + .unwrap_or(cir.validation_time); + let objects = cir + .objects + .iter() + .map(|item| (item.rsync_uri.clone(), hex::encode(&item.sha256))) + .collect::>(); + let object_uris = objects.keys().cloned().collect::>(); + let object_hashes = objects + .iter() + .map(|(uri, hash)| object_hash_key(uri, hash)) + .collect::>(); + let rejects = cir + .rejected_objects + .iter() + .map(|item| item.object_uri.clone()) + .collect::>(); + let trust_anchors = cir + .trust_anchors + .iter() + .map(|item| { + format!( + "{}|{}|{}|{}", + item.ta_rsync_uri, + item.tal_uri, + hex::encode(rpki::cir::sha256(&item.tal_bytes)), + hex::encode(&item.ta_certificate_sha256) + ) + }) + .collect::>(); + let (vrps, vaps) = decode_ccr_compare_views(&ccr).map_err(|e| { + format!( + "decode CCR compare views failed for sample {} ({}): {e}", + raw.run_id, + ccr_path.display() + ) + })?; + let vrps = vrps + .into_iter() + .map(|row| format!("{}|{}|{}", row.asn, row.ip_prefix, row.max_length)) + .collect::>(); + let vaps = vaps + .into_iter() + .map(|row| format!("{}|{}", row.customer_asn, row.providers)) + .collect::>(); + samples.push(SequenceSample { + raw, + validation_time, + ccr_path, + cir_path, + objects, + object_uris, + object_hashes, + rejects, + trust_anchors, + vrps, + vaps, + }); + } + samples.sort_by_key(|sample| sample.raw.seq); + for pair in samples.windows(2) { + if pair[0].raw.seq >= pair[1].raw.seq { + return Err("sequence must be sorted by increasing seq".into()); + } + } + if samples.iter().any(|sample| { + sample + .raw + .side + .as_deref() + .is_some_and(|item| item != side.as_str()) + }) { + return Err(format!( + "sequence side field does not match expected side: {}", + side.as_str() + )); + } + Ok(samples) +} + +fn analyze_set( + result: &mut AnalysisResult, + event_type: &'static str, + left: &[SequenceSample], + right: &[SequenceSample], + extract: F, + resolved_class: &'static str, + persistent_class: &'static str, + args: &Args, +) where + F: for<'a> Fn(&'a SequenceSample) -> &'a BTreeSet, +{ + analyze_direction( + result, + event_type, + Side::Left, + left, + right, + &extract, + resolved_class, + persistent_class, + args, + ); + analyze_direction( + result, + event_type, + Side::Right, + right, + left, + &extract, + resolved_class, + persistent_class, + args, + ); +} + +fn analyze_direction( + result: &mut AnalysisResult, + event_type: &'static str, + source_side: Side, + source: &[SequenceSample], + peer: &[SequenceSample], + extract: &F, + resolved_class: &'static str, + persistent_class: &'static str, + args: &Args, +) where + F: for<'a> Fn(&'a SequenceSample) -> &'a BTreeSet, +{ + for sample in source { + let source_set = extract(sample); + for key in source_set { + if peer_sample_at_seq(peer, sample.raw.seq) + .is_some_and(|peer_sample| extract(peer_sample).contains(key)) + { + continue; + } + if let Some(matched) = find_future_match(peer, sample, &key, extract, args) { + result.add( + resolved_class, + SampleRecord { + classification: resolved_class, + event_type, + key: key.clone(), + source_side, + source_seq: sample.raw.seq, + source_run_id: sample.raw.run_id.clone(), + matched_seq: Some(matched.seq), + matched_run_id: Some(matched.run_id), + note: format!( + "matched in {} sequence within alignment window", + matched.side.as_str() + ), + }, + args.sample_limit, + ); + } else { + result.add( + persistent_class, + SampleRecord { + classification: persistent_class, + event_type, + key: key.clone(), + source_side, + source_seq: sample.raw.seq, + source_run_id: sample.raw.run_id.clone(), + matched_seq: None, + matched_run_id: None, + note: "no matching event in peer sequence alignment window".to_string(), + }, + args.sample_limit, + ); + } + } + } +} + +fn analyze_hash_rollover( + result: &mut AnalysisResult, + left: &[SequenceSample], + right: &[SequenceSample], + args: &Args, +) { + for (source_side, source, peer) in [(Side::Left, left, right), (Side::Right, right, left)] { + for sample in source { + for (uri, hash) in &sample.objects { + if peer_sample_at_seq(peer, sample.raw.seq) + .and_then(|peer_sample| peer_sample.objects.get(uri)) + .is_some_and(|peer_hash| peer_hash == hash) + { + continue; + } + if let Some(peer_sample) = peer_sample_at_seq(peer, sample.raw.seq) + && peer_sample.objects.contains_key(uri) + && find_future_hash_match(peer, sample, uri, hash, args).is_some() + { + let matched = + find_future_hash_match(peer, sample, uri, hash, args).expect("match"); + result.add( + "CONTENT_ROLLOVER_RESOLVED", + SampleRecord { + classification: "CONTENT_ROLLOVER_RESOLVED", + event_type: "object_content_rollover", + key: object_hash_key(uri, hash), + source_side, + source_seq: sample.raw.seq, + source_run_id: sample.raw.run_id.clone(), + matched_seq: Some(matched.seq), + matched_run_id: Some(matched.run_id), + note: "same URI hash appeared in peer sequence later".to_string(), + }, + args.sample_limit, + ); + } + } + } + } +} + +fn collect_persistent_events( + left: &[SequenceSample], + right: &[SequenceSample], + args: &Args, +) -> Vec { + let mut events = Vec::new(); + collect_persistent_set( + &mut events, + "object_uri", + "PERSISTENT_OBJECT_SET_DIVERGENCE", + left, + right, + |sample| &sample.object_uris, + args, + ); + collect_persistent_set( + &mut events, + "object_hash", + "PERSISTENT_CONTENT_DIVERGENCE", + left, + right, + |sample| &sample.object_hashes, + args, + ); + collect_persistent_set( + &mut events, + "reject_uri", + "PERSISTENT_REJECT_DIVERGENCE", + left, + right, + |sample| &sample.rejects, + args, + ); + collect_persistent_set( + &mut events, + "trust_anchor", + "PERSISTENT_TA_DIFFERENCE", + left, + right, + |sample| &sample.trust_anchors, + args, + ); + collect_persistent_set( + &mut events, + "vrp_output", + "PERSISTENT_OUTPUT_DIVERGENCE", + left, + right, + |sample| &sample.vrps, + args, + ); + collect_persistent_set( + &mut events, + "vap_output", + "PERSISTENT_OUTPUT_DIVERGENCE", + left, + right, + |sample| &sample.vaps, + args, + ); + events +} + +fn collect_persistent_set( + events: &mut Vec, + event_type: &'static str, + raw_class: &'static str, + left: &[SequenceSample], + right: &[SequenceSample], + extract: F, + args: &Args, +) where + F: for<'a> Fn(&'a SequenceSample) -> &'a BTreeSet, +{ + collect_persistent_direction( + events, + event_type, + raw_class, + Side::Left, + left, + right, + &extract, + args, + ); + collect_persistent_direction( + events, + event_type, + raw_class, + Side::Right, + right, + left, + &extract, + args, + ); +} + +fn collect_persistent_direction( + events: &mut Vec, + event_type: &'static str, + raw_class: &'static str, + source_side: Side, + source: &[SequenceSample], + peer: &[SequenceSample], + extract: &F, + args: &Args, +) where + F: for<'a> Fn(&'a SequenceSample) -> &'a BTreeSet, +{ + for sample in source { + for key in extract(sample) { + if peer_sample_at_seq(peer, sample.raw.seq) + .is_some_and(|peer_sample| extract(peer_sample).contains(key)) + { + continue; + } + if find_future_match(peer, sample, key, extract, args).is_some() { + continue; + } + events.push(DiffEvent { + event_type, + raw_class, + key: key.clone(), + source_side, + source_seq: sample.raw.seq, + source_run_id: sample.raw.run_id.clone(), + }); + } + } +} + +fn build_adjusted_analysis( + args: &Args, + left: &[SequenceSample], + right: &[SequenceSample], + events: &[DiffEvent], +) -> AdjustedAnalysis { + let mut analysis = AdjustedAnalysis { + raw_persistent_occurrences: events.len(), + raw_persistent_unique_keys: unique_event_count(events), + ..Default::default() + }; + let mut edge_filtered_unique = BTreeSet::new(); + + for event in events { + let source_samples = samples_for_side(event.source_side, left, right); + let peer_samples = samples_for_side(opposite_side(event.source_side), left, right); + let edge = edge_position(source_samples, event.source_seq, args); + if edge == EdgePosition::Stable { + analysis.edge_filtered_occurrences += 1; + edge_filtered_unique.insert(event_identity(event)); + } + let (classification, note) = + adjusted_classification(event, edge, source_samples, peer_samples); + analysis.add( + classification, + AdjustedRecord { + classification, + event_type: event.event_type, + key: event.key.clone(), + source_side: event.source_side, + source_seq: event.source_seq, + source_run_id: event.source_run_id.clone(), + note, + }, + args.sample_limit, + ); + } + + analysis.edge_filtered_unique_keys = edge_filtered_unique.len(); + let mut stable_unique = BTreeSet::new(); + for (class, stats) in &analysis.stats { + if class.starts_with("STABLE_") { + analysis.adjusted_stable_occurrences += stats.total; + stable_unique.extend(stats.unique_keys.iter().cloned()); + } + } + analysis.adjusted_stable_unique_keys = stable_unique.len(); + let timeline_limit = if args.timeline_sample_limit == 0 { + args.sample_limit + } else { + args.timeline_sample_limit + }; + analysis.uri_timeline_samples = + build_uri_timeline_samples(left, right, &analysis, timeline_limit); + analysis.stable_object_groups = + build_stable_object_groups(&analysis, left, right, timeline_limit); + analysis +} + +fn adjusted_classification( + event: &DiffEvent, + edge: EdgePosition, + source: &[SequenceSample], + peer: &[SequenceSample], +) -> (&'static str, String) { + if event.event_type == "trust_anchor" { + if peer_has_same_ta_identity(peer, &event.key) { + return ( + "TA_PROJECTION_FORMAT_DIFFERENCE", + "same TAL hash and TA certificate hash exist on peer side; full TA projection string differs".to_string(), + ); + } + if edge == EdgePosition::Stable { + return ( + "STABLE_TA_DIVERGENCE", + "trust-anchor identity is not aligned in a non-boundary sample".to_string(), + ); + } + return edge_unresolved_class(edge); + } + + if event.event_type == "object_hash" { + if let Some((uri, hash)) = split_object_hash_key(&event.key) { + let peer_has_uri = peer_has_uri_any(peer, uri); + let peer_has_different_hash = peer_has_uri_different_hash_any(peer, uri, hash); + let peer_hash_at_source_seq = peer_sample_at_seq(peer, event.source_seq) + .and_then(|peer_sample| peer_sample.objects.get(uri)); + let source_later_matches_peer_hash = peer_hash_at_source_seq.is_some_and(|peer_hash| { + source_future_has_hash(source, event.source_seq, uri, peer_hash) + }); + if edge == EdgePosition::Leading && peer_has_different_hash { + return ( + "EDGE_LEADING_CONTENT_ROLLOVER", + "source leading-edge hash is absent, while peer already has the same URI with another hash".to_string(), + ); + } + if edge == EdgePosition::Trailing && peer_has_different_hash { + return ( + "EDGE_TRAILING_CONTENT_ROLLOVER", + "source trailing-edge hash is absent, while peer has the same URI with another hash and no later source observation exists".to_string(), + ); + } + if edge == EdgePosition::Stable && source_later_matches_peer_hash { + return ( + "MID_SEQUENCE_CONTENT_ROLLOVER_RESOLVED", + "peer already has a newer same-URI hash at this seq and source catches up later in the observed sequence".to_string(), + ); + } + if edge == EdgePosition::Stable && peer_has_different_hash { + return ( + "STABLE_CONTENT_DIVERGENCE", + "same URI exists on peer side with a different hash in a non-boundary sample" + .to_string(), + ); + } + if edge == EdgePosition::Stable && !peer_has_uri { + return ( + "STABLE_OBJECT_SET_DIVERGENCE", + "content key is derived from a non-boundary URI that is absent on peer side" + .to_string(), + ); + } + } + if edge == EdgePosition::Stable { + return ( + "STABLE_CONTENT_DIVERGENCE", + "object hash key remains unaligned in a non-boundary sample".to_string(), + ); + } + return edge_unresolved_class(edge); + } + + if edge != EdgePosition::Stable { + return edge_unresolved_class(edge); + } + + let stable_class = match event.raw_class { + "PERSISTENT_OBJECT_SET_DIVERGENCE" => "STABLE_OBJECT_SET_DIVERGENCE", + "PERSISTENT_REJECT_DIVERGENCE" => "STABLE_REJECT_DIVERGENCE", + "PERSISTENT_OUTPUT_DIVERGENCE" => "STABLE_OUTPUT_DIVERGENCE", + "PERSISTENT_CONTENT_DIVERGENCE" => "STABLE_CONTENT_DIVERGENCE", + "PERSISTENT_TA_DIFFERENCE" => "STABLE_TA_DIVERGENCE", + _ => "STABLE_UNCLASSIFIED_DIVERGENCE", + }; + ( + stable_class, + "persistent event appears in a non-boundary sample after sequence edge filtering" + .to_string(), + ) +} + +fn edge_unresolved_class(edge: EdgePosition) -> (&'static str, String) { + match edge { + EdgePosition::Leading => ( + "EDGE_LEADING_UNRESOLVED", + "event appears only from the warmup edge and may predate the observed sequence" + .to_string(), + ), + EdgePosition::Trailing => ( + "EDGE_TRAILING_UNRESOLVED", + "event appears at the cooldown edge and lacks later peer observations".to_string(), + ), + EdgePosition::Stable => ( + "STABLE_UNCLASSIFIED_DIVERGENCE", + "event is not on an edge but no specific stable category matched".to_string(), + ), + } +} + +impl AdjustedAnalysis { + fn add(&mut self, class: &'static str, record: AdjustedRecord, sample_limit: usize) { + let stats = self.stats.entry(class).or_default(); + stats.total += 1; + stats.unique_keys.insert(format!( + "{}|{}|{}", + record.event_type, + record.source_side.as_str(), + record.key + )); + if stats.samples.len() < sample_limit { + stats.samples.push(record); + } + } +} + +fn build_sandwich_analysis( + args: &Args, + left: &[SequenceSample], + right: &[SequenceSample], +) -> SandwichAnalysis { + let mut analysis = SandwichAnalysis::default(); + analyze_sandwich_objects(&mut analysis, Side::Left, left, right, args); + analyze_sandwich_objects(&mut analysis, Side::Right, right, left, args); + analyze_sandwich_sets( + &mut analysis, + "reject_uri", + "PEER_MISSING_STABLE_REJECT", + Side::Left, + left, + right, + |sample| &sample.rejects, + args, + ); + analyze_sandwich_sets( + &mut analysis, + "reject_uri", + "PEER_MISSING_STABLE_REJECT", + Side::Right, + right, + left, + |sample| &sample.rejects, + args, + ); + analyze_sandwich_sets( + &mut analysis, + "vrp_output", + "PEER_MISSING_STABLE_OUTPUT", + Side::Left, + left, + right, + |sample| &sample.vrps, + args, + ); + analyze_sandwich_sets( + &mut analysis, + "vrp_output", + "PEER_MISSING_STABLE_OUTPUT", + Side::Right, + right, + left, + |sample| &sample.vrps, + args, + ); + analyze_sandwich_sets( + &mut analysis, + "vap_output", + "PEER_MISSING_STABLE_OUTPUT", + Side::Left, + left, + right, + |sample| &sample.vaps, + args, + ); + analyze_sandwich_sets( + &mut analysis, + "vap_output", + "PEER_MISSING_STABLE_OUTPUT", + Side::Right, + right, + left, + |sample| &sample.vaps, + args, + ); + analysis +} + +fn analyze_sandwich_objects( + analysis: &mut SandwichAnalysis, + source_side: Side, + source: &[SequenceSample], + peer: &[SequenceSample], + args: &Args, +) { + for pair in source.windows(2) { + let source_start = &pair[0]; + let source_end = &pair[1]; + if source_start.validation_time >= source_end.validation_time { + continue; + } + let peers = peer_samples_between(peer, source_start, source_end); + if peers.is_empty() { + continue; + } + for (uri, source_hash) in &source_start.objects { + if source_end.objects.get(uri) != Some(source_hash) { + continue; + } + for peer_sample in &peers { + match peer_sample.objects.get(uri) { + Some(peer_hash) if peer_hash == source_hash => {} + Some(peer_hash) => analysis.add( + "PEER_HASH_MISMATCH_STABLE_OBJECT", + sandwich_record( + "PEER_HASH_MISMATCH_STABLE_OBJECT", + "object", + uri.clone(), + source_side, + source_start, + source_end, + peer_sample, + Some(source_hash.clone()), + Some(peer_hash.clone()), + "source interval has stable object hash; peer sample has same URI with another hash", + ), + args.sample_limit, + ), + None => analysis.add( + "PEER_MISSING_STABLE_OBJECT", + sandwich_record( + "PEER_MISSING_STABLE_OBJECT", + "object", + uri.clone(), + source_side, + source_start, + source_end, + peer_sample, + Some(source_hash.clone()), + None, + "source interval has stable object hash; peer sample misses the URI", + ), + args.sample_limit, + ), + } + } + } + } +} + +fn analyze_sandwich_sets( + analysis: &mut SandwichAnalysis, + set_type: &'static str, + classification: &'static str, + source_side: Side, + source: &[SequenceSample], + peer: &[SequenceSample], + extract: F, + args: &Args, +) where + F: for<'a> Fn(&'a SequenceSample) -> &'a BTreeSet, +{ + for pair in source.windows(2) { + let source_start = &pair[0]; + let source_end = &pair[1]; + if source_start.validation_time >= source_end.validation_time { + continue; + } + let peers = peer_samples_between(peer, source_start, source_end); + if peers.is_empty() { + continue; + } + let start_set = extract(source_start); + let end_set = extract(source_end); + for key in start_set { + if !end_set.contains(key) { + continue; + } + for peer_sample in &peers { + if extract(peer_sample).contains(key) { + continue; + } + analysis.add( + classification, + sandwich_record( + classification, + set_type, + key.clone(), + source_side, + source_start, + source_end, + peer_sample, + Some(key.clone()), + None, + "source interval has a stable key; peer sample misses the key", + ), + args.sample_limit, + ); + } + } + } +} + +fn peer_samples_between<'a>( + peer: &'a [SequenceSample], + source_start: &SequenceSample, + source_end: &SequenceSample, +) -> Vec<&'a SequenceSample> { + peer.iter() + .filter(|sample| { + source_start.validation_time < sample.validation_time + && sample.validation_time < source_end.validation_time + }) + .collect() +} + +#[allow(clippy::too_many_arguments)] +fn sandwich_record( + classification: &'static str, + set_type: &'static str, + key: String, + source_side: Side, + source_start: &SequenceSample, + source_end: &SequenceSample, + peer_sample: &SequenceSample, + source_value: Option, + peer_value: Option, + note: &str, +) -> SandwichRecord { + SandwichRecord { + classification, + set_type, + key, + source_side, + source_start_seq: source_start.raw.seq, + source_start_run_id: source_start.raw.run_id.clone(), + source_end_seq: source_end.raw.seq, + source_end_run_id: source_end.raw.run_id.clone(), + peer_seq: peer_sample.raw.seq, + peer_run_id: peer_sample.raw.run_id.clone(), + source_value, + peer_value, + source_start_time: format_time(source_start.validation_time), + peer_time: format_time(peer_sample.validation_time), + source_end_time: format_time(source_end.validation_time), + note: note.to_string(), + } +} + +impl SandwichAnalysis { + fn add(&mut self, class: &'static str, record: SandwichRecord, sample_limit: usize) { + self.total_occurrences += 1; + self.unique_keys.insert(sandwich_unique_key(&record)); + *self.by_set_type.entry(record.set_type).or_default() += 1; + let stats = self.stats.entry(class).or_default(); + stats.total += 1; + stats.unique_keys.insert(sandwich_unique_key(&record)); + if stats.samples.len() < sample_limit { + stats.samples.push(record); + } + } +} + +fn sandwich_unique_key(record: &SandwichRecord) -> String { + format!( + "{}|{}|{}|{}", + record.classification, + record.set_type, + record.source_side.as_str(), + record.key + ) +} + +fn unique_event_count(events: &[DiffEvent]) -> usize { + events + .iter() + .map(event_identity) + .collect::>() + .len() +} + +fn event_identity(event: &DiffEvent) -> String { + format!( + "{}|{}|{}", + event.event_type, + event.source_side.as_str(), + event.key + ) +} + +fn samples_for_side<'a>( + side: Side, + left: &'a [SequenceSample], + right: &'a [SequenceSample], +) -> &'a [SequenceSample] { + match side { + Side::Left => left, + Side::Right => right, + } +} + +fn opposite_side(side: Side) -> Side { + match side { + Side::Left => Side::Right, + Side::Right => Side::Left, + } +} + +fn edge_position(samples: &[SequenceSample], seq: u32, args: &Args) -> EdgePosition { + let Some(index) = samples.iter().position(|sample| sample.raw.seq == seq) else { + return EdgePosition::Stable; + }; + if index < args.warmup_samples { + return EdgePosition::Leading; + } + if samples.len().saturating_sub(index) <= args.cooldown_samples { + return EdgePosition::Trailing; + } + EdgePosition::Stable +} + +fn peer_has_uri_any(peer: &[SequenceSample], uri: &str) -> bool { + peer.iter().any(|sample| sample.objects.contains_key(uri)) +} + +fn peer_has_uri_different_hash_any(peer: &[SequenceSample], uri: &str, hash: &str) -> bool { + peer.iter() + .filter_map(|sample| sample.objects.get(uri)) + .any(|peer_hash| peer_hash != hash) +} + +fn source_future_has_hash( + source: &[SequenceSample], + seq: u32, + uri: &str, + expected_hash: &str, +) -> bool { + source + .iter() + .filter(|sample| sample.raw.seq > seq) + .any(|sample| { + sample + .objects + .get(uri) + .is_some_and(|hash| hash == expected_hash) + }) +} + +fn peer_has_same_ta_identity(peer: &[SequenceSample], key: &str) -> bool { + let Some(identity) = trust_anchor_identity(key) else { + return false; + }; + peer.iter().any(|sample| { + sample + .trust_anchors + .iter() + .filter_map(|peer_key| trust_anchor_identity(peer_key)) + .any(|peer_identity| peer_identity == identity) + }) +} + +fn trust_anchor_identity(key: &str) -> Option { + let parts = key.split('|').collect::>(); + if parts.len() != 4 { + return None; + } + Some(format!("{}|{}", parts[2], parts[3])) +} + +fn split_object_hash_key(key: &str) -> Option<(&str, &str)> { + key.rsplit_once('|') +} + +fn build_uri_timeline_samples( + left: &[SequenceSample], + right: &[SequenceSample], + adjusted: &AdjustedAnalysis, + limit: usize, +) -> Vec { + let mut uris = BTreeSet::new(); + for stats in adjusted.stats.values() { + for sample in &stats.samples { + if sample.event_type == "object_hash" + && let Some((uri, _)) = split_object_hash_key(&sample.key) + { + uris.insert(uri.to_string()); + } + if sample.event_type == "object_uri" { + uris.insert(sample.key.clone()); + } + } + } + uris.into_iter() + .take(limit) + .map(|uri| { + json!({ + "uri": uri, + "left": timeline_for_uri(left, &uri), + "right": timeline_for_uri(right, &uri), + }) + }) + .collect() +} + +fn timeline_for_uri(samples: &[SequenceSample], uri: &str) -> Vec { + samples + .iter() + .filter_map(|sample| { + sample.objects.get(uri).map(|hash| { + json!({ + "seq": sample.raw.seq, + "runId": sample.raw.run_id, + "validationTime": format_time(sample.validation_time), + "hash": hash, + }) + }) + }) + .collect() +} + +fn build_stable_object_groups( + adjusted: &AdjustedAnalysis, + left: &[SequenceSample], + right: &[SequenceSample], + limit: usize, +) -> Vec { + let mut groups: BTreeMap = BTreeMap::new(); + let Some(stats) = adjusted.stats.get("STABLE_OBJECT_SET_DIVERGENCE") else { + return Vec::new(); + }; + for sample in &stats.samples { + let physical_uri = physical_object_uri(sample); + let group_key = format!( + "{}|{}|{}|{}", + sample.source_side.as_str(), + sample.source_seq, + sample.source_run_id, + publication_point_prefix(&physical_uri) + ); + let group = groups + .entry(group_key) + .or_insert_with(|| StableObjectGroup::new(sample, &physical_uri)); + if group.source_cir_path.is_none() + && let Some(source_sample) = sample_by_side_seq_run( + left, + right, + sample.source_side, + sample.source_seq, + &sample.source_run_id, + ) + { + group.source_cir_path = Some(path_string(&source_sample.cir_path)); + } + group.add(sample, physical_uri); + } + groups + .into_values() + .take(limit) + .map(StableObjectGroup::to_json) + .collect() +} + +#[derive(Clone, Debug)] +struct StableObjectGroup { + source_side: Side, + source_seq: u32, + source_run_id: String, + source_cir_path: Option, + publication_point: String, + event_count: usize, + event_types: BTreeMap<&'static str, usize>, + physical_objects: BTreeMap, +} + +impl StableObjectGroup { + fn new(sample: &AdjustedRecord, physical_uri: &str) -> Self { + Self { + source_side: sample.source_side, + source_seq: sample.source_seq, + source_run_id: sample.source_run_id.clone(), + source_cir_path: None, + publication_point: publication_point_prefix(physical_uri), + event_count: 0, + event_types: BTreeMap::new(), + physical_objects: BTreeMap::new(), + } + } + + fn add(&mut self, sample: &AdjustedRecord, physical_uri: String) { + self.event_count += 1; + *self.event_types.entry(sample.event_type).or_default() += 1; + let object = self + .physical_objects + .entry(physical_uri.clone()) + .or_insert_with(|| StablePhysicalObject { + extension: object_extension(&physical_uri).to_string(), + uri: physical_uri, + event_types: BTreeSet::new(), + hashes: BTreeSet::new(), + }); + object.event_types.insert(sample.event_type); + if let Some(hash) = event_hash(sample) { + object.hashes.insert(hash.to_string()); + } + } + + fn to_json(self) -> Value { + json!({ + "sourceSide": self.source_side.as_str(), + "sourceSeq": self.source_seq, + "sourceRunId": self.source_run_id, + "sourceCirPath": self.source_cir_path, + "publicationPoint": self.publication_point, + "eventCount": self.event_count, + "eventTypes": self.event_types, + "physicalObjectCount": self.physical_objects.len(), + "physicalObjects": self.physical_objects.into_values().map(StablePhysicalObject::to_json).collect::>(), + }) + } +} + +#[derive(Clone, Debug)] +struct StablePhysicalObject { + uri: String, + extension: String, + event_types: BTreeSet<&'static str>, + hashes: BTreeSet, +} + +impl StablePhysicalObject { + fn to_json(self) -> Value { + json!({ + "uri": self.uri, + "extension": self.extension, + "eventTypes": self.event_types, + "hashes": self.hashes, + }) + } +} + +fn physical_object_uri(sample: &AdjustedRecord) -> String { + if sample.event_type == "object_hash" + && let Some((uri, _)) = split_object_hash_key(&sample.key) + { + return uri.to_string(); + } + sample.key.clone() +} + +fn event_hash(sample: &AdjustedRecord) -> Option<&str> { + if sample.event_type == "object_hash" { + split_object_hash_key(&sample.key).map(|(_, hash)| hash) + } else { + None + } +} + +fn publication_point_prefix(uri: &str) -> String { + uri.rsplit_once('/') + .map(|(prefix, _)| format!("{prefix}/")) + .unwrap_or_else(|| uri.to_string()) +} + +fn object_extension(uri: &str) -> &str { + uri.rsplit_once('.') + .map(|(_, extension)| extension) + .unwrap_or("") +} + +fn sample_by_side_seq_run<'a>( + left: &'a [SequenceSample], + right: &'a [SequenceSample], + side: Side, + seq: u32, + run_id: &str, +) -> Option<&'a SequenceSample> { + samples_for_side(side, left, right) + .iter() + .find(|sample| sample.raw.seq == seq && sample.raw.run_id == run_id) +} + +impl AnalysisResult { + fn add(&mut self, class: &'static str, record: SampleRecord, sample_limit: usize) { + let stats = self.stats.entry(class).or_default(); + stats.total += 1; + if stats.samples.len() < sample_limit { + stats.samples.push(record); + } + } +} + +fn peer_sample_at_seq(peer: &[SequenceSample], seq: u32) -> Option<&SequenceSample> { + peer.iter().find(|sample| sample.raw.seq == seq) +} + +fn find_future_match( + peer: &[SequenceSample], + source: &SequenceSample, + key: &str, + extract: &F, + args: &Args, +) -> Option +where + F: for<'a> Fn(&'a SequenceSample) -> &'a BTreeSet, +{ + peer.iter() + .filter(|candidate| is_in_alignment_window(source, candidate, args)) + .find(|candidate| extract(candidate).contains(key)) + .map(|candidate| { + occurrence( + candidate, + if candidate.raw.side.as_deref() == Some("left") { + Side::Left + } else { + Side::Right + }, + ) + }) +} + +fn find_future_hash_match( + peer: &[SequenceSample], + source: &SequenceSample, + uri: &str, + hash: &str, + args: &Args, +) -> Option { + peer.iter() + .filter(|candidate| is_in_alignment_window(source, candidate, args)) + .find(|candidate| { + candidate + .objects + .get(uri) + .is_some_and(|peer_hash| peer_hash == hash) + }) + .map(|candidate| { + occurrence( + candidate, + if candidate.raw.side.as_deref() == Some("left") { + Side::Left + } else { + Side::Right + }, + ) + }) +} + +fn is_in_alignment_window( + source: &SequenceSample, + candidate: &SequenceSample, + args: &Args, +) -> bool { + if candidate.raw.seq < source.raw.seq { + return false; + } + let run_delta = candidate.raw.seq.saturating_sub(source.raw.seq); + let time_delta = candidate.validation_time - source.validation_time; + let secs = time_delta.whole_seconds().abs(); + run_delta <= args.align_window_runs || secs <= args.align_window_secs +} + +fn occurrence(sample: &SequenceSample, side: Side) -> EventOccurrence { + EventOccurrence { + side, + seq: sample.raw.seq, + run_id: sample.raw.run_id.clone(), + } +} + +fn build_output( + args: &Args, + left: &[SequenceSample], + right: &[SequenceSample], + result: &AnalysisResult, + adjusted: &AdjustedAnalysis, + sandwich: &SandwichAnalysis, +) -> Value { + let classifications = result + .stats + .iter() + .map(|(class, stats)| { + json!({ + "classification": class, + "count": stats.total, + "samples": stats.samples.iter().map(sample_to_json).collect::>(), + }) + }) + .collect::>(); + let adjusted_classifications = adjusted + .stats + .iter() + .map(|(class, stats)| { + json!({ + "classification": class, + "occurrences": stats.total, + "uniqueKeys": stats.unique_keys.len(), + "samples": stats.samples.iter().map(adjusted_sample_to_json).collect::>(), + }) + }) + .collect::>(); + let sandwich_classifications = sandwich + .stats + .iter() + .map(|(class, stats)| { + json!({ + "classification": class, + "occurrences": stats.total, + "uniqueKeys": stats.unique_keys.len(), + "samples": stats.samples.iter().map(sandwich_sample_to_json).collect::>(), + }) + }) + .collect::>(); + json!({ + "schemaVersion": 1, + "generatedBy": "sequence_triage_ccr_cir", + "inputContract": "left-right-sequence-jsonl-with-ccr-cir-artifacts", + "parameters": { + "leftSequence": path_string(&args.left_sequence), + "rightSequence": path_string(&args.right_sequence), + "alignWindowRuns": args.align_window_runs, + "alignWindowSecs": args.align_window_secs, + "sampleLimit": args.sample_limit, + "warmupSamples": args.warmup_samples, + "cooldownSamples": args.cooldown_samples, + "timelineSampleLimit": if args.timeline_sample_limit == 0 { args.sample_limit } else { args.timeline_sample_limit }, + }, + "left": sequence_summary(left), + "right": sequence_summary(right), + "classificationCounts": classifications, + "totals": { + "resolvedTemporalLike": count_class(result, "TEMPORAL_LAG_RESOLVED") + count_class(result, "CONTENT_ROLLOVER_RESOLVED"), + "persistent": count_prefix(result, "PERSISTENT_"), + "unclassified": count_class(result, "UNCLASSIFIED_INSUFFICIENT_WINDOW"), + }, + "adjusted": { + "warmupSamples": args.warmup_samples, + "cooldownSamples": args.cooldown_samples, + "rawPersistent": { + "occurrences": adjusted.raw_persistent_occurrences, + "uniqueKeys": adjusted.raw_persistent_unique_keys, + }, + "edgeFilteredPersistent": { + "occurrences": adjusted.edge_filtered_occurrences, + "uniqueKeys": adjusted.edge_filtered_unique_keys, + }, + "adjustedStablePersistent": { + "occurrences": adjusted.adjusted_stable_occurrences, + "uniqueKeys": adjusted.adjusted_stable_unique_keys, + }, + "classificationCounts": adjusted_classifications, + "uriTimelineSamples": adjusted.uri_timeline_samples, + "stableObjectGroups": adjusted.stable_object_groups, + "interpretation": { + "rawPersistentMeaning": "raw persistent keeps the original #043 single-event matching semantics.", + "edgeFilteredMeaning": "edge-filtered persistent keeps only non-warmup and non-cooldown source occurrences.", + "adjustedStableMeaning": "adjusted stable persistent keeps non-edge findings after URI-level rollover and TA projection filtering.", + "stableObjectGroupsMeaning": "STABLE_OBJECT_SET_DIVERGENCE events are additionally collapsed by source CIR, publication point, and physical object URI so object_uri/object_hash duplicate event views do not inflate physical-object counts.", + } + }, + "sandwich": { + "strictTimeWindow": true, + "method": "For each side, use two adjacent source samples as a stable interval. If source_start.time < peer.time < source_end.time and the source value is identical at both interval endpoints, the peer sample is expected to contain the same value.", + "totals": { + "occurrences": sandwich.total_occurrences, + "uniqueKeys": sandwich.unique_keys.len(), + }, + "bySetType": sandwich.by_set_type, + "classificationCounts": sandwich_classifications, + "interpretation": { + "missingStableObject": "The source side proves a URI/hash is stable across an interval that contains the peer sample, but the peer sample has no such URI.", + "hashMismatchStableObject": "The source side proves a URI/hash is stable across an interval that contains the peer sample, but the peer sample has the same URI with a different hash.", + "missingStableReject": "The source side consistently rejects the URI across an interval that contains the peer sample, but the peer sample does not reject it.", + "missingStableOutput": "The source side consistently outputs a VRP/VAP key across an interval that contains the peer sample, but the peer sample does not output it." + } + }, + "interpretation": { + "temporalResolvedMeaning": "Event appeared only on one side at one sample but aligned in the peer sequence later.", + "persistentMeaning": "Event did not align within the configured run/time window; inspect as a candidate RP behavior difference or persistent sync/input difference.", + "limits": [ + "Sequence triage only reads sequence JSONL plus referenced CCR/CIR files.", + "It does not read report.json, logs, repo-bytes DB, cache, mirror, or raw objects for root cause proof.", + "Resolved temporal findings reduce false positives but do not prove the exact external repository update time." + ] + } + }) +} + +fn sequence_summary(samples: &[SequenceSample]) -> Value { + json!({ + "sampleCount": samples.len(), + "rpIds": samples.iter().map(|sample| sample.raw.rp_id.clone()).collect::>(), + "firstSeq": samples.first().map(|sample| sample.raw.seq), + "lastSeq": samples.last().map(|sample| sample.raw.seq), + "firstValidationTime": samples.first().map(|sample| format_time(sample.validation_time)), + "lastValidationTime": samples.last().map(|sample| format_time(sample.validation_time)), + "samples": samples.iter().map(|sample| json!({ + "seq": sample.raw.seq, + "runId": sample.raw.run_id, + "syncMode": sample.raw.sync_mode, + "startTime": sample.raw.start_time, + "finishTime": sample.raw.finish_time, + "validationTime": format_time(sample.validation_time), + "status": sample.raw.status, + "ccrPath": path_string(&sample.ccr_path), + "cirPath": path_string(&sample.cir_path), + "ccrSha256": sample.raw.ccr_sha256, + "cirSha256": sample.raw.cir_sha256, + "wallMs": sample.raw.wall_ms, + "maxRssKb": sample.raw.max_rss_kb, + "vrps": sample.raw.vrps.or(Some(sample.vrps.len() as u64)), + "vaps": sample.raw.vaps.or(Some(sample.vaps.len() as u64)), + "objectCount": sample.object_uris.len(), + "rejectCount": sample.rejects.len(), + "trustAnchorCount": sample.trust_anchors.len(), + })).collect::>(), + }) +} + +fn sample_to_json(sample: &SampleRecord) -> Value { + json!({ + "classification": sample.classification, + "eventType": sample.event_type, + "key": sample.key, + "sourceSide": sample.source_side.as_str(), + "sourceSeq": sample.source_seq, + "sourceRunId": sample.source_run_id, + "matchedSeq": sample.matched_seq, + "matchedRunId": sample.matched_run_id, + "note": sample.note, + }) +} + +fn adjusted_sample_to_json(sample: &AdjustedRecord) -> Value { + json!({ + "classification": sample.classification, + "eventType": sample.event_type, + "key": sample.key, + "sourceSide": sample.source_side.as_str(), + "sourceSeq": sample.source_seq, + "sourceRunId": sample.source_run_id, + "note": sample.note, + }) +} + +fn sandwich_sample_to_json(sample: &SandwichRecord) -> Value { + json!({ + "classification": sample.classification, + "setType": sample.set_type, + "key": sample.key, + "sourceSide": sample.source_side.as_str(), + "sourceStartSeq": sample.source_start_seq, + "sourceStartRunId": sample.source_start_run_id, + "sourceEndSeq": sample.source_end_seq, + "sourceEndRunId": sample.source_end_run_id, + "peerSeq": sample.peer_seq, + "peerRunId": sample.peer_run_id, + "sourceValue": sample.source_value, + "peerValue": sample.peer_value, + "sourceStartTime": sample.source_start_time, + "peerTime": sample.peer_time, + "sourceEndTime": sample.source_end_time, + "note": sample.note, + }) +} + +fn count_class(result: &AnalysisResult, class: &'static str) -> usize { + result + .stats + .get(class) + .map(|stats| stats.total) + .unwrap_or(0) +} + +fn count_prefix(result: &AnalysisResult, prefix: &str) -> usize { + result + .stats + .iter() + .filter(|(class, _)| class.starts_with(prefix)) + .map(|(_, stats)| stats.total) + .sum() +} + +fn write_json(path: &Path, value: &Value) -> Result<(), String> { + std::fs::write( + path, + serde_json::to_string_pretty(value).map_err(|e| e.to_string())? + "\n", + ) + .map_err(|e| format!("write JSON failed: {}: {e}", path.display())) +} + +fn write_markdown(path: &Path, output: &Value) -> Result<(), String> { + let mut lines = vec![ + "# CCR/CIR Sequence Triage Summary".to_string(), + "".to_string(), + format!( + "- `generatedBy`: `{}`", + output["generatedBy"].as_str().unwrap_or("") + ), + format!( + "- `leftSamples`: `{}`", + output["left"]["sampleCount"].as_u64().unwrap_or(0) + ), + format!( + "- `rightSamples`: `{}`", + output["right"]["sampleCount"].as_u64().unwrap_or(0) + ), + format!( + "- `alignWindowRuns`: `{}`", + output["parameters"]["alignWindowRuns"] + .as_u64() + .unwrap_or(0) + ), + format!( + "- `alignWindowSecs`: `{}`", + output["parameters"]["alignWindowSecs"] + .as_i64() + .unwrap_or(0) + ), + format!( + "- `warmupSamples`: `{}`", + output["adjusted"]["warmupSamples"].as_u64().unwrap_or(0) + ), + format!( + "- `cooldownSamples`: `{}`", + output["adjusted"]["cooldownSamples"].as_u64().unwrap_or(0) + ), + "".to_string(), + "## Classification Counts".to_string(), + "".to_string(), + "| Classification | Count |".to_string(), + "|---|---:|".to_string(), + ]; + if let Some(classes) = output["classificationCounts"].as_array() { + for item in classes { + lines.push(format!( + "| `{}` | {} |", + item["classification"].as_str().unwrap_or(""), + item["count"].as_u64().unwrap_or(0) + )); + } + } + lines.extend([ + "".to_string(), + "## Adjusted Boundary / Rollover Classification".to_string(), + "".to_string(), + format!( + "- `rawPersistent`: `{}` occurrences / `{}` unique keys", + output["adjusted"]["rawPersistent"]["occurrences"] + .as_u64() + .unwrap_or(0), + output["adjusted"]["rawPersistent"]["uniqueKeys"] + .as_u64() + .unwrap_or(0) + ), + format!( + "- `edgeFilteredPersistent`: `{}` occurrences / `{}` unique keys", + output["adjusted"]["edgeFilteredPersistent"]["occurrences"] + .as_u64() + .unwrap_or(0), + output["adjusted"]["edgeFilteredPersistent"]["uniqueKeys"] + .as_u64() + .unwrap_or(0) + ), + format!( + "- `adjustedStablePersistent`: `{}` occurrences / `{}` unique keys", + output["adjusted"]["adjustedStablePersistent"]["occurrences"] + .as_u64() + .unwrap_or(0), + output["adjusted"]["adjustedStablePersistent"]["uniqueKeys"] + .as_u64() + .unwrap_or(0) + ), + "".to_string(), + "| Adjusted Classification | Occurrences | Unique Keys |".to_string(), + "|---|---:|---:|".to_string(), + ]); + if let Some(classes) = output["adjusted"]["classificationCounts"].as_array() { + for item in classes { + lines.push(format!( + "| `{}` | {} | {} |", + item["classification"].as_str().unwrap_or(""), + item["occurrences"].as_u64().unwrap_or(0), + item["uniqueKeys"].as_u64().unwrap_or(0) + )); + } + } + if let Some(groups) = output["adjusted"]["stableObjectGroups"].as_array() + && !groups.is_empty() + { + lines.extend([ + "".to_string(), + "## Stable Object Groups".to_string(), + "".to_string(), + "| Source | CIR | Publication Point | Events | Physical Objects |".to_string(), + "|---|---|---|---:|---:|".to_string(), + ]); + for group in groups { + lines.push(format!( + "| `{}/seq{}/{}` | `{}` | `{}` | {} | {} |", + group["sourceSide"].as_str().unwrap_or(""), + group["sourceSeq"].as_u64().unwrap_or(0), + group["sourceRunId"].as_str().unwrap_or(""), + group["sourceCirPath"].as_str().unwrap_or(""), + group["publicationPoint"].as_str().unwrap_or(""), + group["eventCount"].as_u64().unwrap_or(0), + group["physicalObjectCount"].as_u64().unwrap_or(0), + )); + } + } + lines.extend([ + "".to_string(), + "## Sandwich Anomaly Check".to_string(), + "".to_string(), + format!( + "- `occurrences`: `{}`", + output["sandwich"]["totals"]["occurrences"] + .as_u64() + .unwrap_or(0) + ), + format!( + "- `uniqueKeys`: `{}`", + output["sandwich"]["totals"]["uniqueKeys"] + .as_u64() + .unwrap_or(0) + ), + "- Rule: source side has two adjacent stable samples, and peer sample timestamp is strictly between them.".to_string(), + "".to_string(), + "| Sandwich Classification | Occurrences | Unique Keys |".to_string(), + "|---|---:|---:|".to_string(), + ]); + if let Some(classes) = output["sandwich"]["classificationCounts"].as_array() { + for item in classes { + lines.push(format!( + "| `{}` | {} | {} |", + item["classification"].as_str().unwrap_or(""), + item["occurrences"].as_u64().unwrap_or(0), + item["uniqueKeys"].as_u64().unwrap_or(0) + )); + } + } + lines.extend([ + "".to_string(), + "## Interpretation".to_string(), + "".to_string(), + "- `TEMPORAL_LAG_RESOLVED` / `CONTENT_ROLLOVER_RESOLVED` 表示差异在后续采样中对齐,优先视为采样时刻或仓库滚动窗口差异。".to_string(), + "- `PERSISTENT_*` 表示配置窗口内仍未对齐,才是后续人工排查的高价值候选。".to_string(), + "- `EDGE_*` 表示差异落在序列首尾,缺少前置或后续观察,不能直接当作实现差异。".to_string(), + "- `STABLE_*` 表示经过首尾边界过滤和 URI-level rollover 过滤后仍存在的稳定候选。".to_string(), + ]); + std::fs::write(path, lines.join("\n") + "\n") + .map_err(|e| format!("write markdown failed: {}: {e}", path.display())) +} + +fn write_samples_jsonl(path: &Path, result: &AnalysisResult) -> Result<(), String> { + let mut body = String::new(); + for stats in result.stats.values() { + for sample in &stats.samples { + body.push_str( + &serde_json::to_string(&sample_to_json(sample)).map_err(|e| e.to_string())?, + ); + body.push('\n'); + } + } + std::fs::write(path, body).map_err(|e| format!("write samples failed: {}: {e}", path.display())) +} + +fn read_file(path: &Path) -> Result, String> { + std::fs::read(path).map_err(|e| format!("read file failed: {}: {e}", path.display())) +} + +fn resolve_path(base_dir: &Path, path: &Path) -> PathBuf { + if path.is_absolute() { + path.to_path_buf() + } else { + base_dir.join(path) + } +} + +fn parse_rfc3339(value: &str) -> Result { + OffsetDateTime::parse(value, &Rfc3339) + .map_err(|e| format!("parse RFC3339 failed: {value}: {e}")) +} + +fn format_time(value: OffsetDateTime) -> String { + value.format(&Rfc3339).unwrap_or_else(|_| value.to_string()) +} + +fn path_string(path: &Path) -> String { + path.to_string_lossy().into_owned() +} + +fn object_hash_key(uri: &str, hash: &str) -> String { + format!("{uri}|{hash}") +} + +#[cfg(test)] +mod tests { + use super::*; + use rpki::ccr::{ + CcrContentInfo, CcrDigestAlgorithm, RpkiCanonicalCacheRepresentation, + build_aspa_payload_state, build_roa_payload_state, encode_content_info, + }; + use rpki::cir::{ + CIR_VERSION_V3, CanonicalInputRepresentation, CirHashAlgorithm, CirObject, + CirRejectedObject, CirTrustAnchor, compute_reject_list_sha256, encode_cir, sha256, + }; + use rpki::data_model::roa::{IpPrefix, RoaAfi}; + use rpki::validation::objects::{AspaAttestation, Vrp}; + + #[test] + fn parse_args_accepts_required_flags() { + let argv = vec![ + "sequence_triage_ccr_cir".to_string(), + "--left-sequence".to_string(), + "left.jsonl".to_string(), + "--right-sequence".to_string(), + "right.jsonl".to_string(), + "--out-dir".to_string(), + "out".to_string(), + "--align-window-runs".to_string(), + "3".to_string(), + ]; + let args = parse_args(&argv).expect("parse"); + assert_eq!(args.align_window_runs, 3); + assert_eq!(args.align_window_secs, 1800); + assert_eq!(args.warmup_samples, 1); + assert_eq!(args.cooldown_samples, 1); + assert_eq!(args.timeline_sample_limit, 0); + } + + #[test] + fn run_classifies_temporal_and_persistent_differences() { + let temp = tempfile::tempdir().expect("tempdir"); + let root = temp.path(); + write_sample( + root, + "left1", + "left", + 1, + &[object("rsync://example.net/a.roa", 0x11)], + &[], + 64496, + ); + write_sample( + root, + "left2", + "left", + 2, + &[ + object("rsync://example.net/a.roa", 0x11), + object("rsync://example.net/persistent.roa", 0x44), + ], + &[], + 64496, + ); + write_sample(root, "right1", "right", 1, &[], &[], 64497); + write_sample( + root, + "right2", + "right", + 2, + &[object("rsync://example.net/a.roa", 0x11)], + &[], + 64497, + ); + std::fs::write( + root.join("left.jsonl"), + jsonl(&[ + item("left", 1, "left1/result.ccr", "left1/result.cir"), + item("left", 2, "left2/result.ccr", "left2/result.cir"), + ]), + ) + .unwrap(); + std::fs::write( + root.join("right.jsonl"), + jsonl(&[ + item("right", 1, "right1/result.ccr", "right1/result.cir"), + item("right", 2, "right2/result.ccr", "right2/result.cir"), + ]), + ) + .unwrap(); + run(Args { + left_sequence: root.join("left.jsonl"), + right_sequence: root.join("right.jsonl"), + out_dir: root.join("out"), + align_window_runs: 2, + align_window_secs: 3600, + sample_limit: 20, + warmup_samples: 1, + cooldown_samples: 0, + timeline_sample_limit: 0, + }) + .expect("run"); + let output: Value = serde_json::from_str( + &std::fs::read_to_string(root.join("out/sequence-triage.json")).unwrap(), + ) + .unwrap(); + assert!(class_count(&output, "TEMPORAL_LAG_RESOLVED") > 0); + assert!(class_count(&output, "PERSISTENT_OBJECT_SET_DIVERGENCE") > 0); + } + + #[test] + fn run_classifies_reject_and_output_divergence() { + let temp = tempfile::tempdir().expect("tempdir"); + let root = temp.path(); + let objects = [object("rsync://example.net/a.roa", 0x11)]; + write_sample( + root, + "left1", + "left", + 1, + &objects, + &["rsync://example.net/a.roa"], + 64496, + ); + write_sample(root, "right1", "right", 1, &objects, &[], 64497); + std::fs::write( + root.join("left.jsonl"), + jsonl(&[item("left", 1, "left1/result.ccr", "left1/result.cir")]), + ) + .unwrap(); + std::fs::write( + root.join("right.jsonl"), + jsonl(&[item("right", 1, "right1/result.ccr", "right1/result.cir")]), + ) + .unwrap(); + run(Args { + left_sequence: root.join("left.jsonl"), + right_sequence: root.join("right.jsonl"), + out_dir: root.join("out"), + align_window_runs: 0, + align_window_secs: 0, + sample_limit: 20, + warmup_samples: 0, + cooldown_samples: 0, + timeline_sample_limit: 0, + }) + .expect("run"); + let output: Value = serde_json::from_str( + &std::fs::read_to_string(root.join("out/sequence-triage.json")).unwrap(), + ) + .unwrap(); + assert!(class_count(&output, "PERSISTENT_REJECT_DIVERGENCE") > 0); + assert!(class_count(&output, "PERSISTENT_OUTPUT_DIVERGENCE") > 0); + } + + #[test] + fn run_adjusted_classifies_leading_content_rollover() { + let temp = tempfile::tempdir().expect("tempdir"); + let root = temp.path(); + let uri = "rsync://example.net/a.roa"; + write_sample(root, "left1", "left", 1, &[object(uri, 0x11)], &[], 64496); + write_sample(root, "left2", "left", 2, &[object(uri, 0x22)], &[], 64496); + write_sample(root, "right1", "right", 1, &[object(uri, 0x22)], &[], 64496); + write_sample(root, "right2", "right", 2, &[object(uri, 0x22)], &[], 64496); + std::fs::write( + root.join("left.jsonl"), + jsonl(&[ + item("left", 1, "left1/result.ccr", "left1/result.cir"), + item("left", 2, "left2/result.ccr", "left2/result.cir"), + ]), + ) + .unwrap(); + std::fs::write( + root.join("right.jsonl"), + jsonl(&[ + item("right", 1, "right1/result.ccr", "right1/result.cir"), + item("right", 2, "right2/result.ccr", "right2/result.cir"), + ]), + ) + .unwrap(); + run(Args { + left_sequence: root.join("left.jsonl"), + right_sequence: root.join("right.jsonl"), + out_dir: root.join("out"), + align_window_runs: 0, + align_window_secs: 0, + sample_limit: 20, + warmup_samples: 1, + cooldown_samples: 0, + timeline_sample_limit: 0, + }) + .expect("run"); + let output: Value = serde_json::from_str( + &std::fs::read_to_string(root.join("out/sequence-triage.json")).unwrap(), + ) + .unwrap(); + assert!(adjusted_class_occurrences(&output, "EDGE_LEADING_CONTENT_ROLLOVER") > 0); + assert_eq!( + adjusted_class_occurrences(&output, "STABLE_CONTENT_DIVERGENCE"), + 0 + ); + assert_eq!( + output["adjusted"]["adjustedStablePersistent"]["occurrences"] + .as_u64() + .unwrap(), + 0 + ); + } + + #[test] + fn run_adjusted_classifies_stable_middle_content_divergence() { + let temp = tempfile::tempdir().expect("tempdir"); + let root = temp.path(); + let uri = "rsync://example.net/a.roa"; + for seq in 1..=3 { + write_sample( + root, + &format!("left{seq}"), + "left", + seq, + &[object(uri, 0x11)], + &[], + 64496, + ); + write_sample( + root, + &format!("right{seq}"), + "right", + seq, + &[object(uri, 0x22)], + &[], + 64496, + ); + } + std::fs::write( + root.join("left.jsonl"), + jsonl(&[ + item("left", 1, "left1/result.ccr", "left1/result.cir"), + item("left", 2, "left2/result.ccr", "left2/result.cir"), + item("left", 3, "left3/result.ccr", "left3/result.cir"), + ]), + ) + .unwrap(); + std::fs::write( + root.join("right.jsonl"), + jsonl(&[ + item("right", 1, "right1/result.ccr", "right1/result.cir"), + item("right", 2, "right2/result.ccr", "right2/result.cir"), + item("right", 3, "right3/result.ccr", "right3/result.cir"), + ]), + ) + .unwrap(); + run(Args { + left_sequence: root.join("left.jsonl"), + right_sequence: root.join("right.jsonl"), + out_dir: root.join("out"), + align_window_runs: 0, + align_window_secs: 0, + sample_limit: 20, + warmup_samples: 1, + cooldown_samples: 1, + timeline_sample_limit: 0, + }) + .expect("run"); + let output: Value = serde_json::from_str( + &std::fs::read_to_string(root.join("out/sequence-triage.json")).unwrap(), + ) + .unwrap(); + assert!(adjusted_class_occurrences(&output, "STABLE_CONTENT_DIVERGENCE") > 0); + assert_eq!( + output["adjusted"]["adjustedStablePersistent"]["occurrences"] + .as_u64() + .unwrap(), + 2 + ); + } + + #[test] + fn run_adjusted_filters_trailing_output() { + let temp = tempfile::tempdir().expect("tempdir"); + let root = temp.path(); + let objects = [object("rsync://example.net/a.roa", 0x11)]; + write_sample(root, "left1", "left", 1, &objects, &[], 64496); + write_sample(root, "left2", "left", 2, &objects, &[], 64497); + write_sample(root, "right1", "right", 1, &objects, &[], 64496); + write_sample(root, "right2", "right", 2, &objects, &[], 64496); + std::fs::write( + root.join("left.jsonl"), + jsonl(&[ + item("left", 1, "left1/result.ccr", "left1/result.cir"), + item("left", 2, "left2/result.ccr", "left2/result.cir"), + ]), + ) + .unwrap(); + std::fs::write( + root.join("right.jsonl"), + jsonl(&[ + item("right", 1, "right1/result.ccr", "right1/result.cir"), + item("right", 2, "right2/result.ccr", "right2/result.cir"), + ]), + ) + .unwrap(); + run(Args { + left_sequence: root.join("left.jsonl"), + right_sequence: root.join("right.jsonl"), + out_dir: root.join("out"), + align_window_runs: 0, + align_window_secs: 0, + sample_limit: 20, + warmup_samples: 0, + cooldown_samples: 1, + timeline_sample_limit: 0, + }) + .expect("run"); + let output: Value = serde_json::from_str( + &std::fs::read_to_string(root.join("out/sequence-triage.json")).unwrap(), + ) + .unwrap(); + assert!(adjusted_class_occurrences(&output, "EDGE_TRAILING_UNRESOLVED") > 0); + assert_eq!( + adjusted_class_occurrences(&output, "STABLE_OUTPUT_DIVERGENCE"), + 0 + ); + } + + #[test] + fn run_groups_stable_object_events_by_physical_object() { + let temp = tempfile::tempdir().expect("tempdir"); + let root = temp.path(); + let missing = "rsync://example.net/repo/pp/a.roa"; + write_sample( + root, + "left1", + "left", + 1, + &[object("rsync://example.net/repo/pp/base.roa", 0x10)], + &[], + 64496, + ); + write_sample( + root, + "left2", + "left", + 2, + &[ + object("rsync://example.net/repo/pp/base.roa", 0x10), + object(missing, 0x11), + ], + &[], + 64496, + ); + write_sample( + root, + "left3", + "left", + 3, + &[object("rsync://example.net/repo/pp/base.roa", 0x10)], + &[], + 64496, + ); + for seq in 1..=3 { + write_sample( + root, + &format!("right{seq}"), + "right", + seq, + &[object("rsync://example.net/repo/pp/base.roa", 0x10)], + &[], + 64496, + ); + } + std::fs::write( + root.join("left.jsonl"), + jsonl(&[ + item("left", 1, "left1/result.ccr", "left1/result.cir"), + item("left", 2, "left2/result.ccr", "left2/result.cir"), + item("left", 3, "left3/result.ccr", "left3/result.cir"), + ]), + ) + .unwrap(); + std::fs::write( + root.join("right.jsonl"), + jsonl(&[ + item("right", 1, "right1/result.ccr", "right1/result.cir"), + item("right", 2, "right2/result.ccr", "right2/result.cir"), + item("right", 3, "right3/result.ccr", "right3/result.cir"), + ]), + ) + .unwrap(); + run(Args { + left_sequence: root.join("left.jsonl"), + right_sequence: root.join("right.jsonl"), + out_dir: root.join("out"), + align_window_runs: 0, + align_window_secs: 0, + sample_limit: 20, + warmup_samples: 1, + cooldown_samples: 1, + timeline_sample_limit: 0, + }) + .expect("run"); + let output: Value = serde_json::from_str( + &std::fs::read_to_string(root.join("out/sequence-triage.json")).unwrap(), + ) + .unwrap(); + let groups = output["adjusted"]["stableObjectGroups"].as_array().unwrap(); + assert_eq!(groups.len(), 1); + assert_eq!(groups[0]["eventCount"].as_u64(), Some(2)); + assert_eq!(groups[0]["physicalObjectCount"].as_u64(), Some(1)); + assert_eq!( + groups[0]["publicationPoint"].as_str(), + Some("rsync://example.net/repo/pp/") + ); + assert_eq!( + groups[0]["physicalObjects"][0]["eventTypes"] + .as_array() + .unwrap() + .len(), + 2 + ); + } + + #[test] + fn run_sandwich_detects_object_hash_reject_and_output_anomalies() { + let temp = tempfile::tempdir().expect("tempdir"); + let root = temp.path(); + let stable_missing = object("rsync://example.net/pp/missing.roa", 0x11); + let stable_mismatch = object("rsync://example.net/pp/mismatch.roa", 0x22); + let peer_mismatch = object("rsync://example.net/pp/mismatch.roa", 0x33); + write_sample_with_ccr_seq( + root, + "left1", + 1, + &[stable_missing.clone(), stable_mismatch.clone()], + &["rsync://example.net/pp/rejected.roa"], + 9, + 64496, + ); + write_sample_with_ccr_seq( + root, + "left3", + 3, + &[stable_missing, stable_mismatch], + &["rsync://example.net/pp/rejected.roa"], + 9, + 64496, + ); + write_sample_with_ccr_seq(root, "right2", 2, &[peer_mismatch], &[], 10, 64497); + std::fs::write( + root.join("left.jsonl"), + jsonl(&[ + item("left", 1, "left1/result.ccr", "left1/result.cir"), + item("left", 3, "left3/result.ccr", "left3/result.cir"), + ]), + ) + .unwrap(); + std::fs::write( + root.join("right.jsonl"), + jsonl(&[item("right", 2, "right2/result.ccr", "right2/result.cir")]), + ) + .unwrap(); + run(Args { + left_sequence: root.join("left.jsonl"), + right_sequence: root.join("right.jsonl"), + out_dir: root.join("out"), + align_window_runs: 0, + align_window_secs: 0, + sample_limit: 20, + warmup_samples: 0, + cooldown_samples: 0, + timeline_sample_limit: 0, + }) + .expect("run"); + let output: Value = serde_json::from_str( + &std::fs::read_to_string(root.join("out/sequence-triage.json")).unwrap(), + ) + .unwrap(); + assert_eq!( + sandwich_class_occurrences(&output, "PEER_MISSING_STABLE_OBJECT"), + 1 + ); + assert_eq!( + sandwich_class_occurrences(&output, "PEER_HASH_MISMATCH_STABLE_OBJECT"), + 1 + ); + assert_eq!( + sandwich_class_occurrences(&output, "PEER_MISSING_STABLE_REJECT"), + 1 + ); + assert_eq!( + sandwich_class_occurrences(&output, "PEER_MISSING_STABLE_OUTPUT"), + 2 + ); + } + + fn class_count(output: &Value, class: &str) -> u64 { + output["classificationCounts"] + .as_array() + .unwrap() + .iter() + .find(|item| item["classification"].as_str() == Some(class)) + .and_then(|item| item["count"].as_u64()) + .unwrap_or(0) + } + + fn sandwich_class_occurrences(output: &Value, class: &str) -> u64 { + output["sandwich"]["classificationCounts"] + .as_array() + .unwrap() + .iter() + .find(|item| item["classification"].as_str() == Some(class)) + .and_then(|item| item["occurrences"].as_u64()) + .unwrap_or(0) + } + + fn adjusted_class_occurrences(output: &Value, class: &str) -> u64 { + output["adjusted"]["classificationCounts"] + .as_array() + .unwrap() + .iter() + .find(|item| item["classification"].as_str() == Some(class)) + .and_then(|item| item["occurrences"].as_u64()) + .unwrap_or(0) + } + + fn object(uri: &str, byte: u8) -> CirObject { + CirObject { + rsync_uri: uri.to_string(), + sha256: vec![byte; 32], + } + } + + fn write_sample( + root: &Path, + dir: &str, + side: &str, + seq: u32, + objects: &[CirObject], + rejected: &[&str], + asn: u32, + ) { + let dir = root.join(dir); + std::fs::create_dir_all(&dir).unwrap(); + let cir = sample_cir(seq, objects, rejected); + let ccr = sample_ccr(seq, asn); + std::fs::write(dir.join("result.cir"), encode_cir(&cir).unwrap()).unwrap(); + std::fs::write(dir.join("result.ccr"), encode_content_info(&ccr).unwrap()).unwrap(); + let _ = side; + } + + fn write_sample_with_ccr_seq( + root: &Path, + dir: &str, + cir_seq: u32, + objects: &[CirObject], + rejected: &[&str], + ccr_seq: u32, + asn: u32, + ) { + let dir = root.join(dir); + std::fs::create_dir_all(&dir).unwrap(); + let cir = sample_cir(cir_seq, objects, rejected); + let ccr = sample_ccr(ccr_seq, asn); + std::fs::write(dir.join("result.cir"), encode_cir(&cir).unwrap()).unwrap(); + std::fs::write(dir.join("result.ccr"), encode_content_info(&ccr).unwrap()).unwrap(); + } + + fn sample_time(seq: u32) -> OffsetDateTime { + OffsetDateTime::from_unix_timestamp(1_800_000_000 + i64::from(seq * 60)).unwrap() + } + + fn sample_cir( + seq: u32, + objects: &[CirObject], + rejected: &[&str], + ) -> CanonicalInputRepresentation { + let mut objects = objects.to_vec(); + objects.sort_by(|a, b| a.rsync_uri.cmp(&b.rsync_uri)); + let rejected_objects = rejected + .iter() + .map(|uri| CirRejectedObject { + object_uri: (*uri).to_string(), + reason: Some("test".to_string()), + }) + .collect::>(); + CanonicalInputRepresentation { + version: CIR_VERSION_V3, + hash_alg: CirHashAlgorithm::Sha256, + validation_time: sample_time(seq), + objects, + trust_anchors: vec![sample_trust_anchor()], + reject_list_sha256: compute_reject_list_sha256(rejected.iter().copied()), + rejected_objects, + } + } + + fn sample_trust_anchor() -> CirTrustAnchor { + let ta_uri = "rsync://example.net/ta.cer"; + let ta_der = b"ta-der".to_vec(); + CirTrustAnchor { + ta_rsync_uri: ta_uri.to_string(), + tal_uri: "https://example.net/root.tal".to_string(), + tal_bytes: format!("{ta_uri}\n\nAQID\n").into_bytes(), + ta_certificate_der: ta_der.clone(), + ta_certificate_sha256: sha256(&ta_der), + } + } + + fn sample_ccr(seq: u32, asn: u32) -> CcrContentInfo { + let vrps = build_roa_payload_state(&[Vrp { + asn, + prefix: IpPrefix { + afi: RoaAfi::Ipv4, + prefix_len: 24, + addr: [192, 0, seq as u8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + }, + max_length: 24, + }]) + .unwrap(); + let vaps = build_aspa_payload_state(&[AspaAttestation { + customer_as_id: asn, + provider_as_ids: vec![64497], + }]) + .unwrap(); + CcrContentInfo::new(RpkiCanonicalCacheRepresentation { + version: 0, + hash_alg: CcrDigestAlgorithm::Sha256, + produced_at: sample_time(seq), + mfts: None, + vrps: Some(vrps), + vaps: Some(vaps), + tas: None, + rks: None, + }) + } + + fn item(side: &str, seq: u32, ccr: &str, cir: &str) -> Value { + json!({ + "schemaVersion": 1, + "rpId": format!("{side}-rp"), + "side": side, + "seq": seq, + "runId": format!("{side}-{seq}"), + "syncMode": if seq == 1 { "snapshot" } else { "delta" }, + "status": "success", + "validationTime": format_time(sample_time(seq)), + "ccrPath": ccr, + "cirPath": cir + }) + } + + fn jsonl(items: &[Value]) -> String { + let mut out = String::new(); + for item in items { + out.push_str(&serde_json::to_string(item).unwrap()); + out.push('\n'); + } + out + } +}