diff --git a/scripts/experiments/feature043/run_sequence_triage_experiment.py b/scripts/experiments/feature043/run_sequence_triage_experiment.py index 1f278ed..7a1db15 100755 --- a/scripts/experiments/feature043/run_sequence_triage_experiment.py +++ b/scripts/experiments/feature043/run_sequence_triage_experiment.py @@ -2,6 +2,7 @@ from __future__ import annotations import argparse +from concurrent.futures import ThreadPoolExecutor, as_completed import hashlib import json import os @@ -56,15 +57,20 @@ def rsync_from_remote(target: str, source: str | Path, destination: Path) -> Non def rsync_run_artifacts_from_remote(target: str, source: str | Path, destination: Path) -> None: destination.mkdir(parents=True, exist_ok=True) - rsync_base = ["rsync", "-az", "--partial", "--partial-dir=.rsync-partial"] + rsync_base = ["rsync", "-az", "--ignore-missing-args", "--partial", "--partial-dir=.rsync-partial"] for name in [ "result.ccr", "result.cir", + "report.json", + "vrps.csv", + "vaps.csv", "process-time.txt", "remote-run-meta.json", "exit-code.txt", "started-at.txt", "finished-at.txt", + "stdout.log", + "stderr.log", ]: run_local([*rsync_base, f"{target}:{source}/{name}", f"{destination}/"]) @@ -260,6 +266,26 @@ def side_config(name: str) -> dict[str, Any]: raise SystemExit(f"unknown side config: {name}") +def parse_rirs(raw: str) -> list[str]: + rirs = [item.strip() for item in raw.split(",") if item.strip()] + if not rirs: + raise SystemExit("--rirs must contain at least one RIR") + seen: set[str] = set() + invalid: list[str] = [] + duplicate: list[str] = [] + for rir in rirs: + if rir not in DEFAULT_RIRS: + invalid.append(rir) + if rir in seen: + duplicate.append(rir) + seen.add(rir) + if invalid: + raise SystemExit(f"unsupported RIR(s): {','.join(invalid)}; valid values: {','.join(DEFAULT_RIRS)}") + if duplicate: + raise SystemExit(f"duplicate RIR(s): {','.join(duplicate)}") + return rirs + + def build_remote_command(remote_root: Path, side_name: str, side: dict[str, Any], side_label: str, seq: int, rirs: list[str]) -> tuple[Path, str]: run_dir = remote_root / "experiments" / "sequence" / side_label / f"run_{seq:04d}" state_dir = remote_root / "experiments" / "sequence" / side_label / "state" / side["rpKind"] @@ -429,12 +455,57 @@ def run_sequence_triage(local_exp_root: Path, args: argparse.Namespace) -> None: ]) +def run_side_sequence( + args: argparse.Namespace, + remote_root: Path, + local_exp_root: Path, + side_label: str, + side_name: str, + side: dict[str, Any], + seq_path: Path, + rirs: list[str], +) -> list[dict[str, Any]]: + side_progress: list[dict[str, Any]] = [] + for seq in range(1, args.samples_per_side + 1): + side_progress.append( + run_one_side_sample(args, remote_root, local_exp_root, side_label, side_name, side, seq_path, seq, rirs) + ) + return side_progress + + +def run_one_side_sample( + args: argparse.Namespace, + remote_root: Path, + local_exp_root: Path, + side_label: str, + side_name: str, + side: dict[str, Any], + seq_path: Path, + seq: int, + rirs: list[str], +) -> dict[str, Any]: + rir_label = ",".join(rirs) + print( + f"[run] {side_label} {side_name} seq={seq} rirs={rir_label} schedule={args.schedule_mode}", + flush=True, + ) + remote_run_dir = run_remote_sample(args.ssh_target, remote_root, side_name, side, side_label, seq, rirs) + local_run_dir = local_exp_root / side_label / f"run_{seq:04d}" + rsync_run_artifacts_from_remote(args.ssh_target, remote_run_dir, local_run_dir) + item = build_sequence_item(local_exp_root, side_name, side_label, side, seq, local_run_dir) + item["scheduleMode"] = args.schedule_mode + append_jsonl(seq_path, item) + print( + f"[done] {side_label} seq={seq} wallMs={item.get('wallMs')} vrps={item.get('vrps')} vaps={item.get('vaps')} objects={item.get('cirObjectCount')} rejects={item.get('cirRejectCount')}", + flush=True, + ) + return item + + def run_experiment(args: argparse.Namespace) -> None: if not args.skip_build: build_tool_binaries() - rirs = [item.strip() for item in args.rirs.split(",") if item.strip()] - if set(rirs) != set(DEFAULT_RIRS) or len(rirs) != len(DEFAULT_RIRS): - raise SystemExit("#043 remote acceptance requires all5 RIRs: afrinic,apnic,arin,lacnic,ripe") + rirs = parse_rirs(args.rirs) left = side_config(args.left) right = side_config(args.right) run_root = Path(args.run_root).resolve() @@ -447,6 +518,7 @@ def run_experiment(args: argparse.Namespace) -> None: "right": args.right, "samplesPerSide": args.samples_per_side, "rirs": rirs, + "scheduleMode": args.schedule_mode, "remoteRoot": str(remote_root), "sshTarget": args.ssh_target, }) @@ -467,19 +539,24 @@ def run_experiment(args: argparse.Namespace) -> None: left_seq_path.unlink(missing_ok=True) right_seq_path.unlink(missing_ok=True) progress: list[dict[str, Any]] = [] - for seq in range(1, args.samples_per_side + 1): - for side_label, side_name, side, seq_path in [("A", args.left, left, left_seq_path), ("B", args.right, right, right_seq_path)]: - print(f"[run] {side_label} {side_name} seq={seq} all5", flush=True) - remote_run_dir = run_remote_sample(args.ssh_target, remote_root, side_name, side, side_label, seq, rirs) - local_run_dir = local_exp_root / side_label / f"run_{seq:04d}" - rsync_run_artifacts_from_remote(args.ssh_target, remote_run_dir, local_run_dir) - item = build_sequence_item(local_exp_root, side_name, side_label, side, seq, local_run_dir) - append_jsonl(seq_path, item) - progress.append(item) - print( - f"[done] {side_label} seq={seq} wallMs={item.get('wallMs')} vrps={item.get('vrps')} vaps={item.get('vaps')} objects={item.get('cirObjectCount')} rejects={item.get('cirRejectCount')}", - flush=True, - ) + if args.schedule_mode == "interleaved": + for seq in range(1, args.samples_per_side + 1): + for side_label, side_name, side, seq_path in [ + ("A", args.left, left, left_seq_path), + ("B", args.right, right, right_seq_path), + ]: + progress.append( + run_one_side_sample(args, remote_root, local_exp_root, side_label, side_name, side, seq_path, seq, rirs) + ) + else: + with ThreadPoolExecutor(max_workers=2) as executor: + futures = [ + executor.submit(run_side_sequence, args, remote_root, local_exp_root, "A", args.left, left, left_seq_path, rirs), + executor.submit(run_side_sequence, args, remote_root, local_exp_root, "B", args.right, right, right_seq_path, rirs), + ] + for future in as_completed(futures): + progress.extend(future.result()) + progress.sort(key=lambda item: (str(item.get("side")), int(item.get("seq") or 0))) write_json(local_exp_root / "run-progress.json", progress) run_sequence_triage(local_exp_root, args) ssh_script(args.ssh_target, f"df -h /data / > {shlex.quote(str(remote_root / 'df-after.txt'))} 2>&1 || true; free -h > {shlex.quote(str(remote_root / 'free-after.txt'))} 2>&1 || true") @@ -496,6 +573,7 @@ def main() -> None: parser.add_argument("--right", default="rpki-client-standard") parser.add_argument("--samples-per-side", type=int, default=3) parser.add_argument("--rirs", default=",".join(DEFAULT_RIRS)) + parser.add_argument("--schedule-mode", choices=["interleaved", "parallel"], default="interleaved") parser.add_argument("--align-window-runs", type=int, default=2) parser.add_argument("--align-window-secs", type=int, default=1800) parser.add_argument("--sample-limit", type=int, default=200) diff --git a/src/tools/sequence_triage_ccr_cir.rs b/src/tools/sequence_triage_ccr_cir.rs index 67c01e1..3f82ac7 100644 --- a/src/tools/sequence_triage_ccr_cir.rs +++ b/src/tools/sequence_triage_ccr_cir.rs @@ -547,6 +547,198 @@ mod tests { assert!(output.get("adjusted").is_none()); } + #[test] + fn run_uses_cir_validation_time_and_sorts_by_time_not_raw_seq() { + let temp = tempfile::tempdir().expect("tempdir"); + let root = temp.path(); + let stable = object("rsync://example.net/pp/stable.roa", 0x11); + write_sample( + root, + "left5", + "left", + 5, + std::slice::from_ref(&stable), + &[], + 64496, + ); + write_sample( + root, + "left1", + "left", + 1, + std::slice::from_ref(&stable), + &[], + 64496, + ); + write_sample(root, "right3", "right", 3, &[], &[], 64497); + std::fs::write( + root.join("left.jsonl"), + jsonl(&[ + item("left", 1, "left5/result.ccr", "left5/result.cir"), + item("left", 2, "left1/result.ccr", "left1/result.cir"), + ]), + ) + .unwrap(); + std::fs::write( + root.join("right.jsonl"), + jsonl(&[item("right", 10, "right3/result.ccr", "right3/result.cir")]), + ) + .unwrap(); + run(Args { + left_sequence: root.join("left.jsonl"), + right_sequence: root.join("right.jsonl"), + out_dir: root.join("out"), + align_window_runs: 0, + align_window_secs: 0, + sample_limit: 20, + warmup_samples: 0, + cooldown_samples: 0, + timeline_sample_limit: 0, + }) + .expect("run"); + let output: Value = serde_json::from_str( + &std::fs::read_to_string(root.join("out/sequence-triage.json")).unwrap(), + ) + .unwrap(); + assert_eq!(output["left"]["samples"][0]["seq"].as_u64(), Some(2)); + assert_eq!(output["left"]["samples"][1]["seq"].as_u64(), Some(1)); + assert_eq!( + sandwich_class_occurrences(&output, "PEER_MISSING_STABLE_OBJECT"), + 1 + ); + assert_eq!( + output["parameters"]["sequenceSemantics"]["timeSource"].as_str(), + Some("cir.validation_time") + ); + } + + #[test] + fn run_uses_adjacent_source_windows_only() { + let temp = tempfile::tempdir().expect("tempdir"); + let root = temp.path(); + let stable = object("rsync://example.net/pp/stable.roa", 0x11); + write_sample( + root, + "left1", + "left", + 1, + std::slice::from_ref(&stable), + &[], + 64496, + ); + write_sample(root, "left3", "left", 3, &[], &[], 64496); + write_sample( + root, + "left5", + "left", + 5, + std::slice::from_ref(&stable), + &[], + 64496, + ); + write_sample(root, "right4", "right", 4, &[], &[], 64497); + std::fs::write( + root.join("left.jsonl"), + jsonl(&[ + item("left", 1, "left1/result.ccr", "left1/result.cir"), + item("left", 3, "left3/result.ccr", "left3/result.cir"), + item("left", 5, "left5/result.ccr", "left5/result.cir"), + ]), + ) + .unwrap(); + std::fs::write( + root.join("right.jsonl"), + jsonl(&[item("right", 4, "right4/result.ccr", "right4/result.cir")]), + ) + .unwrap(); + run(Args { + left_sequence: root.join("left.jsonl"), + right_sequence: root.join("right.jsonl"), + out_dir: root.join("out"), + align_window_runs: 0, + align_window_secs: 0, + sample_limit: 20, + warmup_samples: 0, + cooldown_samples: 0, + timeline_sample_limit: 0, + }) + .expect("run"); + let output: Value = serde_json::from_str( + &std::fs::read_to_string(root.join("out/sequence-triage.json")).unwrap(), + ) + .unwrap(); + assert_eq!( + sandwich_class_occurrences(&output, "PEER_MISSING_STABLE_OBJECT"), + 0 + ); + } + + #[test] + fn run_checks_every_peer_sample_inside_adjacent_window() { + let temp = tempfile::tempdir().expect("tempdir"); + let root = temp.path(); + let stable = object("rsync://example.net/pp/stable.roa", 0x11); + write_sample( + root, + "left1", + "left", + 1, + std::slice::from_ref(&stable), + &[], + 64496, + ); + write_sample( + root, + "left5", + "left", + 5, + std::slice::from_ref(&stable), + &[], + 64496, + ); + write_sample(root, "right2", "right", 2, &[], &[], 64497); + write_sample(root, "right4", "right", 4, &[], &[], 64497); + write_sample(root, "right6", "right", 6, &[], &[], 64497); + std::fs::write( + root.join("left.jsonl"), + jsonl(&[ + item("left", 1, "left1/result.ccr", "left1/result.cir"), + item("left", 5, "left5/result.ccr", "left5/result.cir"), + ]), + ) + .unwrap(); + std::fs::write( + root.join("right.jsonl"), + jsonl(&[ + item("right", 2, "right2/result.ccr", "right2/result.cir"), + item("right", 4, "right4/result.ccr", "right4/result.cir"), + item("right", 6, "right6/result.ccr", "right6/result.cir"), + ]), + ) + .unwrap(); + run(Args { + left_sequence: root.join("left.jsonl"), + right_sequence: root.join("right.jsonl"), + out_dir: root.join("out"), + align_window_runs: 0, + align_window_secs: 0, + sample_limit: 20, + warmup_samples: 0, + cooldown_samples: 0, + timeline_sample_limit: 0, + }) + .expect("run"); + let output: Value = serde_json::from_str( + &std::fs::read_to_string(root.join("out/sequence-triage.json")).unwrap(), + ) + .unwrap(); + assert_eq!( + sandwich_class_occurrences(&output, "PEER_MISSING_STABLE_OBJECT"), + 2 + ); + assert_eq!(output["right"]["sampleCount"].as_u64(), Some(3)); + } + fn sandwich_class_occurrences(output: &Value, class: &str) -> u64 { output["sandwich"]["classificationCounts"] .as_array() diff --git a/src/tools/sequence_triage_ccr_cir/loader.rs b/src/tools/sequence_triage_ccr_cir/loader.rs index 909cda9..ef94552 100644 --- a/src/tools/sequence_triage_ccr_cir/loader.rs +++ b/src/tools/sequence_triage_ccr_cir/loader.rs @@ -59,12 +59,12 @@ pub(super) fn load_sequence(path: &Path, side: Side) -> Result Result= pair[1].raw.seq { - return Err("sequence must be sorted by increasing seq".into()); - } - } + samples.sort_by(|left, right| { + left.validation_time + .cmp(&right.validation_time) + .then_with(|| left.raw.seq.cmp(&right.raw.seq)) + .then_with(|| left.raw.run_id.cmp(&right.raw.run_id)) + }); if samples.iter().any(|sample| { sample .raw diff --git a/src/tools/sequence_triage_ccr_cir/output.rs b/src/tools/sequence_triage_ccr_cir/output.rs index a0ce02c..8b591bc 100644 --- a/src/tools/sequence_triage_ccr_cir/output.rs +++ b/src/tools/sequence_triage_ccr_cir/output.rs @@ -30,12 +30,18 @@ pub(super) fn build_output( "warmupSamples": args.warmup_samples, "cooldownSamples": args.cooldown_samples, "timelineSampleLimit": if args.timeline_sample_limit == 0 { args.sample_limit } else { args.timeline_sample_limit }, + "sequenceSemantics": { + "timeSource": "cir.validation_time", + "sourceSampleOrder": "each side is sorted by CIR validation_time, with seq/runId used only as stable tie breakers", + "sourceWindow": "only adjacent samples from the same side after CIR-time sorting form a source interval", + "peerWindow": "all peer samples with source_start.time < peer.time < source_end.time are checked independently", + }, }, "left": sequence_summary(left), "right": sequence_summary(right), "sandwich": { "strictTimeWindow": true, - "method": "For each side, use two adjacent source samples as a stable interval. If source_start.time < peer.time < source_end.time and the source value is identical at both interval endpoints, the peer sample is expected to contain the same value.", + "method": "For each side sorted by CIR validation_time, use two adjacent source samples as a stable interval. If source_start.time < peer.time < source_end.time and the source value is identical at both interval endpoints, every peer sample in the interval is expected to contain the same value.", "totals": { "occurrences": sandwich.total_occurrences, "uniqueKeys": sandwich.unique_keys.len(), @@ -63,6 +69,7 @@ pub(super) fn build_output( "intraRpChurnMeaning": "Adjacent-run churn measures how much each RP's own observed object/output/reject sets changed during the sequence. It is a baseline for judging whether cross-RP anomalies exceed natural sequence movement.", "limits": [ "Sequence triage only reads sequence JSONL plus referenced CCR/CIR files.", + "Sample time is always read from CIR validation_time; sequence JSONL validationTime is accepted only as optional metadata.", "It does not read report.json, logs, repo-bytes DB, cache, mirror, or raw objects for root cause proof.", "Sandwich anomalies indicate stable-interval contradictions, not final root cause. Manual evidence is still required for network, repository, or implementation attribution." ]