From 00d710950367519286cd2d1acbb3f65689239d31 Mon Sep 17 00:00:00 2001 From: yuyr Date: Mon, 1 Jun 2026 11:01:42 +0800 Subject: [PATCH] =?UTF-8?q?20260531=20=E6=8B=86=E5=88=86sequence=20triage?= =?UTF-8?q?=E5=B7=A5=E5=85=B7=E5=B9=B6=E4=BF=AE=E5=A4=8D=E8=BF=9C=E7=AB=AF?= =?UTF-8?q?=E4=BA=A7=E7=89=A9=E6=8B=89=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../run_sequence_triage_experiment.py | 21 +- src/tools/sequence_triage_ccr_cir.rs | 1980 +---------------- src/tools/sequence_triage_ccr_cir/adjusted.rs | 503 +++++ src/tools/sequence_triage_ccr_cir/analysis.rs | 364 +++ src/tools/sequence_triage_ccr_cir/args.rs | 119 + src/tools/sequence_triage_ccr_cir/io.rs | 33 + src/tools/sequence_triage_ccr_cir/loader.rs | 144 ++ src/tools/sequence_triage_ccr_cir/model.rs | 173 ++ src/tools/sequence_triage_ccr_cir/output.rs | 408 ++++ src/tools/sequence_triage_ccr_cir/sandwich.rs | 261 +++ 10 files changed, 2034 insertions(+), 1972 deletions(-) create mode 100644 src/tools/sequence_triage_ccr_cir/adjusted.rs create mode 100644 src/tools/sequence_triage_ccr_cir/analysis.rs create mode 100644 src/tools/sequence_triage_ccr_cir/args.rs create mode 100644 src/tools/sequence_triage_ccr_cir/io.rs create mode 100644 src/tools/sequence_triage_ccr_cir/loader.rs create mode 100644 src/tools/sequence_triage_ccr_cir/model.rs create mode 100644 src/tools/sequence_triage_ccr_cir/output.rs create mode 100644 src/tools/sequence_triage_ccr_cir/sandwich.rs diff --git a/scripts/experiments/feature043/run_sequence_triage_experiment.py b/scripts/experiments/feature043/run_sequence_triage_experiment.py index c582325..9a6359f 100755 --- a/scripts/experiments/feature043/run_sequence_triage_experiment.py +++ b/scripts/experiments/feature043/run_sequence_triage_experiment.py @@ -56,17 +56,16 @@ def rsync_from_remote(target: str, source: str | Path, destination: Path) -> Non def rsync_run_artifacts_from_remote(target: str, source: str | Path, destination: Path) -> None: destination.mkdir(parents=True, exist_ok=True) - include_args = [ - "--include", "result.ccr", - "--include", "result.cir", - "--include", "process-time.txt", - "--include", "remote-run-meta.json", - "--include", "exit-code.txt", - "--include", "started-at.txt", - "--include", "finished-at.txt", - "--exclude", "*", - ] - run_local(["rsync", "-a", *include_args, f"{target}:{source}/", f"{destination}/"]) + for name in [ + "result.ccr", + "result.cir", + "process-time.txt", + "remote-run-meta.json", + "exit-code.txt", + "started-at.txt", + "finished-at.txt", + ]: + run_local(["rsync", "-a", f"{target}:{source}/{name}", f"{destination}/"]) def load_json(path: Path) -> Any: diff --git a/src/tools/sequence_triage_ccr_cir.rs b/src/tools/sequence_triage_ccr_cir.rs index 8de9adf..ea5ce11 100644 --- a/src/tools/sequence_triage_ccr_cir.rs +++ b/src/tools/sequence_triage_ccr_cir.rs @@ -1,196 +1,19 @@ -use std::collections::{BTreeMap, BTreeSet}; -use std::path::{Path, PathBuf}; +mod adjusted; +mod analysis; +mod args; +mod io; +mod loader; +mod model; +mod output; +mod sandwich; -use crate::ccr::{decode_ccr_compare_views, decode_content_info}; -use crate::cir::decode_cir; -use serde::Deserialize; -use serde_json::{Value, json}; -use time::OffsetDateTime; -use time::format_description::well_known::Rfc3339; - -#[derive(Debug, PartialEq, Eq)] -struct Args { - left_sequence: PathBuf, - right_sequence: PathBuf, - out_dir: PathBuf, - align_window_runs: u32, - align_window_secs: i64, - sample_limit: usize, - warmup_samples: usize, - cooldown_samples: usize, - timeline_sample_limit: usize, -} - -#[derive(Clone, Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -struct SequenceItemRaw { - schema_version: Option, - rp_id: String, - side: Option, - seq: u32, - run_id: String, - sync_mode: Option, - status: Option, - start_time: Option, - finish_time: Option, - validation_time: Option, - ccr_path: PathBuf, - cir_path: PathBuf, - ccr_sha256: Option, - cir_sha256: Option, - wall_ms: Option, - max_rss_kb: Option, - vrps: Option, - vaps: Option, -} - -#[derive(Clone, Debug)] -struct SequenceSample { - raw: SequenceItemRaw, - validation_time: OffsetDateTime, - ccr_path: PathBuf, - cir_path: PathBuf, - objects: BTreeMap, - object_uris: BTreeSet, - object_hashes: BTreeSet, - rejects: BTreeSet, - trust_anchors: BTreeSet, - vrps: BTreeSet, - vaps: BTreeSet, -} - -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -enum Side { - Left, - Right, -} - -impl Side { - fn as_str(self) -> &'static str { - match self { - Side::Left => "left", - Side::Right => "right", - } - } -} - -#[derive(Clone, Debug)] -struct EventOccurrence { - side: Side, - seq: u32, - run_id: String, -} - -#[derive(Clone, Debug)] -struct SampleRecord { - classification: &'static str, - event_type: &'static str, - key: String, - source_side: Side, - source_seq: u32, - source_run_id: String, - matched_seq: Option, - matched_run_id: Option, - note: String, -} - -#[derive(Clone, Debug, Default)] -struct ClassStats { - total: usize, - samples: Vec, -} - -#[derive(Clone, Debug, Default)] -struct AnalysisResult { - stats: BTreeMap<&'static str, ClassStats>, -} - -#[derive(Clone, Debug)] -struct DiffEvent { - event_type: &'static str, - raw_class: &'static str, - key: String, - source_side: Side, - source_seq: u32, - source_run_id: String, -} - -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -enum EdgePosition { - Leading, - Stable, - Trailing, -} - -#[derive(Clone, Debug)] -struct AdjustedRecord { - classification: &'static str, - event_type: &'static str, - key: String, - source_side: Side, - source_seq: u32, - source_run_id: String, - note: String, -} - -#[derive(Clone, Debug, Default)] -struct AdjustedClassStats { - total: usize, - unique_keys: BTreeSet, - samples: Vec, -} - -#[derive(Clone, Debug, Default)] -struct AdjustedAnalysis { - raw_persistent_occurrences: usize, - raw_persistent_unique_keys: usize, - edge_filtered_occurrences: usize, - edge_filtered_unique_keys: usize, - adjusted_stable_occurrences: usize, - adjusted_stable_unique_keys: usize, - stats: BTreeMap<&'static str, AdjustedClassStats>, - uri_timeline_samples: Vec, - stable_object_groups: Vec, -} - -#[derive(Clone, Debug)] -struct SandwichRecord { - classification: &'static str, - set_type: &'static str, - key: String, - source_side: Side, - source_start_seq: u32, - source_start_run_id: String, - source_end_seq: u32, - source_end_run_id: String, - peer_seq: u32, - peer_run_id: String, - source_value: Option, - peer_value: Option, - source_start_time: String, - peer_time: String, - source_end_time: String, - note: String, -} - -#[derive(Clone, Debug, Default)] -struct SandwichClassStats { - total: usize, - unique_keys: BTreeSet, - samples: Vec, -} - -#[derive(Clone, Debug, Default)] -struct SandwichAnalysis { - total_occurrences: usize, - unique_keys: BTreeSet, - by_set_type: BTreeMap<&'static str, usize>, - stats: BTreeMap<&'static str, SandwichClassStats>, -} - -fn usage() -> &'static str { - "Usage: sequence_triage_ccr_cir --left-sequence --right-sequence --out-dir [--align-window-runs ] [--align-window-secs ] [--sample-limit ] [--warmup-samples ] [--cooldown-samples ] [--timeline-sample-limit ]" -} +use adjusted::build_adjusted_analysis; +use analysis::{analyze_hash_rollover, analyze_set, collect_persistent_events}; +use args::{Args, parse_args}; +use loader::load_sequence; +use model::{AnalysisResult, Side}; +use output::{build_output, write_json, write_markdown, write_samples_jsonl}; +use sandwich::build_sandwich_analysis; pub fn main_entry() -> Result<(), String> { real_main() @@ -201,107 +24,6 @@ fn real_main() -> Result<(), String> { run(args) } -fn parse_args(argv: &[String]) -> Result { - let mut left_sequence = None; - let mut right_sequence = None; - let mut out_dir = None; - let mut align_window_runs = 2u32; - let mut align_window_secs = 1800i64; - let mut sample_limit = 200usize; - let mut warmup_samples = 1usize; - let mut cooldown_samples = 1usize; - let mut timeline_sample_limit = 0usize; - let mut index = 1usize; - while index < argv.len() { - match argv[index].as_str() { - "--left-sequence" => { - index += 1; - left_sequence = Some(PathBuf::from( - argv.get(index).ok_or("--left-sequence requires a value")?, - )); - } - "--right-sequence" => { - index += 1; - right_sequence = Some(PathBuf::from( - argv.get(index).ok_or("--right-sequence requires a value")?, - )); - } - "--out-dir" => { - index += 1; - out_dir = Some(PathBuf::from( - argv.get(index).ok_or("--out-dir requires a value")?, - )); - } - "--align-window-runs" => { - index += 1; - let value = argv - .get(index) - .ok_or("--align-window-runs requires a value")?; - align_window_runs = value - .parse::() - .map_err(|_| format!("invalid --align-window-runs: {value}"))?; - } - "--align-window-secs" => { - index += 1; - let value = argv - .get(index) - .ok_or("--align-window-secs requires a value")?; - align_window_secs = value - .parse::() - .map_err(|_| format!("invalid --align-window-secs: {value}"))?; - } - "--sample-limit" => { - index += 1; - let value = argv.get(index).ok_or("--sample-limit requires a value")?; - sample_limit = value - .parse::() - .map_err(|_| format!("invalid --sample-limit: {value}"))?; - } - "--warmup-samples" => { - index += 1; - let value = argv.get(index).ok_or("--warmup-samples requires a value")?; - warmup_samples = value - .parse::() - .map_err(|_| format!("invalid --warmup-samples: {value}"))?; - } - "--cooldown-samples" => { - index += 1; - let value = argv - .get(index) - .ok_or("--cooldown-samples requires a value")?; - cooldown_samples = value - .parse::() - .map_err(|_| format!("invalid --cooldown-samples: {value}"))?; - } - "--timeline-sample-limit" => { - index += 1; - let value = argv - .get(index) - .ok_or("--timeline-sample-limit requires a value")?; - timeline_sample_limit = value - .parse::() - .map_err(|_| format!("invalid --timeline-sample-limit: {value}"))?; - } - "-h" | "--help" => return Err(usage().to_string()), - other => return Err(format!("unknown argument: {other}\n{}", usage())), - } - index += 1; - } - Ok(Args { - left_sequence: left_sequence - .ok_or_else(|| format!("--left-sequence is required\n{}", usage()))?, - right_sequence: right_sequence - .ok_or_else(|| format!("--right-sequence is required\n{}", usage()))?, - out_dir: out_dir.ok_or_else(|| format!("--out-dir is required\n{}", usage()))?, - align_window_runs, - align_window_secs, - sample_limit, - warmup_samples, - cooldown_samples, - timeline_sample_limit, - }) -} - fn run(args: Args) -> Result<(), String> { std::fs::create_dir_all(&args.out_dir) .map_err(|e| format!("create out-dir failed: {}: {e}", args.out_dir.display()))?; @@ -385,1674 +107,6 @@ fn run(args: Args) -> Result<(), String> { Ok(()) } -fn load_sequence(path: &Path, side: Side) -> Result, String> { - let base_dir = path.parent().unwrap_or_else(|| Path::new(".")); - let text = std::fs::read_to_string(path) - .map_err(|e| format!("read sequence failed: {}: {e}", path.display()))?; - let mut samples = Vec::new(); - let mut seen_seq = BTreeSet::new(); - for (line_index, line) in text.lines().enumerate() { - let line = line.trim(); - if line.is_empty() { - continue; - } - let raw: SequenceItemRaw = serde_json::from_str(line).map_err(|e| { - format!( - "parse sequence JSONL failed: {}:{}: {e}", - path.display(), - line_index + 1 - ) - })?; - if raw.schema_version.unwrap_or(1) != 1 { - return Err(format!( - "unsupported sequence item schemaVersion in {}:{}", - path.display(), - line_index + 1 - )); - } - if !seen_seq.insert(raw.seq) { - return Err(format!("duplicate seq {} in {}", raw.seq, path.display())); - } - if let Some(status) = &raw.status - && status != "success" - { - return Err(format!( - "sequence sample {} has non-success status: {status}", - raw.run_id - )); - } - let cir_path = resolve_path(base_dir, &raw.cir_path); - let ccr_path = resolve_path(base_dir, &raw.ccr_path); - let cir = decode_cir(&read_file(&cir_path)?).map_err(|e| { - format!( - "decode CIR failed for sample {} ({}): {e}", - raw.run_id, - cir_path.display() - ) - })?; - let ccr = decode_content_info(&read_file(&ccr_path)?).map_err(|e| { - format!( - "decode CCR failed for sample {} ({}): {e}", - raw.run_id, - ccr_path.display() - ) - })?; - let validation_time = raw - .validation_time - .as_deref() - .map(parse_rfc3339) - .transpose()? - .unwrap_or(cir.validation_time); - let objects = cir - .objects - .iter() - .map(|item| (item.rsync_uri.clone(), hex::encode(&item.sha256))) - .collect::>(); - let object_uris = objects.keys().cloned().collect::>(); - let object_hashes = objects - .iter() - .map(|(uri, hash)| object_hash_key(uri, hash)) - .collect::>(); - let rejects = cir - .rejected_objects - .iter() - .map(|item| item.object_uri.clone()) - .collect::>(); - let trust_anchors = cir - .trust_anchors - .iter() - .map(|item| { - format!( - "{}|{}|{}|{}", - item.ta_rsync_uri, - item.tal_uri, - hex::encode(crate::cir::sha256(&item.tal_bytes)), - hex::encode(&item.ta_certificate_sha256) - ) - }) - .collect::>(); - let (vrps, vaps) = decode_ccr_compare_views(&ccr).map_err(|e| { - format!( - "decode CCR compare views failed for sample {} ({}): {e}", - raw.run_id, - ccr_path.display() - ) - })?; - let vrps = vrps - .into_iter() - .map(|row| format!("{}|{}|{}", row.asn, row.ip_prefix, row.max_length)) - .collect::>(); - let vaps = vaps - .into_iter() - .map(|row| format!("{}|{}", row.customer_asn, row.providers)) - .collect::>(); - samples.push(SequenceSample { - raw, - validation_time, - ccr_path, - cir_path, - objects, - object_uris, - object_hashes, - rejects, - trust_anchors, - vrps, - vaps, - }); - } - samples.sort_by_key(|sample| sample.raw.seq); - for pair in samples.windows(2) { - if pair[0].raw.seq >= pair[1].raw.seq { - return Err("sequence must be sorted by increasing seq".into()); - } - } - if samples.iter().any(|sample| { - sample - .raw - .side - .as_deref() - .is_some_and(|item| item != side.as_str()) - }) { - return Err(format!( - "sequence side field does not match expected side: {}", - side.as_str() - )); - } - Ok(samples) -} - -fn analyze_set( - result: &mut AnalysisResult, - event_type: &'static str, - left: &[SequenceSample], - right: &[SequenceSample], - extract: F, - resolved_class: &'static str, - persistent_class: &'static str, - args: &Args, -) where - F: for<'a> Fn(&'a SequenceSample) -> &'a BTreeSet, -{ - analyze_direction( - result, - event_type, - Side::Left, - left, - right, - &extract, - resolved_class, - persistent_class, - args, - ); - analyze_direction( - result, - event_type, - Side::Right, - right, - left, - &extract, - resolved_class, - persistent_class, - args, - ); -} - -fn analyze_direction( - result: &mut AnalysisResult, - event_type: &'static str, - source_side: Side, - source: &[SequenceSample], - peer: &[SequenceSample], - extract: &F, - resolved_class: &'static str, - persistent_class: &'static str, - args: &Args, -) where - F: for<'a> Fn(&'a SequenceSample) -> &'a BTreeSet, -{ - for sample in source { - let source_set = extract(sample); - for key in source_set { - if peer_sample_at_seq(peer, sample.raw.seq) - .is_some_and(|peer_sample| extract(peer_sample).contains(key)) - { - continue; - } - if let Some(matched) = find_future_match(peer, sample, &key, extract, args) { - result.add( - resolved_class, - SampleRecord { - classification: resolved_class, - event_type, - key: key.clone(), - source_side, - source_seq: sample.raw.seq, - source_run_id: sample.raw.run_id.clone(), - matched_seq: Some(matched.seq), - matched_run_id: Some(matched.run_id), - note: format!( - "matched in {} sequence within alignment window", - matched.side.as_str() - ), - }, - args.sample_limit, - ); - } else { - result.add( - persistent_class, - SampleRecord { - classification: persistent_class, - event_type, - key: key.clone(), - source_side, - source_seq: sample.raw.seq, - source_run_id: sample.raw.run_id.clone(), - matched_seq: None, - matched_run_id: None, - note: "no matching event in peer sequence alignment window".to_string(), - }, - args.sample_limit, - ); - } - } - } -} - -fn analyze_hash_rollover( - result: &mut AnalysisResult, - left: &[SequenceSample], - right: &[SequenceSample], - args: &Args, -) { - for (source_side, source, peer) in [(Side::Left, left, right), (Side::Right, right, left)] { - for sample in source { - for (uri, hash) in &sample.objects { - if peer_sample_at_seq(peer, sample.raw.seq) - .and_then(|peer_sample| peer_sample.objects.get(uri)) - .is_some_and(|peer_hash| peer_hash == hash) - { - continue; - } - if let Some(peer_sample) = peer_sample_at_seq(peer, sample.raw.seq) - && peer_sample.objects.contains_key(uri) - && find_future_hash_match(peer, sample, uri, hash, args).is_some() - { - let matched = - find_future_hash_match(peer, sample, uri, hash, args).expect("match"); - result.add( - "CONTENT_ROLLOVER_RESOLVED", - SampleRecord { - classification: "CONTENT_ROLLOVER_RESOLVED", - event_type: "object_content_rollover", - key: object_hash_key(uri, hash), - source_side, - source_seq: sample.raw.seq, - source_run_id: sample.raw.run_id.clone(), - matched_seq: Some(matched.seq), - matched_run_id: Some(matched.run_id), - note: "same URI hash appeared in peer sequence later".to_string(), - }, - args.sample_limit, - ); - } - } - } - } -} - -fn collect_persistent_events( - left: &[SequenceSample], - right: &[SequenceSample], - args: &Args, -) -> Vec { - let mut events = Vec::new(); - collect_persistent_set( - &mut events, - "object_uri", - "PERSISTENT_OBJECT_SET_DIVERGENCE", - left, - right, - |sample| &sample.object_uris, - args, - ); - collect_persistent_set( - &mut events, - "object_hash", - "PERSISTENT_CONTENT_DIVERGENCE", - left, - right, - |sample| &sample.object_hashes, - args, - ); - collect_persistent_set( - &mut events, - "reject_uri", - "PERSISTENT_REJECT_DIVERGENCE", - left, - right, - |sample| &sample.rejects, - args, - ); - collect_persistent_set( - &mut events, - "trust_anchor", - "PERSISTENT_TA_DIFFERENCE", - left, - right, - |sample| &sample.trust_anchors, - args, - ); - collect_persistent_set( - &mut events, - "vrp_output", - "PERSISTENT_OUTPUT_DIVERGENCE", - left, - right, - |sample| &sample.vrps, - args, - ); - collect_persistent_set( - &mut events, - "vap_output", - "PERSISTENT_OUTPUT_DIVERGENCE", - left, - right, - |sample| &sample.vaps, - args, - ); - events -} - -fn collect_persistent_set( - events: &mut Vec, - event_type: &'static str, - raw_class: &'static str, - left: &[SequenceSample], - right: &[SequenceSample], - extract: F, - args: &Args, -) where - F: for<'a> Fn(&'a SequenceSample) -> &'a BTreeSet, -{ - collect_persistent_direction( - events, - event_type, - raw_class, - Side::Left, - left, - right, - &extract, - args, - ); - collect_persistent_direction( - events, - event_type, - raw_class, - Side::Right, - right, - left, - &extract, - args, - ); -} - -fn collect_persistent_direction( - events: &mut Vec, - event_type: &'static str, - raw_class: &'static str, - source_side: Side, - source: &[SequenceSample], - peer: &[SequenceSample], - extract: &F, - args: &Args, -) where - F: for<'a> Fn(&'a SequenceSample) -> &'a BTreeSet, -{ - for sample in source { - for key in extract(sample) { - if peer_sample_at_seq(peer, sample.raw.seq) - .is_some_and(|peer_sample| extract(peer_sample).contains(key)) - { - continue; - } - if find_future_match(peer, sample, key, extract, args).is_some() { - continue; - } - events.push(DiffEvent { - event_type, - raw_class, - key: key.clone(), - source_side, - source_seq: sample.raw.seq, - source_run_id: sample.raw.run_id.clone(), - }); - } - } -} - -fn build_adjusted_analysis( - args: &Args, - left: &[SequenceSample], - right: &[SequenceSample], - events: &[DiffEvent], -) -> AdjustedAnalysis { - let mut analysis = AdjustedAnalysis { - raw_persistent_occurrences: events.len(), - raw_persistent_unique_keys: unique_event_count(events), - ..Default::default() - }; - let mut edge_filtered_unique = BTreeSet::new(); - - for event in events { - let source_samples = samples_for_side(event.source_side, left, right); - let peer_samples = samples_for_side(opposite_side(event.source_side), left, right); - let edge = edge_position(source_samples, event.source_seq, args); - if edge == EdgePosition::Stable { - analysis.edge_filtered_occurrences += 1; - edge_filtered_unique.insert(event_identity(event)); - } - let (classification, note) = - adjusted_classification(event, edge, source_samples, peer_samples); - analysis.add( - classification, - AdjustedRecord { - classification, - event_type: event.event_type, - key: event.key.clone(), - source_side: event.source_side, - source_seq: event.source_seq, - source_run_id: event.source_run_id.clone(), - note, - }, - args.sample_limit, - ); - } - - analysis.edge_filtered_unique_keys = edge_filtered_unique.len(); - let mut stable_unique = BTreeSet::new(); - for (class, stats) in &analysis.stats { - if class.starts_with("STABLE_") { - analysis.adjusted_stable_occurrences += stats.total; - stable_unique.extend(stats.unique_keys.iter().cloned()); - } - } - analysis.adjusted_stable_unique_keys = stable_unique.len(); - let timeline_limit = if args.timeline_sample_limit == 0 { - args.sample_limit - } else { - args.timeline_sample_limit - }; - analysis.uri_timeline_samples = - build_uri_timeline_samples(left, right, &analysis, timeline_limit); - analysis.stable_object_groups = - build_stable_object_groups(&analysis, left, right, timeline_limit); - analysis -} - -fn adjusted_classification( - event: &DiffEvent, - edge: EdgePosition, - source: &[SequenceSample], - peer: &[SequenceSample], -) -> (&'static str, String) { - if event.event_type == "trust_anchor" { - if peer_has_same_ta_identity(peer, &event.key) { - return ( - "TA_PROJECTION_FORMAT_DIFFERENCE", - "same TAL hash and TA certificate hash exist on peer side; full TA projection string differs".to_string(), - ); - } - if edge == EdgePosition::Stable { - return ( - "STABLE_TA_DIVERGENCE", - "trust-anchor identity is not aligned in a non-boundary sample".to_string(), - ); - } - return edge_unresolved_class(edge); - } - - if event.event_type == "object_hash" { - if let Some((uri, hash)) = split_object_hash_key(&event.key) { - let peer_has_uri = peer_has_uri_any(peer, uri); - let peer_has_different_hash = peer_has_uri_different_hash_any(peer, uri, hash); - let peer_hash_at_source_seq = peer_sample_at_seq(peer, event.source_seq) - .and_then(|peer_sample| peer_sample.objects.get(uri)); - let source_later_matches_peer_hash = peer_hash_at_source_seq.is_some_and(|peer_hash| { - source_future_has_hash(source, event.source_seq, uri, peer_hash) - }); - if edge == EdgePosition::Leading && peer_has_different_hash { - return ( - "EDGE_LEADING_CONTENT_ROLLOVER", - "source leading-edge hash is absent, while peer already has the same URI with another hash".to_string(), - ); - } - if edge == EdgePosition::Trailing && peer_has_different_hash { - return ( - "EDGE_TRAILING_CONTENT_ROLLOVER", - "source trailing-edge hash is absent, while peer has the same URI with another hash and no later source observation exists".to_string(), - ); - } - if edge == EdgePosition::Stable && source_later_matches_peer_hash { - return ( - "MID_SEQUENCE_CONTENT_ROLLOVER_RESOLVED", - "peer already has a newer same-URI hash at this seq and source catches up later in the observed sequence".to_string(), - ); - } - if edge == EdgePosition::Stable && peer_has_different_hash { - return ( - "STABLE_CONTENT_DIVERGENCE", - "same URI exists on peer side with a different hash in a non-boundary sample" - .to_string(), - ); - } - if edge == EdgePosition::Stable && !peer_has_uri { - return ( - "STABLE_OBJECT_SET_DIVERGENCE", - "content key is derived from a non-boundary URI that is absent on peer side" - .to_string(), - ); - } - } - if edge == EdgePosition::Stable { - return ( - "STABLE_CONTENT_DIVERGENCE", - "object hash key remains unaligned in a non-boundary sample".to_string(), - ); - } - return edge_unresolved_class(edge); - } - - if edge != EdgePosition::Stable { - return edge_unresolved_class(edge); - } - - let stable_class = match event.raw_class { - "PERSISTENT_OBJECT_SET_DIVERGENCE" => "STABLE_OBJECT_SET_DIVERGENCE", - "PERSISTENT_REJECT_DIVERGENCE" => "STABLE_REJECT_DIVERGENCE", - "PERSISTENT_OUTPUT_DIVERGENCE" => "STABLE_OUTPUT_DIVERGENCE", - "PERSISTENT_CONTENT_DIVERGENCE" => "STABLE_CONTENT_DIVERGENCE", - "PERSISTENT_TA_DIFFERENCE" => "STABLE_TA_DIVERGENCE", - _ => "STABLE_UNCLASSIFIED_DIVERGENCE", - }; - ( - stable_class, - "persistent event appears in a non-boundary sample after sequence edge filtering" - .to_string(), - ) -} - -fn edge_unresolved_class(edge: EdgePosition) -> (&'static str, String) { - match edge { - EdgePosition::Leading => ( - "EDGE_LEADING_UNRESOLVED", - "event appears only from the warmup edge and may predate the observed sequence" - .to_string(), - ), - EdgePosition::Trailing => ( - "EDGE_TRAILING_UNRESOLVED", - "event appears at the cooldown edge and lacks later peer observations".to_string(), - ), - EdgePosition::Stable => ( - "STABLE_UNCLASSIFIED_DIVERGENCE", - "event is not on an edge but no specific stable category matched".to_string(), - ), - } -} - -impl AdjustedAnalysis { - fn add(&mut self, class: &'static str, record: AdjustedRecord, sample_limit: usize) { - let stats = self.stats.entry(class).or_default(); - stats.total += 1; - stats.unique_keys.insert(format!( - "{}|{}|{}", - record.event_type, - record.source_side.as_str(), - record.key - )); - if stats.samples.len() < sample_limit { - stats.samples.push(record); - } - } -} - -fn build_sandwich_analysis( - args: &Args, - left: &[SequenceSample], - right: &[SequenceSample], -) -> SandwichAnalysis { - let mut analysis = SandwichAnalysis::default(); - analyze_sandwich_objects(&mut analysis, Side::Left, left, right, args); - analyze_sandwich_objects(&mut analysis, Side::Right, right, left, args); - analyze_sandwich_sets( - &mut analysis, - "reject_uri", - "PEER_MISSING_STABLE_REJECT", - Side::Left, - left, - right, - |sample| &sample.rejects, - args, - ); - analyze_sandwich_sets( - &mut analysis, - "reject_uri", - "PEER_MISSING_STABLE_REJECT", - Side::Right, - right, - left, - |sample| &sample.rejects, - args, - ); - analyze_sandwich_sets( - &mut analysis, - "vrp_output", - "PEER_MISSING_STABLE_OUTPUT", - Side::Left, - left, - right, - |sample| &sample.vrps, - args, - ); - analyze_sandwich_sets( - &mut analysis, - "vrp_output", - "PEER_MISSING_STABLE_OUTPUT", - Side::Right, - right, - left, - |sample| &sample.vrps, - args, - ); - analyze_sandwich_sets( - &mut analysis, - "vap_output", - "PEER_MISSING_STABLE_OUTPUT", - Side::Left, - left, - right, - |sample| &sample.vaps, - args, - ); - analyze_sandwich_sets( - &mut analysis, - "vap_output", - "PEER_MISSING_STABLE_OUTPUT", - Side::Right, - right, - left, - |sample| &sample.vaps, - args, - ); - analysis -} - -fn analyze_sandwich_objects( - analysis: &mut SandwichAnalysis, - source_side: Side, - source: &[SequenceSample], - peer: &[SequenceSample], - args: &Args, -) { - for pair in source.windows(2) { - let source_start = &pair[0]; - let source_end = &pair[1]; - if source_start.validation_time >= source_end.validation_time { - continue; - } - let peers = peer_samples_between(peer, source_start, source_end); - if peers.is_empty() { - continue; - } - for (uri, source_hash) in &source_start.objects { - if source_end.objects.get(uri) != Some(source_hash) { - continue; - } - for peer_sample in &peers { - match peer_sample.objects.get(uri) { - Some(peer_hash) if peer_hash == source_hash => {} - Some(peer_hash) => analysis.add( - "PEER_HASH_MISMATCH_STABLE_OBJECT", - sandwich_record( - "PEER_HASH_MISMATCH_STABLE_OBJECT", - "object", - uri.clone(), - source_side, - source_start, - source_end, - peer_sample, - Some(source_hash.clone()), - Some(peer_hash.clone()), - "source interval has stable object hash; peer sample has same URI with another hash", - ), - args.sample_limit, - ), - None => analysis.add( - "PEER_MISSING_STABLE_OBJECT", - sandwich_record( - "PEER_MISSING_STABLE_OBJECT", - "object", - uri.clone(), - source_side, - source_start, - source_end, - peer_sample, - Some(source_hash.clone()), - None, - "source interval has stable object hash; peer sample misses the URI", - ), - args.sample_limit, - ), - } - } - } - } -} - -fn analyze_sandwich_sets( - analysis: &mut SandwichAnalysis, - set_type: &'static str, - classification: &'static str, - source_side: Side, - source: &[SequenceSample], - peer: &[SequenceSample], - extract: F, - args: &Args, -) where - F: for<'a> Fn(&'a SequenceSample) -> &'a BTreeSet, -{ - for pair in source.windows(2) { - let source_start = &pair[0]; - let source_end = &pair[1]; - if source_start.validation_time >= source_end.validation_time { - continue; - } - let peers = peer_samples_between(peer, source_start, source_end); - if peers.is_empty() { - continue; - } - let start_set = extract(source_start); - let end_set = extract(source_end); - for key in start_set { - if !end_set.contains(key) { - continue; - } - for peer_sample in &peers { - if extract(peer_sample).contains(key) { - continue; - } - analysis.add( - classification, - sandwich_record( - classification, - set_type, - key.clone(), - source_side, - source_start, - source_end, - peer_sample, - Some(key.clone()), - None, - "source interval has a stable key; peer sample misses the key", - ), - args.sample_limit, - ); - } - } - } -} - -fn peer_samples_between<'a>( - peer: &'a [SequenceSample], - source_start: &SequenceSample, - source_end: &SequenceSample, -) -> Vec<&'a SequenceSample> { - peer.iter() - .filter(|sample| { - source_start.validation_time < sample.validation_time - && sample.validation_time < source_end.validation_time - }) - .collect() -} - -#[allow(clippy::too_many_arguments)] -fn sandwich_record( - classification: &'static str, - set_type: &'static str, - key: String, - source_side: Side, - source_start: &SequenceSample, - source_end: &SequenceSample, - peer_sample: &SequenceSample, - source_value: Option, - peer_value: Option, - note: &str, -) -> SandwichRecord { - SandwichRecord { - classification, - set_type, - key, - source_side, - source_start_seq: source_start.raw.seq, - source_start_run_id: source_start.raw.run_id.clone(), - source_end_seq: source_end.raw.seq, - source_end_run_id: source_end.raw.run_id.clone(), - peer_seq: peer_sample.raw.seq, - peer_run_id: peer_sample.raw.run_id.clone(), - source_value, - peer_value, - source_start_time: format_time(source_start.validation_time), - peer_time: format_time(peer_sample.validation_time), - source_end_time: format_time(source_end.validation_time), - note: note.to_string(), - } -} - -impl SandwichAnalysis { - fn add(&mut self, class: &'static str, record: SandwichRecord, sample_limit: usize) { - self.total_occurrences += 1; - self.unique_keys.insert(sandwich_unique_key(&record)); - *self.by_set_type.entry(record.set_type).or_default() += 1; - let stats = self.stats.entry(class).or_default(); - stats.total += 1; - stats.unique_keys.insert(sandwich_unique_key(&record)); - if stats.samples.len() < sample_limit { - stats.samples.push(record); - } - } -} - -fn sandwich_unique_key(record: &SandwichRecord) -> String { - format!( - "{}|{}|{}|{}", - record.classification, - record.set_type, - record.source_side.as_str(), - record.key - ) -} - -fn unique_event_count(events: &[DiffEvent]) -> usize { - events - .iter() - .map(event_identity) - .collect::>() - .len() -} - -fn event_identity(event: &DiffEvent) -> String { - format!( - "{}|{}|{}", - event.event_type, - event.source_side.as_str(), - event.key - ) -} - -fn samples_for_side<'a>( - side: Side, - left: &'a [SequenceSample], - right: &'a [SequenceSample], -) -> &'a [SequenceSample] { - match side { - Side::Left => left, - Side::Right => right, - } -} - -fn opposite_side(side: Side) -> Side { - match side { - Side::Left => Side::Right, - Side::Right => Side::Left, - } -} - -fn edge_position(samples: &[SequenceSample], seq: u32, args: &Args) -> EdgePosition { - let Some(index) = samples.iter().position(|sample| sample.raw.seq == seq) else { - return EdgePosition::Stable; - }; - if index < args.warmup_samples { - return EdgePosition::Leading; - } - if samples.len().saturating_sub(index) <= args.cooldown_samples { - return EdgePosition::Trailing; - } - EdgePosition::Stable -} - -fn peer_has_uri_any(peer: &[SequenceSample], uri: &str) -> bool { - peer.iter().any(|sample| sample.objects.contains_key(uri)) -} - -fn peer_has_uri_different_hash_any(peer: &[SequenceSample], uri: &str, hash: &str) -> bool { - peer.iter() - .filter_map(|sample| sample.objects.get(uri)) - .any(|peer_hash| peer_hash != hash) -} - -fn source_future_has_hash( - source: &[SequenceSample], - seq: u32, - uri: &str, - expected_hash: &str, -) -> bool { - source - .iter() - .filter(|sample| sample.raw.seq > seq) - .any(|sample| { - sample - .objects - .get(uri) - .is_some_and(|hash| hash == expected_hash) - }) -} - -fn peer_has_same_ta_identity(peer: &[SequenceSample], key: &str) -> bool { - let Some(identity) = trust_anchor_identity(key) else { - return false; - }; - peer.iter().any(|sample| { - sample - .trust_anchors - .iter() - .filter_map(|peer_key| trust_anchor_identity(peer_key)) - .any(|peer_identity| peer_identity == identity) - }) -} - -fn trust_anchor_identity(key: &str) -> Option { - let parts = key.split('|').collect::>(); - if parts.len() != 4 { - return None; - } - Some(format!("{}|{}", parts[2], parts[3])) -} - -fn split_object_hash_key(key: &str) -> Option<(&str, &str)> { - key.rsplit_once('|') -} - -fn build_uri_timeline_samples( - left: &[SequenceSample], - right: &[SequenceSample], - adjusted: &AdjustedAnalysis, - limit: usize, -) -> Vec { - let mut uris = BTreeSet::new(); - for stats in adjusted.stats.values() { - for sample in &stats.samples { - if sample.event_type == "object_hash" - && let Some((uri, _)) = split_object_hash_key(&sample.key) - { - uris.insert(uri.to_string()); - } - if sample.event_type == "object_uri" { - uris.insert(sample.key.clone()); - } - } - } - uris.into_iter() - .take(limit) - .map(|uri| { - json!({ - "uri": uri, - "left": timeline_for_uri(left, &uri), - "right": timeline_for_uri(right, &uri), - }) - }) - .collect() -} - -fn timeline_for_uri(samples: &[SequenceSample], uri: &str) -> Vec { - samples - .iter() - .filter_map(|sample| { - sample.objects.get(uri).map(|hash| { - json!({ - "seq": sample.raw.seq, - "runId": sample.raw.run_id, - "validationTime": format_time(sample.validation_time), - "hash": hash, - }) - }) - }) - .collect() -} - -fn build_stable_object_groups( - adjusted: &AdjustedAnalysis, - left: &[SequenceSample], - right: &[SequenceSample], - limit: usize, -) -> Vec { - let mut groups: BTreeMap = BTreeMap::new(); - let Some(stats) = adjusted.stats.get("STABLE_OBJECT_SET_DIVERGENCE") else { - return Vec::new(); - }; - for sample in &stats.samples { - let physical_uri = physical_object_uri(sample); - let group_key = format!( - "{}|{}|{}|{}", - sample.source_side.as_str(), - sample.source_seq, - sample.source_run_id, - publication_point_prefix(&physical_uri) - ); - let group = groups - .entry(group_key) - .or_insert_with(|| StableObjectGroup::new(sample, &physical_uri)); - if group.source_cir_path.is_none() - && let Some(source_sample) = sample_by_side_seq_run( - left, - right, - sample.source_side, - sample.source_seq, - &sample.source_run_id, - ) - { - group.source_cir_path = Some(path_string(&source_sample.cir_path)); - } - group.add(sample, physical_uri); - } - groups - .into_values() - .take(limit) - .map(StableObjectGroup::to_json) - .collect() -} - -#[derive(Clone, Debug)] -struct StableObjectGroup { - source_side: Side, - source_seq: u32, - source_run_id: String, - source_cir_path: Option, - publication_point: String, - event_count: usize, - event_types: BTreeMap<&'static str, usize>, - physical_objects: BTreeMap, -} - -impl StableObjectGroup { - fn new(sample: &AdjustedRecord, physical_uri: &str) -> Self { - Self { - source_side: sample.source_side, - source_seq: sample.source_seq, - source_run_id: sample.source_run_id.clone(), - source_cir_path: None, - publication_point: publication_point_prefix(physical_uri), - event_count: 0, - event_types: BTreeMap::new(), - physical_objects: BTreeMap::new(), - } - } - - fn add(&mut self, sample: &AdjustedRecord, physical_uri: String) { - self.event_count += 1; - *self.event_types.entry(sample.event_type).or_default() += 1; - let object = self - .physical_objects - .entry(physical_uri.clone()) - .or_insert_with(|| StablePhysicalObject { - extension: object_extension(&physical_uri).to_string(), - uri: physical_uri, - event_types: BTreeSet::new(), - hashes: BTreeSet::new(), - }); - object.event_types.insert(sample.event_type); - if let Some(hash) = event_hash(sample) { - object.hashes.insert(hash.to_string()); - } - } - - fn to_json(self) -> Value { - json!({ - "sourceSide": self.source_side.as_str(), - "sourceSeq": self.source_seq, - "sourceRunId": self.source_run_id, - "sourceCirPath": self.source_cir_path, - "publicationPoint": self.publication_point, - "eventCount": self.event_count, - "eventTypes": self.event_types, - "physicalObjectCount": self.physical_objects.len(), - "physicalObjects": self.physical_objects.into_values().map(StablePhysicalObject::to_json).collect::>(), - }) - } -} - -#[derive(Clone, Debug)] -struct StablePhysicalObject { - uri: String, - extension: String, - event_types: BTreeSet<&'static str>, - hashes: BTreeSet, -} - -impl StablePhysicalObject { - fn to_json(self) -> Value { - json!({ - "uri": self.uri, - "extension": self.extension, - "eventTypes": self.event_types, - "hashes": self.hashes, - }) - } -} - -fn physical_object_uri(sample: &AdjustedRecord) -> String { - if sample.event_type == "object_hash" - && let Some((uri, _)) = split_object_hash_key(&sample.key) - { - return uri.to_string(); - } - sample.key.clone() -} - -fn event_hash(sample: &AdjustedRecord) -> Option<&str> { - if sample.event_type == "object_hash" { - split_object_hash_key(&sample.key).map(|(_, hash)| hash) - } else { - None - } -} - -fn publication_point_prefix(uri: &str) -> String { - uri.rsplit_once('/') - .map(|(prefix, _)| format!("{prefix}/")) - .unwrap_or_else(|| uri.to_string()) -} - -fn object_extension(uri: &str) -> &str { - uri.rsplit_once('.') - .map(|(_, extension)| extension) - .unwrap_or("") -} - -fn sample_by_side_seq_run<'a>( - left: &'a [SequenceSample], - right: &'a [SequenceSample], - side: Side, - seq: u32, - run_id: &str, -) -> Option<&'a SequenceSample> { - samples_for_side(side, left, right) - .iter() - .find(|sample| sample.raw.seq == seq && sample.raw.run_id == run_id) -} - -impl AnalysisResult { - fn add(&mut self, class: &'static str, record: SampleRecord, sample_limit: usize) { - let stats = self.stats.entry(class).or_default(); - stats.total += 1; - if stats.samples.len() < sample_limit { - stats.samples.push(record); - } - } -} - -fn peer_sample_at_seq(peer: &[SequenceSample], seq: u32) -> Option<&SequenceSample> { - peer.iter().find(|sample| sample.raw.seq == seq) -} - -fn find_future_match( - peer: &[SequenceSample], - source: &SequenceSample, - key: &str, - extract: &F, - args: &Args, -) -> Option -where - F: for<'a> Fn(&'a SequenceSample) -> &'a BTreeSet, -{ - peer.iter() - .filter(|candidate| is_in_alignment_window(source, candidate, args)) - .find(|candidate| extract(candidate).contains(key)) - .map(|candidate| { - occurrence( - candidate, - if candidate.raw.side.as_deref() == Some("left") { - Side::Left - } else { - Side::Right - }, - ) - }) -} - -fn find_future_hash_match( - peer: &[SequenceSample], - source: &SequenceSample, - uri: &str, - hash: &str, - args: &Args, -) -> Option { - peer.iter() - .filter(|candidate| is_in_alignment_window(source, candidate, args)) - .find(|candidate| { - candidate - .objects - .get(uri) - .is_some_and(|peer_hash| peer_hash == hash) - }) - .map(|candidate| { - occurrence( - candidate, - if candidate.raw.side.as_deref() == Some("left") { - Side::Left - } else { - Side::Right - }, - ) - }) -} - -fn is_in_alignment_window( - source: &SequenceSample, - candidate: &SequenceSample, - args: &Args, -) -> bool { - if candidate.raw.seq < source.raw.seq { - return false; - } - let run_delta = candidate.raw.seq.saturating_sub(source.raw.seq); - let time_delta = candidate.validation_time - source.validation_time; - let secs = time_delta.whole_seconds().abs(); - run_delta <= args.align_window_runs || secs <= args.align_window_secs -} - -fn occurrence(sample: &SequenceSample, side: Side) -> EventOccurrence { - EventOccurrence { - side, - seq: sample.raw.seq, - run_id: sample.raw.run_id.clone(), - } -} - -fn build_output( - args: &Args, - left: &[SequenceSample], - right: &[SequenceSample], - result: &AnalysisResult, - adjusted: &AdjustedAnalysis, - sandwich: &SandwichAnalysis, -) -> Value { - let classifications = result - .stats - .iter() - .map(|(class, stats)| { - json!({ - "classification": class, - "count": stats.total, - "samples": stats.samples.iter().map(sample_to_json).collect::>(), - }) - }) - .collect::>(); - let adjusted_classifications = adjusted - .stats - .iter() - .map(|(class, stats)| { - json!({ - "classification": class, - "occurrences": stats.total, - "uniqueKeys": stats.unique_keys.len(), - "samples": stats.samples.iter().map(adjusted_sample_to_json).collect::>(), - }) - }) - .collect::>(); - let sandwich_classifications = sandwich - .stats - .iter() - .map(|(class, stats)| { - json!({ - "classification": class, - "occurrences": stats.total, - "uniqueKeys": stats.unique_keys.len(), - "samples": stats.samples.iter().map(sandwich_sample_to_json).collect::>(), - }) - }) - .collect::>(); - json!({ - "schemaVersion": 1, - "generatedBy": "sequence_triage_ccr_cir", - "inputContract": "left-right-sequence-jsonl-with-ccr-cir-artifacts", - "parameters": { - "leftSequence": path_string(&args.left_sequence), - "rightSequence": path_string(&args.right_sequence), - "alignWindowRuns": args.align_window_runs, - "alignWindowSecs": args.align_window_secs, - "sampleLimit": args.sample_limit, - "warmupSamples": args.warmup_samples, - "cooldownSamples": args.cooldown_samples, - "timelineSampleLimit": if args.timeline_sample_limit == 0 { args.sample_limit } else { args.timeline_sample_limit }, - }, - "left": sequence_summary(left), - "right": sequence_summary(right), - "classificationCounts": classifications, - "totals": { - "resolvedTemporalLike": count_class(result, "TEMPORAL_LAG_RESOLVED") + count_class(result, "CONTENT_ROLLOVER_RESOLVED"), - "persistent": count_prefix(result, "PERSISTENT_"), - "unclassified": count_class(result, "UNCLASSIFIED_INSUFFICIENT_WINDOW"), - }, - "adjusted": { - "warmupSamples": args.warmup_samples, - "cooldownSamples": args.cooldown_samples, - "rawPersistent": { - "occurrences": adjusted.raw_persistent_occurrences, - "uniqueKeys": adjusted.raw_persistent_unique_keys, - }, - "edgeFilteredPersistent": { - "occurrences": adjusted.edge_filtered_occurrences, - "uniqueKeys": adjusted.edge_filtered_unique_keys, - }, - "adjustedStablePersistent": { - "occurrences": adjusted.adjusted_stable_occurrences, - "uniqueKeys": adjusted.adjusted_stable_unique_keys, - }, - "classificationCounts": adjusted_classifications, - "uriTimelineSamples": adjusted.uri_timeline_samples, - "stableObjectGroups": adjusted.stable_object_groups, - "interpretation": { - "rawPersistentMeaning": "raw persistent keeps the original #043 single-event matching semantics.", - "edgeFilteredMeaning": "edge-filtered persistent keeps only non-warmup and non-cooldown source occurrences.", - "adjustedStableMeaning": "adjusted stable persistent keeps non-edge findings after URI-level rollover and TA projection filtering.", - "stableObjectGroupsMeaning": "STABLE_OBJECT_SET_DIVERGENCE events are additionally collapsed by source CIR, publication point, and physical object URI so object_uri/object_hash duplicate event views do not inflate physical-object counts.", - } - }, - "sandwich": { - "strictTimeWindow": true, - "method": "For each side, use two adjacent source samples as a stable interval. If source_start.time < peer.time < source_end.time and the source value is identical at both interval endpoints, the peer sample is expected to contain the same value.", - "totals": { - "occurrences": sandwich.total_occurrences, - "uniqueKeys": sandwich.unique_keys.len(), - }, - "bySetType": sandwich.by_set_type, - "classificationCounts": sandwich_classifications, - "interpretation": { - "missingStableObject": "The source side proves a URI/hash is stable across an interval that contains the peer sample, but the peer sample has no such URI.", - "hashMismatchStableObject": "The source side proves a URI/hash is stable across an interval that contains the peer sample, but the peer sample has the same URI with a different hash.", - "missingStableReject": "The source side consistently rejects the URI across an interval that contains the peer sample, but the peer sample does not reject it.", - "missingStableOutput": "The source side consistently outputs a VRP/VAP key across an interval that contains the peer sample, but the peer sample does not output it." - } - }, - "interpretation": { - "temporalResolvedMeaning": "Event appeared only on one side at one sample but aligned in the peer sequence later.", - "persistentMeaning": "Event did not align within the configured run/time window; inspect as a candidate RP behavior difference or persistent sync/input difference.", - "limits": [ - "Sequence triage only reads sequence JSONL plus referenced CCR/CIR files.", - "It does not read report.json, logs, repo-bytes DB, cache, mirror, or raw objects for root cause proof.", - "Resolved temporal findings reduce false positives but do not prove the exact external repository update time." - ] - } - }) -} - -fn sequence_summary(samples: &[SequenceSample]) -> Value { - json!({ - "sampleCount": samples.len(), - "rpIds": samples.iter().map(|sample| sample.raw.rp_id.clone()).collect::>(), - "firstSeq": samples.first().map(|sample| sample.raw.seq), - "lastSeq": samples.last().map(|sample| sample.raw.seq), - "firstValidationTime": samples.first().map(|sample| format_time(sample.validation_time)), - "lastValidationTime": samples.last().map(|sample| format_time(sample.validation_time)), - "samples": samples.iter().map(|sample| json!({ - "seq": sample.raw.seq, - "runId": sample.raw.run_id, - "syncMode": sample.raw.sync_mode, - "startTime": sample.raw.start_time, - "finishTime": sample.raw.finish_time, - "validationTime": format_time(sample.validation_time), - "status": sample.raw.status, - "ccrPath": path_string(&sample.ccr_path), - "cirPath": path_string(&sample.cir_path), - "ccrSha256": sample.raw.ccr_sha256, - "cirSha256": sample.raw.cir_sha256, - "wallMs": sample.raw.wall_ms, - "maxRssKb": sample.raw.max_rss_kb, - "vrps": sample.raw.vrps.or(Some(sample.vrps.len() as u64)), - "vaps": sample.raw.vaps.or(Some(sample.vaps.len() as u64)), - "objectCount": sample.object_uris.len(), - "rejectCount": sample.rejects.len(), - "trustAnchorCount": sample.trust_anchors.len(), - })).collect::>(), - }) -} - -fn sample_to_json(sample: &SampleRecord) -> Value { - json!({ - "classification": sample.classification, - "eventType": sample.event_type, - "key": sample.key, - "sourceSide": sample.source_side.as_str(), - "sourceSeq": sample.source_seq, - "sourceRunId": sample.source_run_id, - "matchedSeq": sample.matched_seq, - "matchedRunId": sample.matched_run_id, - "note": sample.note, - }) -} - -fn adjusted_sample_to_json(sample: &AdjustedRecord) -> Value { - json!({ - "classification": sample.classification, - "eventType": sample.event_type, - "key": sample.key, - "sourceSide": sample.source_side.as_str(), - "sourceSeq": sample.source_seq, - "sourceRunId": sample.source_run_id, - "note": sample.note, - }) -} - -fn sandwich_sample_to_json(sample: &SandwichRecord) -> Value { - json!({ - "classification": sample.classification, - "setType": sample.set_type, - "key": sample.key, - "sourceSide": sample.source_side.as_str(), - "sourceStartSeq": sample.source_start_seq, - "sourceStartRunId": sample.source_start_run_id, - "sourceEndSeq": sample.source_end_seq, - "sourceEndRunId": sample.source_end_run_id, - "peerSeq": sample.peer_seq, - "peerRunId": sample.peer_run_id, - "sourceValue": sample.source_value, - "peerValue": sample.peer_value, - "sourceStartTime": sample.source_start_time, - "peerTime": sample.peer_time, - "sourceEndTime": sample.source_end_time, - "note": sample.note, - }) -} - -fn count_class(result: &AnalysisResult, class: &'static str) -> usize { - result - .stats - .get(class) - .map(|stats| stats.total) - .unwrap_or(0) -} - -fn count_prefix(result: &AnalysisResult, prefix: &str) -> usize { - result - .stats - .iter() - .filter(|(class, _)| class.starts_with(prefix)) - .map(|(_, stats)| stats.total) - .sum() -} - -fn write_json(path: &Path, value: &Value) -> Result<(), String> { - std::fs::write( - path, - serde_json::to_string_pretty(value).map_err(|e| e.to_string())? + "\n", - ) - .map_err(|e| format!("write JSON failed: {}: {e}", path.display())) -} - -fn write_markdown(path: &Path, output: &Value) -> Result<(), String> { - let mut lines = vec![ - "# CCR/CIR Sequence Triage Summary".to_string(), - "".to_string(), - format!( - "- `generatedBy`: `{}`", - output["generatedBy"].as_str().unwrap_or("") - ), - format!( - "- `leftSamples`: `{}`", - output["left"]["sampleCount"].as_u64().unwrap_or(0) - ), - format!( - "- `rightSamples`: `{}`", - output["right"]["sampleCount"].as_u64().unwrap_or(0) - ), - format!( - "- `alignWindowRuns`: `{}`", - output["parameters"]["alignWindowRuns"] - .as_u64() - .unwrap_or(0) - ), - format!( - "- `alignWindowSecs`: `{}`", - output["parameters"]["alignWindowSecs"] - .as_i64() - .unwrap_or(0) - ), - format!( - "- `warmupSamples`: `{}`", - output["adjusted"]["warmupSamples"].as_u64().unwrap_or(0) - ), - format!( - "- `cooldownSamples`: `{}`", - output["adjusted"]["cooldownSamples"].as_u64().unwrap_or(0) - ), - "".to_string(), - "## Classification Counts".to_string(), - "".to_string(), - "| Classification | Count |".to_string(), - "|---|---:|".to_string(), - ]; - if let Some(classes) = output["classificationCounts"].as_array() { - for item in classes { - lines.push(format!( - "| `{}` | {} |", - item["classification"].as_str().unwrap_or(""), - item["count"].as_u64().unwrap_or(0) - )); - } - } - lines.extend([ - "".to_string(), - "## Adjusted Boundary / Rollover Classification".to_string(), - "".to_string(), - format!( - "- `rawPersistent`: `{}` occurrences / `{}` unique keys", - output["adjusted"]["rawPersistent"]["occurrences"] - .as_u64() - .unwrap_or(0), - output["adjusted"]["rawPersistent"]["uniqueKeys"] - .as_u64() - .unwrap_or(0) - ), - format!( - "- `edgeFilteredPersistent`: `{}` occurrences / `{}` unique keys", - output["adjusted"]["edgeFilteredPersistent"]["occurrences"] - .as_u64() - .unwrap_or(0), - output["adjusted"]["edgeFilteredPersistent"]["uniqueKeys"] - .as_u64() - .unwrap_or(0) - ), - format!( - "- `adjustedStablePersistent`: `{}` occurrences / `{}` unique keys", - output["adjusted"]["adjustedStablePersistent"]["occurrences"] - .as_u64() - .unwrap_or(0), - output["adjusted"]["adjustedStablePersistent"]["uniqueKeys"] - .as_u64() - .unwrap_or(0) - ), - "".to_string(), - "| Adjusted Classification | Occurrences | Unique Keys |".to_string(), - "|---|---:|---:|".to_string(), - ]); - if let Some(classes) = output["adjusted"]["classificationCounts"].as_array() { - for item in classes { - lines.push(format!( - "| `{}` | {} | {} |", - item["classification"].as_str().unwrap_or(""), - item["occurrences"].as_u64().unwrap_or(0), - item["uniqueKeys"].as_u64().unwrap_or(0) - )); - } - } - if let Some(groups) = output["adjusted"]["stableObjectGroups"].as_array() - && !groups.is_empty() - { - lines.extend([ - "".to_string(), - "## Stable Object Groups".to_string(), - "".to_string(), - "| Source | CIR | Publication Point | Events | Physical Objects |".to_string(), - "|---|---|---|---:|---:|".to_string(), - ]); - for group in groups { - lines.push(format!( - "| `{}/seq{}/{}` | `{}` | `{}` | {} | {} |", - group["sourceSide"].as_str().unwrap_or(""), - group["sourceSeq"].as_u64().unwrap_or(0), - group["sourceRunId"].as_str().unwrap_or(""), - group["sourceCirPath"].as_str().unwrap_or(""), - group["publicationPoint"].as_str().unwrap_or(""), - group["eventCount"].as_u64().unwrap_or(0), - group["physicalObjectCount"].as_u64().unwrap_or(0), - )); - } - } - lines.extend([ - "".to_string(), - "## Sandwich Anomaly Check".to_string(), - "".to_string(), - format!( - "- `occurrences`: `{}`", - output["sandwich"]["totals"]["occurrences"] - .as_u64() - .unwrap_or(0) - ), - format!( - "- `uniqueKeys`: `{}`", - output["sandwich"]["totals"]["uniqueKeys"] - .as_u64() - .unwrap_or(0) - ), - "- Rule: source side has two adjacent stable samples, and peer sample timestamp is strictly between them.".to_string(), - "".to_string(), - "| Sandwich Classification | Occurrences | Unique Keys |".to_string(), - "|---|---:|---:|".to_string(), - ]); - if let Some(classes) = output["sandwich"]["classificationCounts"].as_array() { - for item in classes { - lines.push(format!( - "| `{}` | {} | {} |", - item["classification"].as_str().unwrap_or(""), - item["occurrences"].as_u64().unwrap_or(0), - item["uniqueKeys"].as_u64().unwrap_or(0) - )); - } - } - lines.extend([ - "".to_string(), - "## Interpretation".to_string(), - "".to_string(), - "- `TEMPORAL_LAG_RESOLVED` / `CONTENT_ROLLOVER_RESOLVED` 表示差异在后续采样中对齐,优先视为采样时刻或仓库滚动窗口差异。".to_string(), - "- `PERSISTENT_*` 表示配置窗口内仍未对齐,才是后续人工排查的高价值候选。".to_string(), - "- `EDGE_*` 表示差异落在序列首尾,缺少前置或后续观察,不能直接当作实现差异。".to_string(), - "- `STABLE_*` 表示经过首尾边界过滤和 URI-level rollover 过滤后仍存在的稳定候选。".to_string(), - ]); - std::fs::write(path, lines.join("\n") + "\n") - .map_err(|e| format!("write markdown failed: {}: {e}", path.display())) -} - -fn write_samples_jsonl(path: &Path, result: &AnalysisResult) -> Result<(), String> { - let mut body = String::new(); - for stats in result.stats.values() { - for sample in &stats.samples { - body.push_str( - &serde_json::to_string(&sample_to_json(sample)).map_err(|e| e.to_string())?, - ); - body.push('\n'); - } - } - std::fs::write(path, body).map_err(|e| format!("write samples failed: {}: {e}", path.display())) -} - -fn read_file(path: &Path) -> Result, String> { - std::fs::read(path).map_err(|e| format!("read file failed: {}: {e}", path.display())) -} - -fn resolve_path(base_dir: &Path, path: &Path) -> PathBuf { - if path.is_absolute() { - path.to_path_buf() - } else { - base_dir.join(path) - } -} - -fn parse_rfc3339(value: &str) -> Result { - OffsetDateTime::parse(value, &Rfc3339) - .map_err(|e| format!("parse RFC3339 failed: {value}: {e}")) -} - -fn format_time(value: OffsetDateTime) -> String { - value.format(&Rfc3339).unwrap_or_else(|_| value.to_string()) -} - -fn path_string(path: &Path) -> String { - path.to_string_lossy().into_owned() -} - -fn object_hash_key(uri: &str, hash: &str) -> String { - format!("{uri}|{hash}") -} - #[cfg(test)] mod tests { use super::*; @@ -2066,6 +120,10 @@ mod tests { }; use crate::data_model::roa::{IpPrefix, RoaAfi}; use crate::validation::objects::{AspaAttestation, Vrp}; + use io::format_time; + use serde_json::{Value, json}; + use std::path::Path; + use time::OffsetDateTime; #[test] fn parse_args_accepts_required_flags() { diff --git a/src/tools/sequence_triage_ccr_cir/adjusted.rs b/src/tools/sequence_triage_ccr_cir/adjusted.rs new file mode 100644 index 0000000..33e8eae --- /dev/null +++ b/src/tools/sequence_triage_ccr_cir/adjusted.rs @@ -0,0 +1,503 @@ +use std::collections::{BTreeMap, BTreeSet}; + +use serde_json::{Value, json}; + +use super::analysis::peer_sample_at_seq; +use super::args::Args; +use super::io::{format_time, path_string}; +use super::model::{ + AdjustedAnalysis, AdjustedRecord, DiffEvent, EdgePosition, SequenceSample, Side, +}; + +pub(super) fn build_adjusted_analysis( + args: &Args, + left: &[SequenceSample], + right: &[SequenceSample], + events: &[DiffEvent], +) -> AdjustedAnalysis { + let mut analysis = AdjustedAnalysis { + raw_persistent_occurrences: events.len(), + raw_persistent_unique_keys: unique_event_count(events), + ..Default::default() + }; + let mut edge_filtered_unique = BTreeSet::new(); + + for event in events { + let source_samples = samples_for_side(event.source_side, left, right); + let peer_samples = samples_for_side(opposite_side(event.source_side), left, right); + let edge = edge_position(source_samples, event.source_seq, args); + if edge == EdgePosition::Stable { + analysis.edge_filtered_occurrences += 1; + edge_filtered_unique.insert(event_identity(event)); + } + let (classification, note) = + adjusted_classification(event, edge, source_samples, peer_samples); + analysis.add( + classification, + AdjustedRecord { + classification, + event_type: event.event_type, + key: event.key.clone(), + source_side: event.source_side, + source_seq: event.source_seq, + source_run_id: event.source_run_id.clone(), + note, + }, + args.sample_limit, + ); + } + + analysis.edge_filtered_unique_keys = edge_filtered_unique.len(); + let mut stable_unique = BTreeSet::new(); + for (class, stats) in &analysis.stats { + if class.starts_with("STABLE_") { + analysis.adjusted_stable_occurrences += stats.total; + stable_unique.extend(stats.unique_keys.iter().cloned()); + } + } + analysis.adjusted_stable_unique_keys = stable_unique.len(); + let timeline_limit = if args.timeline_sample_limit == 0 { + args.sample_limit + } else { + args.timeline_sample_limit + }; + analysis.uri_timeline_samples = + build_uri_timeline_samples(left, right, &analysis, timeline_limit); + analysis.stable_object_groups = + build_stable_object_groups(&analysis, left, right, timeline_limit); + analysis +} + +fn adjusted_classification( + event: &DiffEvent, + edge: EdgePosition, + source: &[SequenceSample], + peer: &[SequenceSample], +) -> (&'static str, String) { + if event.event_type == "trust_anchor" { + if peer_has_same_ta_identity(peer, &event.key) { + return ( + "TA_PROJECTION_FORMAT_DIFFERENCE", + "same TAL hash and TA certificate hash exist on peer side; full TA projection string differs".to_string(), + ); + } + if edge == EdgePosition::Stable { + return ( + "STABLE_TA_DIVERGENCE", + "trust-anchor identity is not aligned in a non-boundary sample".to_string(), + ); + } + return edge_unresolved_class(edge); + } + + if event.event_type == "object_hash" { + if let Some((uri, hash)) = split_object_hash_key(&event.key) { + let peer_has_uri = peer_has_uri_any(peer, uri); + let peer_has_different_hash = peer_has_uri_different_hash_any(peer, uri, hash); + let peer_hash_at_source_seq = peer_sample_at_seq(peer, event.source_seq) + .and_then(|peer_sample| peer_sample.objects.get(uri)); + let source_later_matches_peer_hash = peer_hash_at_source_seq.is_some_and(|peer_hash| { + source_future_has_hash(source, event.source_seq, uri, peer_hash) + }); + if edge == EdgePosition::Leading && peer_has_different_hash { + return ( + "EDGE_LEADING_CONTENT_ROLLOVER", + "source leading-edge hash is absent, while peer already has the same URI with another hash".to_string(), + ); + } + if edge == EdgePosition::Trailing && peer_has_different_hash { + return ( + "EDGE_TRAILING_CONTENT_ROLLOVER", + "source trailing-edge hash is absent, while peer has the same URI with another hash and no later source observation exists".to_string(), + ); + } + if edge == EdgePosition::Stable && source_later_matches_peer_hash { + return ( + "MID_SEQUENCE_CONTENT_ROLLOVER_RESOLVED", + "peer already has a newer same-URI hash at this seq and source catches up later in the observed sequence".to_string(), + ); + } + if edge == EdgePosition::Stable && peer_has_different_hash { + return ( + "STABLE_CONTENT_DIVERGENCE", + "same URI exists on peer side with a different hash in a non-boundary sample" + .to_string(), + ); + } + if edge == EdgePosition::Stable && !peer_has_uri { + return ( + "STABLE_OBJECT_SET_DIVERGENCE", + "content key is derived from a non-boundary URI that is absent on peer side" + .to_string(), + ); + } + } + if edge == EdgePosition::Stable { + return ( + "STABLE_CONTENT_DIVERGENCE", + "object hash key remains unaligned in a non-boundary sample".to_string(), + ); + } + return edge_unresolved_class(edge); + } + + if edge != EdgePosition::Stable { + return edge_unresolved_class(edge); + } + + let stable_class = match event.raw_class { + "PERSISTENT_OBJECT_SET_DIVERGENCE" => "STABLE_OBJECT_SET_DIVERGENCE", + "PERSISTENT_REJECT_DIVERGENCE" => "STABLE_REJECT_DIVERGENCE", + "PERSISTENT_OUTPUT_DIVERGENCE" => "STABLE_OUTPUT_DIVERGENCE", + "PERSISTENT_CONTENT_DIVERGENCE" => "STABLE_CONTENT_DIVERGENCE", + "PERSISTENT_TA_DIFFERENCE" => "STABLE_TA_DIVERGENCE", + _ => "STABLE_UNCLASSIFIED_DIVERGENCE", + }; + ( + stable_class, + "persistent event appears in a non-boundary sample after sequence edge filtering" + .to_string(), + ) +} + +fn edge_unresolved_class(edge: EdgePosition) -> (&'static str, String) { + match edge { + EdgePosition::Leading => ( + "EDGE_LEADING_UNRESOLVED", + "event appears only from the warmup edge and may predate the observed sequence" + .to_string(), + ), + EdgePosition::Trailing => ( + "EDGE_TRAILING_UNRESOLVED", + "event appears at the cooldown edge and lacks later peer observations".to_string(), + ), + EdgePosition::Stable => ( + "STABLE_UNCLASSIFIED_DIVERGENCE", + "event is not on an edge but no specific stable category matched".to_string(), + ), + } +} + +impl AdjustedAnalysis { + fn add(&mut self, class: &'static str, record: AdjustedRecord, sample_limit: usize) { + let stats = self.stats.entry(class).or_default(); + stats.total += 1; + stats.unique_keys.insert(format!( + "{}|{}|{}", + record.event_type, + record.source_side.as_str(), + record.key + )); + if stats.samples.len() < sample_limit { + stats.samples.push(record); + } + } +} + +fn unique_event_count(events: &[DiffEvent]) -> usize { + events + .iter() + .map(event_identity) + .collect::>() + .len() +} + +fn event_identity(event: &DiffEvent) -> String { + format!( + "{}|{}|{}", + event.event_type, + event.source_side.as_str(), + event.key + ) +} + +fn samples_for_side<'a>( + side: Side, + left: &'a [SequenceSample], + right: &'a [SequenceSample], +) -> &'a [SequenceSample] { + match side { + Side::Left => left, + Side::Right => right, + } +} + +fn opposite_side(side: Side) -> Side { + match side { + Side::Left => Side::Right, + Side::Right => Side::Left, + } +} + +fn edge_position(samples: &[SequenceSample], seq: u32, args: &Args) -> EdgePosition { + let Some(index) = samples.iter().position(|sample| sample.raw.seq == seq) else { + return EdgePosition::Stable; + }; + if index < args.warmup_samples { + return EdgePosition::Leading; + } + if samples.len().saturating_sub(index) <= args.cooldown_samples { + return EdgePosition::Trailing; + } + EdgePosition::Stable +} + +fn peer_has_uri_any(peer: &[SequenceSample], uri: &str) -> bool { + peer.iter().any(|sample| sample.objects.contains_key(uri)) +} + +fn peer_has_uri_different_hash_any(peer: &[SequenceSample], uri: &str, hash: &str) -> bool { + peer.iter() + .filter_map(|sample| sample.objects.get(uri)) + .any(|peer_hash| peer_hash != hash) +} + +fn source_future_has_hash( + source: &[SequenceSample], + seq: u32, + uri: &str, + expected_hash: &str, +) -> bool { + source + .iter() + .filter(|sample| sample.raw.seq > seq) + .any(|sample| { + sample + .objects + .get(uri) + .is_some_and(|hash| hash == expected_hash) + }) +} + +fn peer_has_same_ta_identity(peer: &[SequenceSample], key: &str) -> bool { + let Some(identity) = trust_anchor_identity(key) else { + return false; + }; + peer.iter().any(|sample| { + sample + .trust_anchors + .iter() + .filter_map(|peer_key| trust_anchor_identity(peer_key)) + .any(|peer_identity| peer_identity == identity) + }) +} + +fn trust_anchor_identity(key: &str) -> Option { + let parts = key.split('|').collect::>(); + if parts.len() != 4 { + return None; + } + Some(format!("{}|{}", parts[2], parts[3])) +} + +fn split_object_hash_key(key: &str) -> Option<(&str, &str)> { + key.rsplit_once('|') +} + +fn build_uri_timeline_samples( + left: &[SequenceSample], + right: &[SequenceSample], + adjusted: &AdjustedAnalysis, + limit: usize, +) -> Vec { + let mut uris = BTreeSet::new(); + for stats in adjusted.stats.values() { + for sample in &stats.samples { + if sample.event_type == "object_hash" + && let Some((uri, _)) = split_object_hash_key(&sample.key) + { + uris.insert(uri.to_string()); + } + if sample.event_type == "object_uri" { + uris.insert(sample.key.clone()); + } + } + } + uris.into_iter() + .take(limit) + .map(|uri| { + json!({ + "uri": uri, + "left": timeline_for_uri(left, &uri), + "right": timeline_for_uri(right, &uri), + }) + }) + .collect() +} + +fn timeline_for_uri(samples: &[SequenceSample], uri: &str) -> Vec { + samples + .iter() + .filter_map(|sample| { + sample.objects.get(uri).map(|hash| { + json!({ + "seq": sample.raw.seq, + "runId": sample.raw.run_id, + "validationTime": format_time(sample.validation_time), + "hash": hash, + }) + }) + }) + .collect() +} + +fn build_stable_object_groups( + adjusted: &AdjustedAnalysis, + left: &[SequenceSample], + right: &[SequenceSample], + limit: usize, +) -> Vec { + let mut groups: BTreeMap = BTreeMap::new(); + let Some(stats) = adjusted.stats.get("STABLE_OBJECT_SET_DIVERGENCE") else { + return Vec::new(); + }; + for sample in &stats.samples { + let physical_uri = physical_object_uri(sample); + let group_key = format!( + "{}|{}|{}|{}", + sample.source_side.as_str(), + sample.source_seq, + sample.source_run_id, + publication_point_prefix(&physical_uri) + ); + let group = groups + .entry(group_key) + .or_insert_with(|| StableObjectGroup::new(sample, &physical_uri)); + if group.source_cir_path.is_none() + && let Some(source_sample) = sample_by_side_seq_run( + left, + right, + sample.source_side, + sample.source_seq, + &sample.source_run_id, + ) + { + group.source_cir_path = Some(path_string(&source_sample.cir_path)); + } + group.add(sample, physical_uri); + } + groups + .into_values() + .take(limit) + .map(StableObjectGroup::to_json) + .collect() +} + +#[derive(Clone, Debug)] +struct StableObjectGroup { + source_side: Side, + source_seq: u32, + source_run_id: String, + source_cir_path: Option, + publication_point: String, + event_count: usize, + event_types: BTreeMap<&'static str, usize>, + physical_objects: BTreeMap, +} + +impl StableObjectGroup { + fn new(sample: &AdjustedRecord, physical_uri: &str) -> Self { + Self { + source_side: sample.source_side, + source_seq: sample.source_seq, + source_run_id: sample.source_run_id.clone(), + source_cir_path: None, + publication_point: publication_point_prefix(physical_uri), + event_count: 0, + event_types: BTreeMap::new(), + physical_objects: BTreeMap::new(), + } + } + + fn add(&mut self, sample: &AdjustedRecord, physical_uri: String) { + self.event_count += 1; + *self.event_types.entry(sample.event_type).or_default() += 1; + let object = self + .physical_objects + .entry(physical_uri.clone()) + .or_insert_with(|| StablePhysicalObject { + extension: object_extension(&physical_uri).to_string(), + uri: physical_uri, + event_types: BTreeSet::new(), + hashes: BTreeSet::new(), + }); + object.event_types.insert(sample.event_type); + if let Some(hash) = event_hash(sample) { + object.hashes.insert(hash.to_string()); + } + } + + fn to_json(self) -> Value { + json!({ + "sourceSide": self.source_side.as_str(), + "sourceSeq": self.source_seq, + "sourceRunId": self.source_run_id, + "sourceCirPath": self.source_cir_path, + "publicationPoint": self.publication_point, + "eventCount": self.event_count, + "eventTypes": self.event_types, + "physicalObjectCount": self.physical_objects.len(), + "physicalObjects": self.physical_objects.into_values().map(StablePhysicalObject::to_json).collect::>(), + }) + } +} + +#[derive(Clone, Debug)] +struct StablePhysicalObject { + uri: String, + extension: String, + event_types: BTreeSet<&'static str>, + hashes: BTreeSet, +} + +impl StablePhysicalObject { + fn to_json(self) -> Value { + json!({ + "uri": self.uri, + "extension": self.extension, + "eventTypes": self.event_types, + "hashes": self.hashes, + }) + } +} + +fn physical_object_uri(sample: &AdjustedRecord) -> String { + if sample.event_type == "object_hash" + && let Some((uri, _)) = split_object_hash_key(&sample.key) + { + return uri.to_string(); + } + sample.key.clone() +} + +fn event_hash(sample: &AdjustedRecord) -> Option<&str> { + if sample.event_type == "object_hash" { + split_object_hash_key(&sample.key).map(|(_, hash)| hash) + } else { + None + } +} + +fn publication_point_prefix(uri: &str) -> String { + uri.rsplit_once('/') + .map(|(prefix, _)| format!("{prefix}/")) + .unwrap_or_else(|| uri.to_string()) +} + +fn object_extension(uri: &str) -> &str { + uri.rsplit_once('.') + .map(|(_, extension)| extension) + .unwrap_or("") +} + +fn sample_by_side_seq_run<'a>( + left: &'a [SequenceSample], + right: &'a [SequenceSample], + side: Side, + seq: u32, + run_id: &str, +) -> Option<&'a SequenceSample> { + samples_for_side(side, left, right) + .iter() + .find(|sample| sample.raw.seq == seq && sample.raw.run_id == run_id) +} diff --git a/src/tools/sequence_triage_ccr_cir/analysis.rs b/src/tools/sequence_triage_ccr_cir/analysis.rs new file mode 100644 index 0000000..fa02859 --- /dev/null +++ b/src/tools/sequence_triage_ccr_cir/analysis.rs @@ -0,0 +1,364 @@ +use std::collections::BTreeSet; + +use super::args::Args; +use super::io::object_hash_key; +use super::model::{ + AnalysisResult, DiffEvent, EventOccurrence, SampleRecord, SequenceSample, Side, +}; + +pub(super) fn analyze_set( + result: &mut AnalysisResult, + event_type: &'static str, + left: &[SequenceSample], + right: &[SequenceSample], + extract: F, + resolved_class: &'static str, + persistent_class: &'static str, + args: &Args, +) where + F: for<'a> Fn(&'a SequenceSample) -> &'a BTreeSet, +{ + analyze_direction( + result, + event_type, + Side::Left, + left, + right, + &extract, + resolved_class, + persistent_class, + args, + ); + analyze_direction( + result, + event_type, + Side::Right, + right, + left, + &extract, + resolved_class, + persistent_class, + args, + ); +} + +fn analyze_direction( + result: &mut AnalysisResult, + event_type: &'static str, + source_side: Side, + source: &[SequenceSample], + peer: &[SequenceSample], + extract: &F, + resolved_class: &'static str, + persistent_class: &'static str, + args: &Args, +) where + F: for<'a> Fn(&'a SequenceSample) -> &'a BTreeSet, +{ + for sample in source { + let source_set = extract(sample); + for key in source_set { + if peer_sample_at_seq(peer, sample.raw.seq) + .is_some_and(|peer_sample| extract(peer_sample).contains(key)) + { + continue; + } + if let Some(matched) = find_future_match(peer, sample, key, extract, args) { + result.add( + resolved_class, + SampleRecord { + classification: resolved_class, + event_type, + key: key.clone(), + source_side, + source_seq: sample.raw.seq, + source_run_id: sample.raw.run_id.clone(), + matched_seq: Some(matched.seq), + matched_run_id: Some(matched.run_id), + note: format!( + "matched in {} sequence within alignment window", + matched.side.as_str() + ), + }, + args.sample_limit, + ); + } else { + result.add( + persistent_class, + SampleRecord { + classification: persistent_class, + event_type, + key: key.clone(), + source_side, + source_seq: sample.raw.seq, + source_run_id: sample.raw.run_id.clone(), + matched_seq: None, + matched_run_id: None, + note: "no matching event in peer sequence alignment window".to_string(), + }, + args.sample_limit, + ); + } + } + } +} + +pub(super) fn analyze_hash_rollover( + result: &mut AnalysisResult, + left: &[SequenceSample], + right: &[SequenceSample], + args: &Args, +) { + for (source_side, source, peer) in [(Side::Left, left, right), (Side::Right, right, left)] { + for sample in source { + for (uri, hash) in &sample.objects { + if peer_sample_at_seq(peer, sample.raw.seq) + .and_then(|peer_sample| peer_sample.objects.get(uri)) + .is_some_and(|peer_hash| peer_hash == hash) + { + continue; + } + if let Some(peer_sample) = peer_sample_at_seq(peer, sample.raw.seq) + && peer_sample.objects.contains_key(uri) + && find_future_hash_match(peer, sample, uri, hash, args).is_some() + { + let matched = + find_future_hash_match(peer, sample, uri, hash, args).expect("match"); + result.add( + "CONTENT_ROLLOVER_RESOLVED", + SampleRecord { + classification: "CONTENT_ROLLOVER_RESOLVED", + event_type: "object_content_rollover", + key: object_hash_key(uri, hash), + source_side, + source_seq: sample.raw.seq, + source_run_id: sample.raw.run_id.clone(), + matched_seq: Some(matched.seq), + matched_run_id: Some(matched.run_id), + note: "same URI hash appeared in peer sequence later".to_string(), + }, + args.sample_limit, + ); + } + } + } + } +} + +pub(super) fn collect_persistent_events( + left: &[SequenceSample], + right: &[SequenceSample], + args: &Args, +) -> Vec { + let mut events = Vec::new(); + collect_persistent_set( + &mut events, + "object_uri", + "PERSISTENT_OBJECT_SET_DIVERGENCE", + left, + right, + |sample| &sample.object_uris, + args, + ); + collect_persistent_set( + &mut events, + "object_hash", + "PERSISTENT_CONTENT_DIVERGENCE", + left, + right, + |sample| &sample.object_hashes, + args, + ); + collect_persistent_set( + &mut events, + "reject_uri", + "PERSISTENT_REJECT_DIVERGENCE", + left, + right, + |sample| &sample.rejects, + args, + ); + collect_persistent_set( + &mut events, + "trust_anchor", + "PERSISTENT_TA_DIFFERENCE", + left, + right, + |sample| &sample.trust_anchors, + args, + ); + collect_persistent_set( + &mut events, + "vrp_output", + "PERSISTENT_OUTPUT_DIVERGENCE", + left, + right, + |sample| &sample.vrps, + args, + ); + collect_persistent_set( + &mut events, + "vap_output", + "PERSISTENT_OUTPUT_DIVERGENCE", + left, + right, + |sample| &sample.vaps, + args, + ); + events +} + +fn collect_persistent_set( + events: &mut Vec, + event_type: &'static str, + raw_class: &'static str, + left: &[SequenceSample], + right: &[SequenceSample], + extract: F, + args: &Args, +) where + F: for<'a> Fn(&'a SequenceSample) -> &'a BTreeSet, +{ + collect_persistent_direction( + events, + event_type, + raw_class, + Side::Left, + left, + right, + &extract, + args, + ); + collect_persistent_direction( + events, + event_type, + raw_class, + Side::Right, + right, + left, + &extract, + args, + ); +} + +fn collect_persistent_direction( + events: &mut Vec, + event_type: &'static str, + raw_class: &'static str, + source_side: Side, + source: &[SequenceSample], + peer: &[SequenceSample], + extract: &F, + args: &Args, +) where + F: for<'a> Fn(&'a SequenceSample) -> &'a BTreeSet, +{ + for sample in source { + for key in extract(sample) { + if peer_sample_at_seq(peer, sample.raw.seq) + .is_some_and(|peer_sample| extract(peer_sample).contains(key)) + { + continue; + } + if find_future_match(peer, sample, key, extract, args).is_some() { + continue; + } + events.push(DiffEvent { + event_type, + raw_class, + key: key.clone(), + source_side, + source_seq: sample.raw.seq, + source_run_id: sample.raw.run_id.clone(), + }); + } + } +} + +impl AnalysisResult { + fn add(&mut self, class: &'static str, record: SampleRecord, sample_limit: usize) { + let stats = self.stats.entry(class).or_default(); + stats.total += 1; + if stats.samples.len() < sample_limit { + stats.samples.push(record); + } + } +} + +pub(super) fn peer_sample_at_seq(peer: &[SequenceSample], seq: u32) -> Option<&SequenceSample> { + peer.iter().find(|sample| sample.raw.seq == seq) +} + +fn find_future_match( + peer: &[SequenceSample], + source: &SequenceSample, + key: &str, + extract: &F, + args: &Args, +) -> Option +where + F: for<'a> Fn(&'a SequenceSample) -> &'a BTreeSet, +{ + peer.iter() + .filter(|candidate| is_in_alignment_window(source, candidate, args)) + .find(|candidate| extract(candidate).contains(key)) + .map(|candidate| { + occurrence( + candidate, + if candidate.raw.side.as_deref() == Some("left") { + Side::Left + } else { + Side::Right + }, + ) + }) +} + +fn find_future_hash_match( + peer: &[SequenceSample], + source: &SequenceSample, + uri: &str, + hash: &str, + args: &Args, +) -> Option { + peer.iter() + .filter(|candidate| is_in_alignment_window(source, candidate, args)) + .find(|candidate| { + candidate + .objects + .get(uri) + .is_some_and(|peer_hash| peer_hash == hash) + }) + .map(|candidate| { + occurrence( + candidate, + if candidate.raw.side.as_deref() == Some("left") { + Side::Left + } else { + Side::Right + }, + ) + }) +} + +fn is_in_alignment_window( + source: &SequenceSample, + candidate: &SequenceSample, + args: &Args, +) -> bool { + if candidate.raw.seq < source.raw.seq { + return false; + } + let run_delta = candidate.raw.seq.saturating_sub(source.raw.seq); + let time_delta = candidate.validation_time - source.validation_time; + let secs = time_delta.whole_seconds().abs(); + run_delta <= args.align_window_runs || secs <= args.align_window_secs +} + +fn occurrence(sample: &SequenceSample, side: Side) -> EventOccurrence { + EventOccurrence { + side, + seq: sample.raw.seq, + run_id: sample.raw.run_id.clone(), + } +} diff --git a/src/tools/sequence_triage_ccr_cir/args.rs b/src/tools/sequence_triage_ccr_cir/args.rs new file mode 100644 index 0000000..15aa2cd --- /dev/null +++ b/src/tools/sequence_triage_ccr_cir/args.rs @@ -0,0 +1,119 @@ +use std::path::PathBuf; + +#[derive(Debug, PartialEq, Eq)] +pub(super) struct Args { + pub(super) left_sequence: PathBuf, + pub(super) right_sequence: PathBuf, + pub(super) out_dir: PathBuf, + pub(super) align_window_runs: u32, + pub(super) align_window_secs: i64, + pub(super) sample_limit: usize, + pub(super) warmup_samples: usize, + pub(super) cooldown_samples: usize, + pub(super) timeline_sample_limit: usize, +} + +pub(super) fn usage() -> &'static str { + "Usage: sequence_triage_ccr_cir --left-sequence --right-sequence --out-dir [--align-window-runs ] [--align-window-secs ] [--sample-limit ] [--warmup-samples ] [--cooldown-samples ] [--timeline-sample-limit ]" +} + +pub(super) fn parse_args(argv: &[String]) -> Result { + let mut left_sequence = None; + let mut right_sequence = None; + let mut out_dir = None; + let mut align_window_runs = 2u32; + let mut align_window_secs = 1800i64; + let mut sample_limit = 200usize; + let mut warmup_samples = 1usize; + let mut cooldown_samples = 1usize; + let mut timeline_sample_limit = 0usize; + let mut index = 1usize; + while index < argv.len() { + match argv[index].as_str() { + "--left-sequence" => { + index += 1; + left_sequence = Some(PathBuf::from( + argv.get(index).ok_or("--left-sequence requires a value")?, + )); + } + "--right-sequence" => { + index += 1; + right_sequence = Some(PathBuf::from( + argv.get(index).ok_or("--right-sequence requires a value")?, + )); + } + "--out-dir" => { + index += 1; + out_dir = Some(PathBuf::from( + argv.get(index).ok_or("--out-dir requires a value")?, + )); + } + "--align-window-runs" => { + index += 1; + let value = argv + .get(index) + .ok_or("--align-window-runs requires a value")?; + align_window_runs = value + .parse::() + .map_err(|_| format!("invalid --align-window-runs: {value}"))?; + } + "--align-window-secs" => { + index += 1; + let value = argv + .get(index) + .ok_or("--align-window-secs requires a value")?; + align_window_secs = value + .parse::() + .map_err(|_| format!("invalid --align-window-secs: {value}"))?; + } + "--sample-limit" => { + index += 1; + let value = argv.get(index).ok_or("--sample-limit requires a value")?; + sample_limit = value + .parse::() + .map_err(|_| format!("invalid --sample-limit: {value}"))?; + } + "--warmup-samples" => { + index += 1; + let value = argv.get(index).ok_or("--warmup-samples requires a value")?; + warmup_samples = value + .parse::() + .map_err(|_| format!("invalid --warmup-samples: {value}"))?; + } + "--cooldown-samples" => { + index += 1; + let value = argv + .get(index) + .ok_or("--cooldown-samples requires a value")?; + cooldown_samples = value + .parse::() + .map_err(|_| format!("invalid --cooldown-samples: {value}"))?; + } + "--timeline-sample-limit" => { + index += 1; + let value = argv + .get(index) + .ok_or("--timeline-sample-limit requires a value")?; + timeline_sample_limit = value + .parse::() + .map_err(|_| format!("invalid --timeline-sample-limit: {value}"))?; + } + "-h" | "--help" => return Err(usage().to_string()), + other => return Err(format!("unknown argument: {other}\n{}", usage())), + } + index += 1; + } + Ok(Args { + left_sequence: left_sequence + .ok_or_else(|| format!("--left-sequence is required\n{}", usage()))?, + right_sequence: right_sequence + .ok_or_else(|| format!("--right-sequence is required\n{}", usage()))?, + out_dir: out_dir.ok_or_else(|| format!("--out-dir is required\n{}", usage()))?, + align_window_runs, + align_window_secs, + sample_limit, + warmup_samples, + cooldown_samples, + timeline_sample_limit, + }) +} diff --git a/src/tools/sequence_triage_ccr_cir/io.rs b/src/tools/sequence_triage_ccr_cir/io.rs new file mode 100644 index 0000000..15544ab --- /dev/null +++ b/src/tools/sequence_triage_ccr_cir/io.rs @@ -0,0 +1,33 @@ +use std::path::{Path, PathBuf}; + +use time::OffsetDateTime; +use time::format_description::well_known::Rfc3339; + +pub(super) fn read_file(path: &Path) -> Result, String> { + std::fs::read(path).map_err(|e| format!("read file failed: {}: {e}", path.display())) +} + +pub(super) fn resolve_path(base_dir: &Path, path: &Path) -> PathBuf { + if path.is_absolute() { + path.to_path_buf() + } else { + base_dir.join(path) + } +} + +pub(super) fn parse_rfc3339(value: &str) -> Result { + OffsetDateTime::parse(value, &Rfc3339) + .map_err(|e| format!("parse RFC3339 failed: {value}: {e}")) +} + +pub(super) fn format_time(value: OffsetDateTime) -> String { + value.format(&Rfc3339).unwrap_or_else(|_| value.to_string()) +} + +pub(super) fn path_string(path: &Path) -> String { + path.to_string_lossy().into_owned() +} + +pub(super) fn object_hash_key(uri: &str, hash: &str) -> String { + format!("{uri}|{hash}") +} diff --git a/src/tools/sequence_triage_ccr_cir/loader.rs b/src/tools/sequence_triage_ccr_cir/loader.rs new file mode 100644 index 0000000..909cda9 --- /dev/null +++ b/src/tools/sequence_triage_ccr_cir/loader.rs @@ -0,0 +1,144 @@ +use std::collections::{BTreeMap, BTreeSet}; +use std::path::Path; + +use crate::ccr::{decode_ccr_compare_views, decode_content_info}; +use crate::cir::decode_cir; + +use super::io::{object_hash_key, parse_rfc3339, read_file, resolve_path}; +use super::model::{SequenceItemRaw, SequenceSample, Side}; + +pub(super) fn load_sequence(path: &Path, side: Side) -> Result, String> { + let base_dir = path.parent().unwrap_or_else(|| Path::new(".")); + let text = std::fs::read_to_string(path) + .map_err(|e| format!("read sequence failed: {}: {e}", path.display()))?; + let mut samples = Vec::new(); + let mut seen_seq = BTreeSet::new(); + for (line_index, line) in text.lines().enumerate() { + let line = line.trim(); + if line.is_empty() { + continue; + } + let raw: SequenceItemRaw = serde_json::from_str(line).map_err(|e| { + format!( + "parse sequence JSONL failed: {}:{}: {e}", + path.display(), + line_index + 1 + ) + })?; + if raw.schema_version.unwrap_or(1) != 1 { + return Err(format!( + "unsupported sequence item schemaVersion in {}:{}", + path.display(), + line_index + 1 + )); + } + if !seen_seq.insert(raw.seq) { + return Err(format!("duplicate seq {} in {}", raw.seq, path.display())); + } + if let Some(status) = &raw.status + && status != "success" + { + return Err(format!( + "sequence sample {} has non-success status: {status}", + raw.run_id + )); + } + let cir_path = resolve_path(base_dir, &raw.cir_path); + let ccr_path = resolve_path(base_dir, &raw.ccr_path); + let cir = decode_cir(&read_file(&cir_path)?).map_err(|e| { + format!( + "decode CIR failed for sample {} ({}): {e}", + raw.run_id, + cir_path.display() + ) + })?; + let ccr = decode_content_info(&read_file(&ccr_path)?).map_err(|e| { + format!( + "decode CCR failed for sample {} ({}): {e}", + raw.run_id, + ccr_path.display() + ) + })?; + let validation_time = raw + .validation_time + .as_deref() + .map(parse_rfc3339) + .transpose()? + .unwrap_or(cir.validation_time); + let objects = cir + .objects + .iter() + .map(|item| (item.rsync_uri.clone(), hex::encode(&item.sha256))) + .collect::>(); + let object_uris = objects.keys().cloned().collect::>(); + let object_hashes = objects + .iter() + .map(|(uri, hash)| object_hash_key(uri, hash)) + .collect::>(); + let rejects = cir + .rejected_objects + .iter() + .map(|item| item.object_uri.clone()) + .collect::>(); + let trust_anchors = cir + .trust_anchors + .iter() + .map(|item| { + format!( + "{}|{}|{}|{}", + item.ta_rsync_uri, + item.tal_uri, + hex::encode(crate::cir::sha256(&item.tal_bytes)), + hex::encode(&item.ta_certificate_sha256) + ) + }) + .collect::>(); + let (vrps, vaps) = decode_ccr_compare_views(&ccr).map_err(|e| { + format!( + "decode CCR compare views failed for sample {} ({}): {e}", + raw.run_id, + ccr_path.display() + ) + })?; + let vrps = vrps + .into_iter() + .map(|row| format!("{}|{}|{}", row.asn, row.ip_prefix, row.max_length)) + .collect::>(); + let vaps = vaps + .into_iter() + .map(|row| format!("{}|{}", row.customer_asn, row.providers)) + .collect::>(); + samples.push(SequenceSample { + raw, + validation_time, + ccr_path, + cir_path, + objects, + object_uris, + object_hashes, + rejects, + trust_anchors, + vrps, + vaps, + }); + } + samples.sort_by_key(|sample| sample.raw.seq); + for pair in samples.windows(2) { + if pair[0].raw.seq >= pair[1].raw.seq { + return Err("sequence must be sorted by increasing seq".into()); + } + } + if samples.iter().any(|sample| { + sample + .raw + .side + .as_deref() + .is_some_and(|item| item != side.as_str()) + }) { + return Err(format!( + "sequence side field does not match expected side: {}", + side.as_str() + )); + } + Ok(samples) +} diff --git a/src/tools/sequence_triage_ccr_cir/model.rs b/src/tools/sequence_triage_ccr_cir/model.rs new file mode 100644 index 0000000..b88b6bb --- /dev/null +++ b/src/tools/sequence_triage_ccr_cir/model.rs @@ -0,0 +1,173 @@ +use std::collections::{BTreeMap, BTreeSet}; +use std::path::PathBuf; + +use serde::Deserialize; +use serde_json::Value; +use time::OffsetDateTime; + +#[derive(Clone, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub(super) struct SequenceItemRaw { + pub(super) schema_version: Option, + pub(super) rp_id: String, + pub(super) side: Option, + pub(super) seq: u32, + pub(super) run_id: String, + pub(super) sync_mode: Option, + pub(super) status: Option, + pub(super) start_time: Option, + pub(super) finish_time: Option, + pub(super) validation_time: Option, + pub(super) ccr_path: PathBuf, + pub(super) cir_path: PathBuf, + pub(super) ccr_sha256: Option, + pub(super) cir_sha256: Option, + pub(super) wall_ms: Option, + pub(super) max_rss_kb: Option, + pub(super) vrps: Option, + pub(super) vaps: Option, +} + +#[derive(Clone, Debug)] +pub(super) struct SequenceSample { + pub(super) raw: SequenceItemRaw, + pub(super) validation_time: OffsetDateTime, + pub(super) ccr_path: PathBuf, + pub(super) cir_path: PathBuf, + pub(super) objects: BTreeMap, + pub(super) object_uris: BTreeSet, + pub(super) object_hashes: BTreeSet, + pub(super) rejects: BTreeSet, + pub(super) trust_anchors: BTreeSet, + pub(super) vrps: BTreeSet, + pub(super) vaps: BTreeSet, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(super) enum Side { + Left, + Right, +} + +impl Side { + pub(super) fn as_str(self) -> &'static str { + match self { + Side::Left => "left", + Side::Right => "right", + } + } +} + +#[derive(Clone, Debug)] +pub(super) struct EventOccurrence { + pub(super) side: Side, + pub(super) seq: u32, + pub(super) run_id: String, +} + +#[derive(Clone, Debug)] +pub(super) struct SampleRecord { + pub(super) classification: &'static str, + pub(super) event_type: &'static str, + pub(super) key: String, + pub(super) source_side: Side, + pub(super) source_seq: u32, + pub(super) source_run_id: String, + pub(super) matched_seq: Option, + pub(super) matched_run_id: Option, + pub(super) note: String, +} + +#[derive(Clone, Debug, Default)] +pub(super) struct ClassStats { + pub(super) total: usize, + pub(super) samples: Vec, +} + +#[derive(Clone, Debug, Default)] +pub(super) struct AnalysisResult { + pub(super) stats: BTreeMap<&'static str, ClassStats>, +} + +#[derive(Clone, Debug)] +pub(super) struct DiffEvent { + pub(super) event_type: &'static str, + pub(super) raw_class: &'static str, + pub(super) key: String, + pub(super) source_side: Side, + pub(super) source_seq: u32, + pub(super) source_run_id: String, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(super) enum EdgePosition { + Leading, + Stable, + Trailing, +} + +#[derive(Clone, Debug)] +pub(super) struct AdjustedRecord { + pub(super) classification: &'static str, + pub(super) event_type: &'static str, + pub(super) key: String, + pub(super) source_side: Side, + pub(super) source_seq: u32, + pub(super) source_run_id: String, + pub(super) note: String, +} + +#[derive(Clone, Debug, Default)] +pub(super) struct AdjustedClassStats { + pub(super) total: usize, + pub(super) unique_keys: BTreeSet, + pub(super) samples: Vec, +} + +#[derive(Clone, Debug, Default)] +pub(super) struct AdjustedAnalysis { + pub(super) raw_persistent_occurrences: usize, + pub(super) raw_persistent_unique_keys: usize, + pub(super) edge_filtered_occurrences: usize, + pub(super) edge_filtered_unique_keys: usize, + pub(super) adjusted_stable_occurrences: usize, + pub(super) adjusted_stable_unique_keys: usize, + pub(super) stats: BTreeMap<&'static str, AdjustedClassStats>, + pub(super) uri_timeline_samples: Vec, + pub(super) stable_object_groups: Vec, +} + +#[derive(Clone, Debug)] +pub(super) struct SandwichRecord { + pub(super) classification: &'static str, + pub(super) set_type: &'static str, + pub(super) key: String, + pub(super) source_side: Side, + pub(super) source_start_seq: u32, + pub(super) source_start_run_id: String, + pub(super) source_end_seq: u32, + pub(super) source_end_run_id: String, + pub(super) peer_seq: u32, + pub(super) peer_run_id: String, + pub(super) source_value: Option, + pub(super) peer_value: Option, + pub(super) source_start_time: String, + pub(super) peer_time: String, + pub(super) source_end_time: String, + pub(super) note: String, +} + +#[derive(Clone, Debug, Default)] +pub(super) struct SandwichClassStats { + pub(super) total: usize, + pub(super) unique_keys: BTreeSet, + pub(super) samples: Vec, +} + +#[derive(Clone, Debug, Default)] +pub(super) struct SandwichAnalysis { + pub(super) total_occurrences: usize, + pub(super) unique_keys: BTreeSet, + pub(super) by_set_type: BTreeMap<&'static str, usize>, + pub(super) stats: BTreeMap<&'static str, SandwichClassStats>, +} diff --git a/src/tools/sequence_triage_ccr_cir/output.rs b/src/tools/sequence_triage_ccr_cir/output.rs new file mode 100644 index 0000000..c311d77 --- /dev/null +++ b/src/tools/sequence_triage_ccr_cir/output.rs @@ -0,0 +1,408 @@ +use std::collections::BTreeSet; +use std::path::Path; + +use serde_json::{Value, json}; + +use super::args::Args; +use super::io::{format_time, path_string}; +use super::model::{ + AdjustedAnalysis, AdjustedRecord, AnalysisResult, SampleRecord, SandwichAnalysis, + SandwichRecord, SequenceSample, +}; + +pub(super) fn build_output( + args: &Args, + left: &[SequenceSample], + right: &[SequenceSample], + result: &AnalysisResult, + adjusted: &AdjustedAnalysis, + sandwich: &SandwichAnalysis, +) -> Value { + let classifications = result + .stats + .iter() + .map(|(class, stats)| { + json!({ + "classification": class, + "count": stats.total, + "samples": stats.samples.iter().map(sample_to_json).collect::>(), + }) + }) + .collect::>(); + let adjusted_classifications = adjusted + .stats + .iter() + .map(|(class, stats)| { + json!({ + "classification": class, + "occurrences": stats.total, + "uniqueKeys": stats.unique_keys.len(), + "samples": stats.samples.iter().map(adjusted_sample_to_json).collect::>(), + }) + }) + .collect::>(); + let sandwich_classifications = sandwich + .stats + .iter() + .map(|(class, stats)| { + json!({ + "classification": class, + "occurrences": stats.total, + "uniqueKeys": stats.unique_keys.len(), + "samples": stats.samples.iter().map(sandwich_sample_to_json).collect::>(), + }) + }) + .collect::>(); + json!({ + "schemaVersion": 1, + "generatedBy": "sequence_triage_ccr_cir", + "inputContract": "left-right-sequence-jsonl-with-ccr-cir-artifacts", + "parameters": { + "leftSequence": path_string(&args.left_sequence), + "rightSequence": path_string(&args.right_sequence), + "alignWindowRuns": args.align_window_runs, + "alignWindowSecs": args.align_window_secs, + "sampleLimit": args.sample_limit, + "warmupSamples": args.warmup_samples, + "cooldownSamples": args.cooldown_samples, + "timelineSampleLimit": if args.timeline_sample_limit == 0 { args.sample_limit } else { args.timeline_sample_limit }, + }, + "left": sequence_summary(left), + "right": sequence_summary(right), + "classificationCounts": classifications, + "totals": { + "resolvedTemporalLike": count_class(result, "TEMPORAL_LAG_RESOLVED") + count_class(result, "CONTENT_ROLLOVER_RESOLVED"), + "persistent": count_prefix(result, "PERSISTENT_"), + "unclassified": count_class(result, "UNCLASSIFIED_INSUFFICIENT_WINDOW"), + }, + "adjusted": { + "warmupSamples": args.warmup_samples, + "cooldownSamples": args.cooldown_samples, + "rawPersistent": { + "occurrences": adjusted.raw_persistent_occurrences, + "uniqueKeys": adjusted.raw_persistent_unique_keys, + }, + "edgeFilteredPersistent": { + "occurrences": adjusted.edge_filtered_occurrences, + "uniqueKeys": adjusted.edge_filtered_unique_keys, + }, + "adjustedStablePersistent": { + "occurrences": adjusted.adjusted_stable_occurrences, + "uniqueKeys": adjusted.adjusted_stable_unique_keys, + }, + "classificationCounts": adjusted_classifications, + "uriTimelineSamples": adjusted.uri_timeline_samples, + "stableObjectGroups": adjusted.stable_object_groups, + "interpretation": { + "rawPersistentMeaning": "raw persistent keeps the original #043 single-event matching semantics.", + "edgeFilteredMeaning": "edge-filtered persistent keeps only non-warmup and non-cooldown source occurrences.", + "adjustedStableMeaning": "adjusted stable persistent keeps non-edge findings after URI-level rollover and TA projection filtering.", + "stableObjectGroupsMeaning": "STABLE_OBJECT_SET_DIVERGENCE events are additionally collapsed by source CIR, publication point, and physical object URI so object_uri/object_hash duplicate event views do not inflate physical-object counts.", + } + }, + "sandwich": { + "strictTimeWindow": true, + "method": "For each side, use two adjacent source samples as a stable interval. If source_start.time < peer.time < source_end.time and the source value is identical at both interval endpoints, the peer sample is expected to contain the same value.", + "totals": { + "occurrences": sandwich.total_occurrences, + "uniqueKeys": sandwich.unique_keys.len(), + }, + "bySetType": sandwich.by_set_type, + "classificationCounts": sandwich_classifications, + "interpretation": { + "missingStableObject": "The source side proves a URI/hash is stable across an interval that contains the peer sample, but the peer sample has no such URI.", + "hashMismatchStableObject": "The source side proves a URI/hash is stable across an interval that contains the peer sample, but the peer sample has the same URI with a different hash.", + "missingStableReject": "The source side consistently rejects the URI across an interval that contains the peer sample, but the peer sample does not reject it.", + "missingStableOutput": "The source side consistently outputs a VRP/VAP key across an interval that contains the peer sample, but the peer sample does not output it." + } + }, + "interpretation": { + "temporalResolvedMeaning": "Event appeared only on one side at one sample but aligned in the peer sequence later.", + "persistentMeaning": "Event did not align within the configured run/time window; inspect as a candidate RP behavior difference or persistent sync/input difference.", + "limits": [ + "Sequence triage only reads sequence JSONL plus referenced CCR/CIR files.", + "It does not read report.json, logs, repo-bytes DB, cache, mirror, or raw objects for root cause proof.", + "Resolved temporal findings reduce false positives but do not prove the exact external repository update time." + ] + } + }) +} + +fn sequence_summary(samples: &[SequenceSample]) -> Value { + json!({ + "sampleCount": samples.len(), + "rpIds": samples.iter().map(|sample| sample.raw.rp_id.clone()).collect::>(), + "firstSeq": samples.first().map(|sample| sample.raw.seq), + "lastSeq": samples.last().map(|sample| sample.raw.seq), + "firstValidationTime": samples.first().map(|sample| format_time(sample.validation_time)), + "lastValidationTime": samples.last().map(|sample| format_time(sample.validation_time)), + "samples": samples.iter().map(|sample| json!({ + "seq": sample.raw.seq, + "runId": sample.raw.run_id, + "syncMode": sample.raw.sync_mode, + "startTime": sample.raw.start_time, + "finishTime": sample.raw.finish_time, + "validationTime": format_time(sample.validation_time), + "status": sample.raw.status, + "ccrPath": path_string(&sample.ccr_path), + "cirPath": path_string(&sample.cir_path), + "ccrSha256": sample.raw.ccr_sha256, + "cirSha256": sample.raw.cir_sha256, + "wallMs": sample.raw.wall_ms, + "maxRssKb": sample.raw.max_rss_kb, + "vrps": sample.raw.vrps.or(Some(sample.vrps.len() as u64)), + "vaps": sample.raw.vaps.or(Some(sample.vaps.len() as u64)), + "objectCount": sample.object_uris.len(), + "rejectCount": sample.rejects.len(), + "trustAnchorCount": sample.trust_anchors.len(), + })).collect::>(), + }) +} + +fn sample_to_json(sample: &SampleRecord) -> Value { + json!({ + "classification": sample.classification, + "eventType": sample.event_type, + "key": sample.key, + "sourceSide": sample.source_side.as_str(), + "sourceSeq": sample.source_seq, + "sourceRunId": sample.source_run_id, + "matchedSeq": sample.matched_seq, + "matchedRunId": sample.matched_run_id, + "note": sample.note, + }) +} + +fn adjusted_sample_to_json(sample: &AdjustedRecord) -> Value { + json!({ + "classification": sample.classification, + "eventType": sample.event_type, + "key": sample.key, + "sourceSide": sample.source_side.as_str(), + "sourceSeq": sample.source_seq, + "sourceRunId": sample.source_run_id, + "note": sample.note, + }) +} + +fn sandwich_sample_to_json(sample: &SandwichRecord) -> Value { + json!({ + "classification": sample.classification, + "setType": sample.set_type, + "key": sample.key, + "sourceSide": sample.source_side.as_str(), + "sourceStartSeq": sample.source_start_seq, + "sourceStartRunId": sample.source_start_run_id, + "sourceEndSeq": sample.source_end_seq, + "sourceEndRunId": sample.source_end_run_id, + "peerSeq": sample.peer_seq, + "peerRunId": sample.peer_run_id, + "sourceValue": sample.source_value, + "peerValue": sample.peer_value, + "sourceStartTime": sample.source_start_time, + "peerTime": sample.peer_time, + "sourceEndTime": sample.source_end_time, + "note": sample.note, + }) +} + +fn count_class(result: &AnalysisResult, class: &'static str) -> usize { + result + .stats + .get(class) + .map(|stats| stats.total) + .unwrap_or(0) +} + +fn count_prefix(result: &AnalysisResult, prefix: &str) -> usize { + result + .stats + .iter() + .filter(|(class, _)| class.starts_with(prefix)) + .map(|(_, stats)| stats.total) + .sum() +} + +pub(super) fn write_json(path: &Path, value: &Value) -> Result<(), String> { + std::fs::write( + path, + serde_json::to_string_pretty(value).map_err(|e| e.to_string())? + "\n", + ) + .map_err(|e| format!("write JSON failed: {}: {e}", path.display())) +} + +pub(super) fn write_markdown(path: &Path, output: &Value) -> Result<(), String> { + let mut lines = vec![ + "# CCR/CIR Sequence Triage Summary".to_string(), + "".to_string(), + format!( + "- `generatedBy`: `{}`", + output["generatedBy"].as_str().unwrap_or("") + ), + format!( + "- `leftSamples`: `{}`", + output["left"]["sampleCount"].as_u64().unwrap_or(0) + ), + format!( + "- `rightSamples`: `{}`", + output["right"]["sampleCount"].as_u64().unwrap_or(0) + ), + format!( + "- `alignWindowRuns`: `{}`", + output["parameters"]["alignWindowRuns"] + .as_u64() + .unwrap_or(0) + ), + format!( + "- `alignWindowSecs`: `{}`", + output["parameters"]["alignWindowSecs"] + .as_i64() + .unwrap_or(0) + ), + format!( + "- `warmupSamples`: `{}`", + output["adjusted"]["warmupSamples"].as_u64().unwrap_or(0) + ), + format!( + "- `cooldownSamples`: `{}`", + output["adjusted"]["cooldownSamples"].as_u64().unwrap_or(0) + ), + "".to_string(), + "## Classification Counts".to_string(), + "".to_string(), + "| Classification | Count |".to_string(), + "|---|---:|".to_string(), + ]; + if let Some(classes) = output["classificationCounts"].as_array() { + for item in classes { + lines.push(format!( + "| `{}` | {} |", + item["classification"].as_str().unwrap_or(""), + item["count"].as_u64().unwrap_or(0) + )); + } + } + lines.extend([ + "".to_string(), + "## Adjusted Boundary / Rollover Classification".to_string(), + "".to_string(), + format!( + "- `rawPersistent`: `{}` occurrences / `{}` unique keys", + output["adjusted"]["rawPersistent"]["occurrences"] + .as_u64() + .unwrap_or(0), + output["adjusted"]["rawPersistent"]["uniqueKeys"] + .as_u64() + .unwrap_or(0) + ), + format!( + "- `edgeFilteredPersistent`: `{}` occurrences / `{}` unique keys", + output["adjusted"]["edgeFilteredPersistent"]["occurrences"] + .as_u64() + .unwrap_or(0), + output["adjusted"]["edgeFilteredPersistent"]["uniqueKeys"] + .as_u64() + .unwrap_or(0) + ), + format!( + "- `adjustedStablePersistent`: `{}` occurrences / `{}` unique keys", + output["adjusted"]["adjustedStablePersistent"]["occurrences"] + .as_u64() + .unwrap_or(0), + output["adjusted"]["adjustedStablePersistent"]["uniqueKeys"] + .as_u64() + .unwrap_or(0) + ), + "".to_string(), + "| Adjusted Classification | Occurrences | Unique Keys |".to_string(), + "|---|---:|---:|".to_string(), + ]); + if let Some(classes) = output["adjusted"]["classificationCounts"].as_array() { + for item in classes { + lines.push(format!( + "| `{}` | {} | {} |", + item["classification"].as_str().unwrap_or(""), + item["occurrences"].as_u64().unwrap_or(0), + item["uniqueKeys"].as_u64().unwrap_or(0) + )); + } + } + if let Some(groups) = output["adjusted"]["stableObjectGroups"].as_array() + && !groups.is_empty() + { + lines.extend([ + "".to_string(), + "## Stable Object Groups".to_string(), + "".to_string(), + "| Source | CIR | Publication Point | Events | Physical Objects |".to_string(), + "|---|---|---|---:|---:|".to_string(), + ]); + for group in groups { + lines.push(format!( + "| `{}/seq{}/{}` | `{}` | `{}` | {} | {} |", + group["sourceSide"].as_str().unwrap_or(""), + group["sourceSeq"].as_u64().unwrap_or(0), + group["sourceRunId"].as_str().unwrap_or(""), + group["sourceCirPath"].as_str().unwrap_or(""), + group["publicationPoint"].as_str().unwrap_or(""), + group["eventCount"].as_u64().unwrap_or(0), + group["physicalObjectCount"].as_u64().unwrap_or(0), + )); + } + } + lines.extend([ + "".to_string(), + "## Sandwich Anomaly Check".to_string(), + "".to_string(), + format!( + "- `occurrences`: `{}`", + output["sandwich"]["totals"]["occurrences"] + .as_u64() + .unwrap_or(0) + ), + format!( + "- `uniqueKeys`: `{}`", + output["sandwich"]["totals"]["uniqueKeys"] + .as_u64() + .unwrap_or(0) + ), + "- Rule: source side has two adjacent stable samples, and peer sample timestamp is strictly between them.".to_string(), + "".to_string(), + "| Sandwich Classification | Occurrences | Unique Keys |".to_string(), + "|---|---:|---:|".to_string(), + ]); + if let Some(classes) = output["sandwich"]["classificationCounts"].as_array() { + for item in classes { + lines.push(format!( + "| `{}` | {} | {} |", + item["classification"].as_str().unwrap_or(""), + item["occurrences"].as_u64().unwrap_or(0), + item["uniqueKeys"].as_u64().unwrap_or(0) + )); + } + } + lines.extend([ + "".to_string(), + "## Interpretation".to_string(), + "".to_string(), + "- `TEMPORAL_LAG_RESOLVED` / `CONTENT_ROLLOVER_RESOLVED` 表示差异在后续采样中对齐,优先视为采样时刻或仓库滚动窗口差异。".to_string(), + "- `PERSISTENT_*` 表示配置窗口内仍未对齐,才是后续人工排查的高价值候选。".to_string(), + "- `EDGE_*` 表示差异落在序列首尾,缺少前置或后续观察,不能直接当作实现差异。".to_string(), + "- `STABLE_*` 表示经过首尾边界过滤和 URI-level rollover 过滤后仍存在的稳定候选。".to_string(), + ]); + std::fs::write(path, lines.join("\n") + "\n") + .map_err(|e| format!("write markdown failed: {}: {e}", path.display())) +} + +pub(super) fn write_samples_jsonl(path: &Path, result: &AnalysisResult) -> Result<(), String> { + let mut body = String::new(); + for stats in result.stats.values() { + for sample in &stats.samples { + body.push_str( + &serde_json::to_string(&sample_to_json(sample)).map_err(|e| e.to_string())?, + ); + body.push('\n'); + } + } + std::fs::write(path, body).map_err(|e| format!("write samples failed: {}: {e}", path.display())) +} diff --git a/src/tools/sequence_triage_ccr_cir/sandwich.rs b/src/tools/sequence_triage_ccr_cir/sandwich.rs new file mode 100644 index 0000000..de5a26c --- /dev/null +++ b/src/tools/sequence_triage_ccr_cir/sandwich.rs @@ -0,0 +1,261 @@ +use std::collections::BTreeSet; + +use super::args::Args; +use super::io::format_time; +use super::model::{SandwichAnalysis, SandwichRecord, SequenceSample, Side}; + +pub(super) fn build_sandwich_analysis( + args: &Args, + left: &[SequenceSample], + right: &[SequenceSample], +) -> SandwichAnalysis { + let mut analysis = SandwichAnalysis::default(); + analyze_sandwich_objects(&mut analysis, Side::Left, left, right, args); + analyze_sandwich_objects(&mut analysis, Side::Right, right, left, args); + analyze_sandwich_sets( + &mut analysis, + "reject_uri", + "PEER_MISSING_STABLE_REJECT", + Side::Left, + left, + right, + |sample| &sample.rejects, + args, + ); + analyze_sandwich_sets( + &mut analysis, + "reject_uri", + "PEER_MISSING_STABLE_REJECT", + Side::Right, + right, + left, + |sample| &sample.rejects, + args, + ); + analyze_sandwich_sets( + &mut analysis, + "vrp_output", + "PEER_MISSING_STABLE_OUTPUT", + Side::Left, + left, + right, + |sample| &sample.vrps, + args, + ); + analyze_sandwich_sets( + &mut analysis, + "vrp_output", + "PEER_MISSING_STABLE_OUTPUT", + Side::Right, + right, + left, + |sample| &sample.vrps, + args, + ); + analyze_sandwich_sets( + &mut analysis, + "vap_output", + "PEER_MISSING_STABLE_OUTPUT", + Side::Left, + left, + right, + |sample| &sample.vaps, + args, + ); + analyze_sandwich_sets( + &mut analysis, + "vap_output", + "PEER_MISSING_STABLE_OUTPUT", + Side::Right, + right, + left, + |sample| &sample.vaps, + args, + ); + analysis +} + +fn analyze_sandwich_objects( + analysis: &mut SandwichAnalysis, + source_side: Side, + source: &[SequenceSample], + peer: &[SequenceSample], + args: &Args, +) { + for pair in source.windows(2) { + let source_start = &pair[0]; + let source_end = &pair[1]; + if source_start.validation_time >= source_end.validation_time { + continue; + } + let peers = peer_samples_between(peer, source_start, source_end); + if peers.is_empty() { + continue; + } + for (uri, source_hash) in &source_start.objects { + if source_end.objects.get(uri) != Some(source_hash) { + continue; + } + for peer_sample in &peers { + match peer_sample.objects.get(uri) { + Some(peer_hash) if peer_hash == source_hash => {} + Some(peer_hash) => analysis.add( + "PEER_HASH_MISMATCH_STABLE_OBJECT", + sandwich_record( + "PEER_HASH_MISMATCH_STABLE_OBJECT", + "object", + uri.clone(), + source_side, + source_start, + source_end, + peer_sample, + Some(source_hash.clone()), + Some(peer_hash.clone()), + "source interval has stable object hash; peer sample has same URI with another hash", + ), + args.sample_limit, + ), + None => analysis.add( + "PEER_MISSING_STABLE_OBJECT", + sandwich_record( + "PEER_MISSING_STABLE_OBJECT", + "object", + uri.clone(), + source_side, + source_start, + source_end, + peer_sample, + Some(source_hash.clone()), + None, + "source interval has stable object hash; peer sample misses the URI", + ), + args.sample_limit, + ), + } + } + } + } +} + +fn analyze_sandwich_sets( + analysis: &mut SandwichAnalysis, + set_type: &'static str, + classification: &'static str, + source_side: Side, + source: &[SequenceSample], + peer: &[SequenceSample], + extract: F, + args: &Args, +) where + F: for<'a> Fn(&'a SequenceSample) -> &'a BTreeSet, +{ + for pair in source.windows(2) { + let source_start = &pair[0]; + let source_end = &pair[1]; + if source_start.validation_time >= source_end.validation_time { + continue; + } + let peers = peer_samples_between(peer, source_start, source_end); + if peers.is_empty() { + continue; + } + let start_set = extract(source_start); + let end_set = extract(source_end); + for key in start_set { + if !end_set.contains(key) { + continue; + } + for peer_sample in &peers { + if extract(peer_sample).contains(key) { + continue; + } + analysis.add( + classification, + sandwich_record( + classification, + set_type, + key.clone(), + source_side, + source_start, + source_end, + peer_sample, + Some(key.clone()), + None, + "source interval has a stable key; peer sample misses the key", + ), + args.sample_limit, + ); + } + } + } +} + +fn peer_samples_between<'a>( + peer: &'a [SequenceSample], + source_start: &SequenceSample, + source_end: &SequenceSample, +) -> Vec<&'a SequenceSample> { + peer.iter() + .filter(|sample| { + source_start.validation_time < sample.validation_time + && sample.validation_time < source_end.validation_time + }) + .collect() +} + +#[allow(clippy::too_many_arguments)] +fn sandwich_record( + classification: &'static str, + set_type: &'static str, + key: String, + source_side: Side, + source_start: &SequenceSample, + source_end: &SequenceSample, + peer_sample: &SequenceSample, + source_value: Option, + peer_value: Option, + note: &str, +) -> SandwichRecord { + SandwichRecord { + classification, + set_type, + key, + source_side, + source_start_seq: source_start.raw.seq, + source_start_run_id: source_start.raw.run_id.clone(), + source_end_seq: source_end.raw.seq, + source_end_run_id: source_end.raw.run_id.clone(), + peer_seq: peer_sample.raw.seq, + peer_run_id: peer_sample.raw.run_id.clone(), + source_value, + peer_value, + source_start_time: format_time(source_start.validation_time), + peer_time: format_time(peer_sample.validation_time), + source_end_time: format_time(source_end.validation_time), + note: note.to_string(), + } +} + +impl SandwichAnalysis { + fn add(&mut self, class: &'static str, record: SandwichRecord, sample_limit: usize) { + self.total_occurrences += 1; + self.unique_keys.insert(sandwich_unique_key(&record)); + *self.by_set_type.entry(record.set_type).or_default() += 1; + let stats = self.stats.entry(class).or_default(); + stats.total += 1; + stats.unique_keys.insert(sandwich_unique_key(&record)); + if stats.samples.len() < sample_limit { + stats.samples.push(record); + } + } +} + +fn sandwich_unique_key(record: &SandwichRecord) -> String { + format!( + "{}|{}|{}|{}", + record.classification, + record.set_type, + record.source_side.as_str(), + record.key + ) +}