20260601 sequence triage支持独立时间序列

This commit is contained in:
yuyr 2026-06-01 23:41:17 +08:00
parent 902f3ba889
commit 9579f65501
4 changed files with 304 additions and 27 deletions

View File

@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
import argparse import argparse
from concurrent.futures import ThreadPoolExecutor, as_completed
import hashlib import hashlib
import json import json
import os import os
@ -56,15 +57,20 @@ def rsync_from_remote(target: str, source: str | Path, destination: Path) -> Non
def rsync_run_artifacts_from_remote(target: str, source: str | Path, destination: Path) -> None: def rsync_run_artifacts_from_remote(target: str, source: str | Path, destination: Path) -> None:
destination.mkdir(parents=True, exist_ok=True) destination.mkdir(parents=True, exist_ok=True)
rsync_base = ["rsync", "-az", "--partial", "--partial-dir=.rsync-partial"] rsync_base = ["rsync", "-az", "--ignore-missing-args", "--partial", "--partial-dir=.rsync-partial"]
for name in [ for name in [
"result.ccr", "result.ccr",
"result.cir", "result.cir",
"report.json",
"vrps.csv",
"vaps.csv",
"process-time.txt", "process-time.txt",
"remote-run-meta.json", "remote-run-meta.json",
"exit-code.txt", "exit-code.txt",
"started-at.txt", "started-at.txt",
"finished-at.txt", "finished-at.txt",
"stdout.log",
"stderr.log",
]: ]:
run_local([*rsync_base, f"{target}:{source}/{name}", f"{destination}/"]) run_local([*rsync_base, f"{target}:{source}/{name}", f"{destination}/"])
@ -260,6 +266,26 @@ def side_config(name: str) -> dict[str, Any]:
raise SystemExit(f"unknown side config: {name}") raise SystemExit(f"unknown side config: {name}")
def parse_rirs(raw: str) -> list[str]:
    """Parse and validate the comma-separated ``--rirs`` value.

    Entries are split on commas and stripped of surrounding whitespace;
    empty entries (for example from a trailing comma) are ignored.

    Args:
        raw: The raw option value, e.g. ``"apnic,ripe"``.

    Returns:
        The RIR names in the order they were given.

    Raises:
        SystemExit: If no RIR remains after stripping, if any entry is not
            a member of ``DEFAULT_RIRS``, or if any entry is repeated.
            Unsupported entries are reported before duplicates.
    """
    rirs = [item.strip() for item in raw.split(",") if item.strip()]
    if not rirs:
        raise SystemExit("--rirs must contain at least one RIR")
    seen: set[str] = set()
    # Dicts keep insertion order, so they act as ordered sets here: each
    # offending name is reported once, in first-seen order, even when it
    # occurs three or more times in the input.
    invalid: dict[str, None] = {}
    duplicate: dict[str, None] = {}
    for rir in rirs:
        if rir not in DEFAULT_RIRS:
            invalid[rir] = None
        if rir in seen:
            duplicate[rir] = None
        seen.add(rir)
    if invalid:
        raise SystemExit(f"unsupported RIR(s): {','.join(invalid)}; valid values: {','.join(DEFAULT_RIRS)}")
    if duplicate:
        raise SystemExit(f"duplicate RIR(s): {','.join(duplicate)}")
    return rirs
def build_remote_command(remote_root: Path, side_name: str, side: dict[str, Any], side_label: str, seq: int, rirs: list[str]) -> tuple[Path, str]: def build_remote_command(remote_root: Path, side_name: str, side: dict[str, Any], side_label: str, seq: int, rirs: list[str]) -> tuple[Path, str]:
run_dir = remote_root / "experiments" / "sequence" / side_label / f"run_{seq:04d}" run_dir = remote_root / "experiments" / "sequence" / side_label / f"run_{seq:04d}"
state_dir = remote_root / "experiments" / "sequence" / side_label / "state" / side["rpKind"] state_dir = remote_root / "experiments" / "sequence" / side_label / "state" / side["rpKind"]
@ -429,12 +455,57 @@ def run_sequence_triage(local_exp_root: Path, args: argparse.Namespace) -> None:
]) ])
def run_side_sequence(
    args: argparse.Namespace,
    remote_root: Path,
    local_exp_root: Path,
    side_label: str,
    side_name: str,
    side: dict[str, Any],
    seq_path: Path,
    rirs: list[str],
) -> list[dict[str, Any]]:
    """Run every sample of one side back to back and collect the items.

    Samples are numbered from 1 to ``args.samples_per_side`` inclusive and
    executed strictly in that order via ``run_one_side_sample``.

    Returns:
        One sequence item per sample, in execution order.
    """
    sample_count = args.samples_per_side
    return [
        run_one_side_sample(args, remote_root, local_exp_root, side_label, side_name, side, seq_path, seq, rirs)
        for seq in range(1, sample_count + 1)
    ]
def run_one_side_sample(
    args: argparse.Namespace,
    remote_root: Path,
    local_exp_root: Path,
    side_label: str,
    side_name: str,
    side: dict[str, Any],
    seq_path: Path,
    seq: int,
    rirs: list[str],
) -> dict[str, Any]:
    """Run one remote sample for one side and record it locally.

    The steps are: announce the run, execute it on ``args.ssh_target``,
    copy the run artifacts into ``<local_exp_root>/<side_label>/run_NNNN``,
    build the sequence item, tag it with the schedule mode, append it to
    ``seq_path`` and print a summary line.

    Returns:
        The sequence item that was appended to ``seq_path``.
    """
    rir_label = ",".join(rirs)
    start_line = f"[run] {side_label} {side_name} seq={seq} rirs={rir_label} schedule={args.schedule_mode}"
    print(start_line, flush=True)

    # The local directory name mirrors the per-sequence run numbering.
    local_dir = local_exp_root / side_label / f"run_{seq:04d}"
    remote_dir = run_remote_sample(args.ssh_target, remote_root, side_name, side, side_label, seq, rirs)
    rsync_run_artifacts_from_remote(args.ssh_target, remote_dir, local_dir)

    item = build_sequence_item(local_exp_root, side_name, side_label, side, seq, local_dir)
    item["scheduleMode"] = args.schedule_mode
    append_jsonl(seq_path, item)

    done_line = f"[done] {side_label} seq={seq} wallMs={item.get('wallMs')} vrps={item.get('vrps')} vaps={item.get('vaps')} objects={item.get('cirObjectCount')} rejects={item.get('cirRejectCount')}"
    print(done_line, flush=True)
    return item
def run_experiment(args: argparse.Namespace) -> None: def run_experiment(args: argparse.Namespace) -> None:
if not args.skip_build: if not args.skip_build:
build_tool_binaries() build_tool_binaries()
rirs = [item.strip() for item in args.rirs.split(",") if item.strip()] rirs = parse_rirs(args.rirs)
if set(rirs) != set(DEFAULT_RIRS) or len(rirs) != len(DEFAULT_RIRS):
raise SystemExit("#043 remote acceptance requires all5 RIRs: afrinic,apnic,arin,lacnic,ripe")
left = side_config(args.left) left = side_config(args.left)
right = side_config(args.right) right = side_config(args.right)
run_root = Path(args.run_root).resolve() run_root = Path(args.run_root).resolve()
@ -447,6 +518,7 @@ def run_experiment(args: argparse.Namespace) -> None:
"right": args.right, "right": args.right,
"samplesPerSide": args.samples_per_side, "samplesPerSide": args.samples_per_side,
"rirs": rirs, "rirs": rirs,
"scheduleMode": args.schedule_mode,
"remoteRoot": str(remote_root), "remoteRoot": str(remote_root),
"sshTarget": args.ssh_target, "sshTarget": args.ssh_target,
}) })
@ -467,19 +539,24 @@ def run_experiment(args: argparse.Namespace) -> None:
left_seq_path.unlink(missing_ok=True) left_seq_path.unlink(missing_ok=True)
right_seq_path.unlink(missing_ok=True) right_seq_path.unlink(missing_ok=True)
progress: list[dict[str, Any]] = [] progress: list[dict[str, Any]] = []
if args.schedule_mode == "interleaved":
for seq in range(1, args.samples_per_side + 1): for seq in range(1, args.samples_per_side + 1):
for side_label, side_name, side, seq_path in [("A", args.left, left, left_seq_path), ("B", args.right, right, right_seq_path)]: for side_label, side_name, side, seq_path in [
print(f"[run] {side_label} {side_name} seq={seq} all5", flush=True) ("A", args.left, left, left_seq_path),
remote_run_dir = run_remote_sample(args.ssh_target, remote_root, side_name, side, side_label, seq, rirs) ("B", args.right, right, right_seq_path),
local_run_dir = local_exp_root / side_label / f"run_{seq:04d}" ]:
rsync_run_artifacts_from_remote(args.ssh_target, remote_run_dir, local_run_dir) progress.append(
item = build_sequence_item(local_exp_root, side_name, side_label, side, seq, local_run_dir) run_one_side_sample(args, remote_root, local_exp_root, side_label, side_name, side, seq_path, seq, rirs)
append_jsonl(seq_path, item)
progress.append(item)
print(
f"[done] {side_label} seq={seq} wallMs={item.get('wallMs')} vrps={item.get('vrps')} vaps={item.get('vaps')} objects={item.get('cirObjectCount')} rejects={item.get('cirRejectCount')}",
flush=True,
) )
else:
with ThreadPoolExecutor(max_workers=2) as executor:
futures = [
executor.submit(run_side_sequence, args, remote_root, local_exp_root, "A", args.left, left, left_seq_path, rirs),
executor.submit(run_side_sequence, args, remote_root, local_exp_root, "B", args.right, right, right_seq_path, rirs),
]
for future in as_completed(futures):
progress.extend(future.result())
progress.sort(key=lambda item: (str(item.get("side")), int(item.get("seq") or 0)))
write_json(local_exp_root / "run-progress.json", progress) write_json(local_exp_root / "run-progress.json", progress)
run_sequence_triage(local_exp_root, args) run_sequence_triage(local_exp_root, args)
ssh_script(args.ssh_target, f"df -h /data / > {shlex.quote(str(remote_root / 'df-after.txt'))} 2>&1 || true; free -h > {shlex.quote(str(remote_root / 'free-after.txt'))} 2>&1 || true") ssh_script(args.ssh_target, f"df -h /data / > {shlex.quote(str(remote_root / 'df-after.txt'))} 2>&1 || true; free -h > {shlex.quote(str(remote_root / 'free-after.txt'))} 2>&1 || true")
@ -496,6 +573,7 @@ def main() -> None:
parser.add_argument("--right", default="rpki-client-standard") parser.add_argument("--right", default="rpki-client-standard")
parser.add_argument("--samples-per-side", type=int, default=3) parser.add_argument("--samples-per-side", type=int, default=3)
parser.add_argument("--rirs", default=",".join(DEFAULT_RIRS)) parser.add_argument("--rirs", default=",".join(DEFAULT_RIRS))
parser.add_argument("--schedule-mode", choices=["interleaved", "parallel"], default="interleaved")
parser.add_argument("--align-window-runs", type=int, default=2) parser.add_argument("--align-window-runs", type=int, default=2)
parser.add_argument("--align-window-secs", type=int, default=1800) parser.add_argument("--align-window-secs", type=int, default=1800)
parser.add_argument("--sample-limit", type=int, default=200) parser.add_argument("--sample-limit", type=int, default=200)

View File

@ -547,6 +547,198 @@ mod tests {
assert!(output.get("adjusted").is_none()); assert!(output.get("adjusted").is_none());
} }
// Verifies that each side's samples are ordered by the time recorded in the
// CIR file rather than by the `seq` value written to the sequence JSONL, and
// that the output advertises "cir.validation_time" as its time source.
// NOTE(review): assumes the numeric argument after the side name in
// `write_sample` (5, 1, 3 here) determines the sample's CIR validation time;
// the helper is defined outside this view, so confirm against it.
#[test]
fn run_uses_cir_validation_time_and_sorts_by_time_not_raw_seq() {
let temp = tempfile::tempdir().expect("tempdir");
let root = temp.path();
let stable = object("rsync://example.net/pp/stable.roa", 0x11);
// Both left samples carry the same `stable` object.
write_sample(
root,
"left5",
"left",
5,
std::slice::from_ref(&stable),
&[],
64496,
);
write_sample(
root,
"left1",
"left",
1,
std::slice::from_ref(&stable),
&[],
64496,
);
// The single right sample is written with empty slices.
write_sample(root, "right3", "right", 3, &[], &[], 64497);
// The JSONL seq values are deliberately inverted relative to the sample
// directories: left5 is listed with seq 1 and left1 with seq 2.
std::fs::write(
root.join("left.jsonl"),
jsonl(&[
item("left", 1, "left5/result.ccr", "left5/result.cir"),
item("left", 2, "left1/result.ccr", "left1/result.cir"),
]),
)
.unwrap();
std::fs::write(
root.join("right.jsonl"),
jsonl(&[item("right", 10, "right3/result.ccr", "right3/result.cir")]),
)
.unwrap();
// Every window, warmup, cooldown and timeline knob is zero; only
// sample_limit is non-zero.
run(Args {
left_sequence: root.join("left.jsonl"),
right_sequence: root.join("right.jsonl"),
out_dir: root.join("out"),
align_window_runs: 0,
align_window_secs: 0,
sample_limit: 20,
warmup_samples: 0,
cooldown_samples: 0,
timeline_sample_limit: 0,
})
.expect("run");
let output: Value = serde_json::from_str(
&std::fs::read_to_string(root.join("out/sequence-triage.json")).unwrap(),
)
.unwrap();
// The entry listed with seq 2 (left1) must come first and the one listed
// with seq 1 (left5) second, i.e. the order is not the raw seq order.
assert_eq!(output["left"]["samples"][0]["seq"].as_u64(), Some(2));
assert_eq!(output["left"]["samples"][1]["seq"].as_u64(), Some(1));
// Presumably right3 lies strictly between left1 and left5 in time and lacks
// the stable object, giving exactly one occurrence; verify against
// `write_sample`.
assert_eq!(
sandwich_class_occurrences(&output, "PEER_MISSING_STABLE_OBJECT"),
1
);
assert_eq!(
output["parameters"]["sequenceSemantics"]["timeSource"].as_str(),
Some("cir.validation_time")
);
}
// Verifies that no PEER_MISSING_STABLE_OBJECT occurrence is reported when the
// two left samples holding `stable` (left1, left5) are separated by a left
// sample without it (left3).
// NOTE(review): assumes the numeric argument after the side name in
// `write_sample` determines the sample's CIR validation time, so right4 would
// fall between left3 and left5; confirm against the helper, which is defined
// outside this view.
#[test]
fn run_uses_adjacent_source_windows_only() {
let temp = tempfile::tempdir().expect("tempdir");
let root = temp.path();
let stable = object("rsync://example.net/pp/stable.roa", 0x11);
write_sample(
root,
"left1",
"left",
1,
std::slice::from_ref(&stable),
&[],
64496,
);
// The middle left sample is written with empty slices, so `stable` is not
// part of it.
write_sample(root, "left3", "left", 3, &[], &[], 64496);
write_sample(
root,
"left5",
"left",
5,
std::slice::from_ref(&stable),
&[],
64496,
);
write_sample(root, "right4", "right", 4, &[], &[], 64497);
std::fs::write(
root.join("left.jsonl"),
jsonl(&[
item("left", 1, "left1/result.ccr", "left1/result.cir"),
item("left", 3, "left3/result.ccr", "left3/result.cir"),
item("left", 5, "left5/result.ccr", "left5/result.cir"),
]),
)
.unwrap();
std::fs::write(
root.join("right.jsonl"),
jsonl(&[item("right", 4, "right4/result.ccr", "right4/result.cir")]),
)
.unwrap();
// Every window, warmup, cooldown and timeline knob is zero; only
// sample_limit is non-zero.
run(Args {
left_sequence: root.join("left.jsonl"),
right_sequence: root.join("right.jsonl"),
out_dir: root.join("out"),
align_window_runs: 0,
align_window_secs: 0,
sample_limit: 20,
warmup_samples: 0,
cooldown_samples: 0,
timeline_sample_limit: 0,
})
.expect("run");
let output: Value = serde_json::from_str(
&std::fs::read_to_string(root.join("out/sequence-triage.json")).unwrap(),
)
.unwrap();
// A non-adjacent (left1, left5) interval would presumably flag right4, so
// a count of zero shows that only adjacent pairs are used.
assert_eq!(
sandwich_class_occurrences(&output, "PEER_MISSING_STABLE_OBJECT"),
0
);
}
// Verifies that two PEER_MISSING_STABLE_OBJECT occurrences are reported when
// three right samples without `stable` are compared against a single left
// interval whose two endpoints both hold it, and that all three right samples
// are counted in the summary.
// NOTE(review): assumes the numeric argument after the side name in
// `write_sample` determines the sample's CIR validation time, so right2 and
// right4 would lie inside the (left1, left5) interval and right6 after it;
// confirm against the helper, which is defined outside this view.
#[test]
fn run_checks_every_peer_sample_inside_adjacent_window() {
let temp = tempfile::tempdir().expect("tempdir");
let root = temp.path();
let stable = object("rsync://example.net/pp/stable.roa", 0x11);
// Both left samples carry the same `stable` object.
write_sample(
root,
"left1",
"left",
1,
std::slice::from_ref(&stable),
&[],
64496,
);
write_sample(
root,
"left5",
"left",
5,
std::slice::from_ref(&stable),
&[],
64496,
);
// All three right samples are written with empty slices.
write_sample(root, "right2", "right", 2, &[], &[], 64497);
write_sample(root, "right4", "right", 4, &[], &[], 64497);
write_sample(root, "right6", "right", 6, &[], &[], 64497);
std::fs::write(
root.join("left.jsonl"),
jsonl(&[
item("left", 1, "left1/result.ccr", "left1/result.cir"),
item("left", 5, "left5/result.ccr", "left5/result.cir"),
]),
)
.unwrap();
std::fs::write(
root.join("right.jsonl"),
jsonl(&[
item("right", 2, "right2/result.ccr", "right2/result.cir"),
item("right", 4, "right4/result.ccr", "right4/result.cir"),
item("right", 6, "right6/result.ccr", "right6/result.cir"),
]),
)
.unwrap();
// Every window, warmup, cooldown and timeline knob is zero; only
// sample_limit is non-zero.
run(Args {
left_sequence: root.join("left.jsonl"),
right_sequence: root.join("right.jsonl"),
out_dir: root.join("out"),
align_window_runs: 0,
align_window_secs: 0,
sample_limit: 20,
warmup_samples: 0,
cooldown_samples: 0,
timeline_sample_limit: 0,
})
.expect("run");
let output: Value = serde_json::from_str(
&std::fs::read_to_string(root.join("out/sequence-triage.json")).unwrap(),
)
.unwrap();
// Two occurrences are expected, presumably one per right sample inside the
// interval (right2, right4) and none for right6.
assert_eq!(
sandwich_class_occurrences(&output, "PEER_MISSING_STABLE_OBJECT"),
2
);
assert_eq!(output["right"]["sampleCount"].as_u64(), Some(3));
}
fn sandwich_class_occurrences(output: &Value, class: &str) -> u64 { fn sandwich_class_occurrences(output: &Value, class: &str) -> u64 {
output["sandwich"]["classificationCounts"] output["sandwich"]["classificationCounts"]
.as_array() .as_array()

View File

@ -59,12 +59,12 @@ pub(super) fn load_sequence(path: &Path, side: Side) -> Result<Vec<SequenceSampl
ccr_path.display() ccr_path.display()
) )
})?; })?;
let validation_time = raw let _sequence_validation_time = raw
.validation_time .validation_time
.as_deref() .as_deref()
.map(parse_rfc3339) .map(parse_rfc3339)
.transpose()? .transpose()?;
.unwrap_or(cir.validation_time); let validation_time = cir.validation_time;
let objects = cir let objects = cir
.objects .objects
.iter() .iter()
@ -122,12 +122,12 @@ pub(super) fn load_sequence(path: &Path, side: Side) -> Result<Vec<SequenceSampl
vaps, vaps,
}); });
} }
samples.sort_by_key(|sample| sample.raw.seq); samples.sort_by(|left, right| {
for pair in samples.windows(2) { left.validation_time
if pair[0].raw.seq >= pair[1].raw.seq { .cmp(&right.validation_time)
return Err("sequence must be sorted by increasing seq".into()); .then_with(|| left.raw.seq.cmp(&right.raw.seq))
} .then_with(|| left.raw.run_id.cmp(&right.raw.run_id))
} });
if samples.iter().any(|sample| { if samples.iter().any(|sample| {
sample sample
.raw .raw

View File

@ -30,12 +30,18 @@ pub(super) fn build_output(
"warmupSamples": args.warmup_samples, "warmupSamples": args.warmup_samples,
"cooldownSamples": args.cooldown_samples, "cooldownSamples": args.cooldown_samples,
"timelineSampleLimit": if args.timeline_sample_limit == 0 { args.sample_limit } else { args.timeline_sample_limit }, "timelineSampleLimit": if args.timeline_sample_limit == 0 { args.sample_limit } else { args.timeline_sample_limit },
"sequenceSemantics": {
"timeSource": "cir.validation_time",
"sourceSampleOrder": "each side is sorted by CIR validation_time, with seq/runId used only as stable tie breakers",
"sourceWindow": "only adjacent samples from the same side after CIR-time sorting form a source interval",
"peerWindow": "all peer samples with source_start.time < peer.time < source_end.time are checked independently",
},
}, },
"left": sequence_summary(left), "left": sequence_summary(left),
"right": sequence_summary(right), "right": sequence_summary(right),
"sandwich": { "sandwich": {
"strictTimeWindow": true, "strictTimeWindow": true,
"method": "For each side, use two adjacent source samples as a stable interval. If source_start.time < peer.time < source_end.time and the source value is identical at both interval endpoints, the peer sample is expected to contain the same value.", "method": "For each side sorted by CIR validation_time, use two adjacent source samples as a stable interval. If source_start.time < peer.time < source_end.time and the source value is identical at both interval endpoints, every peer sample in the interval is expected to contain the same value.",
"totals": { "totals": {
"occurrences": sandwich.total_occurrences, "occurrences": sandwich.total_occurrences,
"uniqueKeys": sandwich.unique_keys.len(), "uniqueKeys": sandwich.unique_keys.len(),
@ -63,6 +69,7 @@ pub(super) fn build_output(
"intraRpChurnMeaning": "Adjacent-run churn measures how much each RP's own observed object/output/reject sets changed during the sequence. It is a baseline for judging whether cross-RP anomalies exceed natural sequence movement.", "intraRpChurnMeaning": "Adjacent-run churn measures how much each RP's own observed object/output/reject sets changed during the sequence. It is a baseline for judging whether cross-RP anomalies exceed natural sequence movement.",
"limits": [ "limits": [
"Sequence triage only reads sequence JSONL plus referenced CCR/CIR files.", "Sequence triage only reads sequence JSONL plus referenced CCR/CIR files.",
"Sample time is always read from CIR validation_time; sequence JSONL validationTime is accepted only as optional metadata.",
"It does not read report.json, logs, repo-bytes DB, cache, mirror, or raw objects for root cause proof.", "It does not read report.json, logs, repo-bytes DB, cache, mirror, or raw objects for root cause proof.",
"Sandwich anomalies indicate stable-interval contradictions, not final root cause. Manual evidence is still required for network, repository, or implementation attribution." "Sandwich anomalies indicate stable-interval contradictions, not final root cause. Manual evidence is still required for network, repository, or implementation attribution."
] ]