diff --git a/src/tools/sequence_triage_ccr_cir.rs b/src/tools/sequence_triage_ccr_cir.rs index ea5ce11..8d1dc95 100644 --- a/src/tools/sequence_triage_ccr_cir.rs +++ b/src/tools/sequence_triage_ccr_cir.rs @@ -1,18 +1,16 @@ -mod adjusted; -mod analysis; mod args; +mod churn; mod io; mod loader; mod model; mod output; mod sandwich; -use adjusted::build_adjusted_analysis; -use analysis::{analyze_hash_rollover, analyze_set, collect_persistent_events}; use args::{Args, parse_args}; +use churn::build_intra_rp_churn; use loader::load_sequence; -use model::{AnalysisResult, Side}; -use output::{build_output, write_json, write_markdown, write_samples_jsonl}; +use model::Side; +use output::{build_output, write_json, write_markdown}; use sandwich::build_sandwich_analysis; pub fn main_entry() -> Result<(), String> { @@ -33,76 +31,11 @@ fn run(args: Args) -> Result<(), String> { return Err("left and right sequences must both contain at least one sample".into()); } - let mut result = AnalysisResult::default(); - analyze_set( - &mut result, - "object_uri", - &left, - &right, - |sample| &sample.object_uris, - "TEMPORAL_LAG_RESOLVED", - "PERSISTENT_OBJECT_SET_DIVERGENCE", - &args, - ); - analyze_set( - &mut result, - "object_hash", - &left, - &right, - |sample| &sample.object_hashes, - "TEMPORAL_LAG_RESOLVED", - "PERSISTENT_CONTENT_DIVERGENCE", - &args, - ); - analyze_hash_rollover(&mut result, &left, &right, &args); - analyze_set( - &mut result, - "reject_uri", - &left, - &right, - |sample| &sample.rejects, - "TEMPORAL_LAG_RESOLVED", - "PERSISTENT_REJECT_DIVERGENCE", - &args, - ); - analyze_set( - &mut result, - "trust_anchor", - &left, - &right, - |sample| &sample.trust_anchors, - "TEMPORAL_LAG_RESOLVED", - "PERSISTENT_TA_DIFFERENCE", - &args, - ); - analyze_set( - &mut result, - "vrp_output", - &left, - &right, - |sample| &sample.vrps, - "TEMPORAL_LAG_RESOLVED", - "PERSISTENT_OUTPUT_DIVERGENCE", - &args, - ); - analyze_set( - &mut result, - "vap_output", - &left, - &right, - |sample| &sample.vaps, - "TEMPORAL_LAG_RESOLVED", - "PERSISTENT_OUTPUT_DIVERGENCE", - &args, - ); - - let persistent_events = collect_persistent_events(&left, &right, &args); - let adjusted = build_adjusted_analysis(&args, &left, &right, &persistent_events); let sandwich = build_sandwich_analysis(&args, &left, &right); - let output = build_output(&args, &left, &right, &result, &adjusted, &sandwich); + let churn = build_intra_rp_churn(&left, &right); + let output = build_output(&args, &left, &right, &sandwich, &churn); write_json(&args.out_dir.join("sequence-triage.json"), &output)?; write_markdown(&args.out_dir.join("sequence-triage.md"), &output)?; - write_samples_jsonl(&args.out_dir.join("sequence-diff-samples.jsonl"), &result)?; println!("{}", args.out_dir.display()); Ok(()) } @@ -147,7 +80,7 @@ mod tests { } #[test] - fn run_classifies_temporal_and_persistent_differences() { + fn run_outputs_sandwich_only_schema_and_churn() { let temp = tempfile::tempdir().expect("tempdir"); let root = temp.path(); write_sample( @@ -156,7 +89,7 @@ mod tests { "left", 1, &[object("rsync://example.net/a.roa", 0x11)], - &[], + &["rsync://example.net/reject-old.roa"], 64496, ); write_sample( @@ -213,12 +146,29 @@ mod tests { &std::fs::read_to_string(root.join("out/sequence-triage.json")).unwrap(), ) .unwrap(); - assert!(class_count(&output, "TEMPORAL_LAG_RESOLVED") > 0); - assert!(class_count(&output, "PERSISTENT_OBJECT_SET_DIVERGENCE") > 0); + assert_eq!(output["schemaVersion"].as_u64(), Some(3)); + assert!(output.get("classificationCounts").is_none()); + assert!(output.get("adjusted").is_none()); + assert_eq!( + churn_record(&output, "left", "object", "left-1", "left-2")["changedAbs"].as_u64(), + Some(1) + ); + assert_eq!( + churn_record(&output, "left", "output", "left-1", "left-2")["changedAbs"].as_u64(), + Some(2) + ); + assert_eq!( + churn_record(&output, "right", "object", "right-1", "right-2")["changedAbs"].as_u64(), + Some(1) + ); + assert_eq!( + churn_record(&output, "left", "reject", "left-1", "left-2")["changedAbs"].as_u64(), + Some(1) + ); } #[test] - fn run_classifies_reject_and_output_divergence() { + fn run_single_sample_keeps_legacy_sections_absent() { let temp = tempfile::tempdir().expect("tempdir"); let root = temp.path(); let objects = [object("rsync://example.net/a.roa", 0x11)]; @@ -258,12 +208,19 @@ mod tests { &std::fs::read_to_string(root.join("out/sequence-triage.json")).unwrap(), ) .unwrap(); - assert!(class_count(&output, "PERSISTENT_REJECT_DIVERGENCE") > 0); - assert!(class_count(&output, "PERSISTENT_OUTPUT_DIVERGENCE") > 0); + assert_eq!(output["schemaVersion"].as_u64(), Some(3)); + assert!(output.get("classificationCounts").is_none()); + assert!(output.get("adjusted").is_none()); + assert_eq!( + output["sandwich"]["totals"]["occurrences"].as_u64(), + Some(0) + ); + assert_eq!(output["intraRpChurn"]["left"].as_array().unwrap().len(), 0); + assert_eq!(output["intraRpChurn"]["right"].as_array().unwrap().len(), 0); } #[test] - fn run_adjusted_classifies_leading_content_rollover() { + fn run_content_rollover_fixture_keeps_legacy_sections_absent() { let temp = tempfile::tempdir().expect("tempdir"); let root = temp.path(); let uri = "rsync://example.net/a.roa"; @@ -303,21 +260,13 @@ mod tests { &std::fs::read_to_string(root.join("out/sequence-triage.json")).unwrap(), ) .unwrap(); - assert!(adjusted_class_occurrences(&output, "EDGE_LEADING_CONTENT_ROLLOVER") > 0); - assert_eq!( - adjusted_class_occurrences(&output, "STABLE_CONTENT_DIVERGENCE"), - 0 - ); - assert_eq!( - output["adjusted"]["adjustedStablePersistent"]["occurrences"] - .as_u64() - .unwrap(), - 0 - ); + assert_eq!(output["schemaVersion"].as_u64(), Some(3)); + assert!(output.get("classificationCounts").is_none()); + assert!(output.get("adjusted").is_none()); } #[test] - fn run_adjusted_classifies_stable_middle_content_divergence() { + fn run_middle_content_fixture_keeps_legacy_sections_absent() { let temp = tempfile::tempdir().expect("tempdir"); let root = temp.path(); let uri = "rsync://example.net/a.roa"; @@ -375,17 +324,13 @@ mod tests { &std::fs::read_to_string(root.join("out/sequence-triage.json")).unwrap(), ) .unwrap(); - assert!(adjusted_class_occurrences(&output, "STABLE_CONTENT_DIVERGENCE") > 0); - assert_eq!( - output["adjusted"]["adjustedStablePersistent"]["occurrences"] - .as_u64() - .unwrap(), - 2 - ); + assert_eq!(output["schemaVersion"].as_u64(), Some(3)); + assert!(output.get("classificationCounts").is_none()); + assert!(output.get("adjusted").is_none()); } #[test] - fn run_adjusted_filters_trailing_output() { + fn run_trailing_output_fixture_keeps_legacy_sections_absent() { let temp = tempfile::tempdir().expect("tempdir"); let root = temp.path(); let objects = [object("rsync://example.net/a.roa", 0x11)]; @@ -425,15 +370,13 @@ mod tests { &std::fs::read_to_string(root.join("out/sequence-triage.json")).unwrap(), ) .unwrap(); - assert!(adjusted_class_occurrences(&output, "EDGE_TRAILING_UNRESOLVED") > 0); - assert_eq!( - adjusted_class_occurrences(&output, "STABLE_OUTPUT_DIVERGENCE"), - 0 - ); + assert_eq!(output["schemaVersion"].as_u64(), Some(3)); + assert!(output.get("classificationCounts").is_none()); + assert!(output.get("adjusted").is_none()); } #[test] - fn run_groups_stable_object_events_by_physical_object() { + fn run_object_group_fixture_keeps_legacy_sections_absent() { let temp = tempfile::tempdir().expect("tempdir"); let root = temp.path(); let missing = "rsync://example.net/repo/pp/a.roa"; @@ -512,21 +455,9 @@ mod tests { &std::fs::read_to_string(root.join("out/sequence-triage.json")).unwrap(), ) .unwrap(); - let groups = output["adjusted"]["stableObjectGroups"].as_array().unwrap(); - assert_eq!(groups.len(), 1); - assert_eq!(groups[0]["eventCount"].as_u64(), Some(2)); - assert_eq!(groups[0]["physicalObjectCount"].as_u64(), Some(1)); - assert_eq!( - groups[0]["publicationPoint"].as_str(), - Some("rsync://example.net/repo/pp/") - ); - assert_eq!( - groups[0]["physicalObjects"][0]["eventTypes"] - .as_array() - .unwrap() - .len(), - 2 - ); + assert_eq!(output["schemaVersion"].as_u64(), Some(3)); + assert!(output.get("classificationCounts").is_none()); + assert!(output.get("adjusted").is_none()); } #[test] @@ -600,16 +531,10 @@ mod tests { sandwich_class_occurrences(&output, "PEER_MISSING_STABLE_OUTPUT"), 2 ); - } - - fn class_count(output: &Value, class: &str) -> u64 { - output["classificationCounts"] - .as_array() - .unwrap() - .iter() - .find(|item| item["classification"].as_str() == Some(class)) - .and_then(|item| item["count"].as_u64()) - .unwrap_or(0) + assert_eq!(output["sandwich"]["heatmap"].as_array().unwrap().len(), 1); + assert_eq!(output["sandwich"]["heatmap"][0]["total"].as_u64(), Some(5)); + assert!(output.get("classificationCounts").is_none()); + assert!(output.get("adjusted").is_none()); } fn sandwich_class_occurrences(output: &Value, class: &str) -> u64 { @@ -622,14 +547,23 @@ mod tests { .unwrap_or(0) } - fn adjusted_class_occurrences(output: &Value, class: &str) -> u64 { - output["adjusted"]["classificationCounts"] + fn churn_record<'a>( + output: &'a Value, + side: &str, + set_type: &str, + from_run_id: &str, + to_run_id: &str, + ) -> &'a Value { + output["intraRpChurn"][side] .as_array() .unwrap() .iter() - .find(|item| item["classification"].as_str() == Some(class)) - .and_then(|item| item["occurrences"].as_u64()) - .unwrap_or(0) + .find(|item| { + item["setType"].as_str() == Some(set_type) + && item["fromRunId"].as_str() == Some(from_run_id) + && item["toRunId"].as_str() == Some(to_run_id) + }) + .unwrap() } fn object(uri: &str, byte: u8) -> CirObject { diff --git a/src/tools/sequence_triage_ccr_cir/adjusted.rs b/src/tools/sequence_triage_ccr_cir/adjusted.rs deleted file mode 100644 index 33e8eae..0000000 --- a/src/tools/sequence_triage_ccr_cir/adjusted.rs +++ /dev/null @@ -1,503 +0,0 @@ -use std::collections::{BTreeMap, BTreeSet}; - -use serde_json::{Value, json}; - -use super::analysis::peer_sample_at_seq; -use super::args::Args; -use super::io::{format_time, path_string}; -use super::model::{ - AdjustedAnalysis, AdjustedRecord, DiffEvent, EdgePosition, SequenceSample, Side, -}; - -pub(super) fn build_adjusted_analysis( - args: &Args, - left: &[SequenceSample], - right: &[SequenceSample], - events: &[DiffEvent], -) -> AdjustedAnalysis { - let mut analysis = AdjustedAnalysis { - raw_persistent_occurrences: events.len(), - raw_persistent_unique_keys: unique_event_count(events), - ..Default::default() - }; - let mut edge_filtered_unique = BTreeSet::new(); - - for event in events { - let source_samples = samples_for_side(event.source_side, left, right); - let peer_samples = samples_for_side(opposite_side(event.source_side), left, right); - let edge = edge_position(source_samples, event.source_seq, args); - if edge == EdgePosition::Stable { - analysis.edge_filtered_occurrences += 1; - edge_filtered_unique.insert(event_identity(event)); - } - let (classification, note) = - adjusted_classification(event, edge, source_samples, peer_samples); - analysis.add( - classification, - AdjustedRecord { - classification, - event_type: event.event_type, - key: event.key.clone(), - source_side: event.source_side, - source_seq: event.source_seq, - source_run_id: event.source_run_id.clone(), - note, - }, - args.sample_limit, - ); - } - - analysis.edge_filtered_unique_keys = edge_filtered_unique.len(); - let mut stable_unique = BTreeSet::new(); - for (class, stats) in &analysis.stats { - if class.starts_with("STABLE_") { - analysis.adjusted_stable_occurrences += stats.total; - stable_unique.extend(stats.unique_keys.iter().cloned()); - } - } - analysis.adjusted_stable_unique_keys = stable_unique.len(); - let timeline_limit = if args.timeline_sample_limit == 0 { - args.sample_limit - } else { - args.timeline_sample_limit - }; - analysis.uri_timeline_samples = - build_uri_timeline_samples(left, right, &analysis, timeline_limit); - analysis.stable_object_groups = - build_stable_object_groups(&analysis, left, right, timeline_limit); - analysis -} - -fn adjusted_classification( - event: &DiffEvent, - edge: EdgePosition, - source: &[SequenceSample], - peer: &[SequenceSample], -) -> (&'static str, String) { - if event.event_type == "trust_anchor" { - if peer_has_same_ta_identity(peer, &event.key) { - return ( - "TA_PROJECTION_FORMAT_DIFFERENCE", - "same TAL hash and TA certificate hash exist on peer side; full TA projection string differs".to_string(), - ); - } - if edge == EdgePosition::Stable { - return ( - "STABLE_TA_DIVERGENCE", - "trust-anchor identity is not aligned in a non-boundary sample".to_string(), - ); - } - return edge_unresolved_class(edge); - } - - if event.event_type == "object_hash" { - if let Some((uri, hash)) = split_object_hash_key(&event.key) { - let peer_has_uri = peer_has_uri_any(peer, uri); - let peer_has_different_hash = peer_has_uri_different_hash_any(peer, uri, hash); - let peer_hash_at_source_seq = peer_sample_at_seq(peer, event.source_seq) - .and_then(|peer_sample| peer_sample.objects.get(uri)); - let source_later_matches_peer_hash = peer_hash_at_source_seq.is_some_and(|peer_hash| { - source_future_has_hash(source, event.source_seq, uri, peer_hash) - }); - if edge == EdgePosition::Leading && peer_has_different_hash { - return ( - "EDGE_LEADING_CONTENT_ROLLOVER", - "source leading-edge hash is absent, while peer already has the same URI with another hash".to_string(), - ); - } - if edge == EdgePosition::Trailing && peer_has_different_hash { - return ( - "EDGE_TRAILING_CONTENT_ROLLOVER", - "source trailing-edge hash is absent, while peer has the same URI with another hash and no later source observation exists".to_string(), - ); - } - if edge == EdgePosition::Stable && source_later_matches_peer_hash { - return ( - "MID_SEQUENCE_CONTENT_ROLLOVER_RESOLVED", - "peer already has a newer same-URI hash at this seq and source catches up later in the observed sequence".to_string(), - ); - } - if edge == EdgePosition::Stable && peer_has_different_hash { - return ( - "STABLE_CONTENT_DIVERGENCE", - "same URI exists on peer side with a different hash in a non-boundary sample" - .to_string(), - ); - } - if edge == EdgePosition::Stable && !peer_has_uri { - return ( - "STABLE_OBJECT_SET_DIVERGENCE", - "content key is derived from a non-boundary URI that is absent on peer side" - .to_string(), - ); - } - } - if edge == EdgePosition::Stable { - return ( - "STABLE_CONTENT_DIVERGENCE", - "object hash key remains unaligned in a non-boundary sample".to_string(), - ); - } - return edge_unresolved_class(edge); - } - - if edge != EdgePosition::Stable { - return edge_unresolved_class(edge); - } - - let stable_class = match event.raw_class { - "PERSISTENT_OBJECT_SET_DIVERGENCE" => "STABLE_OBJECT_SET_DIVERGENCE", - "PERSISTENT_REJECT_DIVERGENCE" => "STABLE_REJECT_DIVERGENCE", - "PERSISTENT_OUTPUT_DIVERGENCE" => "STABLE_OUTPUT_DIVERGENCE", - "PERSISTENT_CONTENT_DIVERGENCE" => "STABLE_CONTENT_DIVERGENCE", - "PERSISTENT_TA_DIFFERENCE" => "STABLE_TA_DIVERGENCE", - _ => "STABLE_UNCLASSIFIED_DIVERGENCE", - }; - ( - stable_class, - "persistent event appears in a non-boundary sample after sequence edge filtering" - .to_string(), - ) -} - -fn edge_unresolved_class(edge: EdgePosition) -> (&'static str, String) { - match edge { - EdgePosition::Leading => ( - "EDGE_LEADING_UNRESOLVED", - "event appears only from the warmup edge and may predate the observed sequence" - .to_string(), - ), - EdgePosition::Trailing => ( - "EDGE_TRAILING_UNRESOLVED", - "event appears at the cooldown edge and lacks later peer observations".to_string(), - ), - EdgePosition::Stable => ( - "STABLE_UNCLASSIFIED_DIVERGENCE", - "event is not on an edge but no specific stable category matched".to_string(), - ), - } -} - -impl AdjustedAnalysis { - fn add(&mut self, class: &'static str, record: AdjustedRecord, sample_limit: usize) { - let stats = self.stats.entry(class).or_default(); - stats.total += 1; - stats.unique_keys.insert(format!( - "{}|{}|{}", - record.event_type, - record.source_side.as_str(), - record.key - )); - if stats.samples.len() < sample_limit { - stats.samples.push(record); - } - } -} - -fn unique_event_count(events: &[DiffEvent]) -> usize { - events - .iter() - .map(event_identity) - .collect::>() - .len() -} - -fn event_identity(event: &DiffEvent) -> String { - format!( - "{}|{}|{}", - event.event_type, - event.source_side.as_str(), - event.key - ) -} - -fn samples_for_side<'a>( - side: Side, - left: &'a [SequenceSample], - right: &'a [SequenceSample], -) -> &'a [SequenceSample] { - match side { - Side::Left => left, - Side::Right => right, - } -} - -fn opposite_side(side: Side) -> Side { - match side { - Side::Left => Side::Right, - Side::Right => Side::Left, - } -} - -fn edge_position(samples: &[SequenceSample], seq: u32, args: &Args) -> EdgePosition { - let Some(index) = samples.iter().position(|sample| sample.raw.seq == seq) else { - return EdgePosition::Stable; - }; - if index < args.warmup_samples { - return EdgePosition::Leading; - } - if samples.len().saturating_sub(index) <= args.cooldown_samples { - return EdgePosition::Trailing; - } - EdgePosition::Stable -} - -fn peer_has_uri_any(peer: &[SequenceSample], uri: &str) -> bool { - peer.iter().any(|sample| sample.objects.contains_key(uri)) -} - -fn peer_has_uri_different_hash_any(peer: &[SequenceSample], uri: &str, hash: &str) -> bool { - peer.iter() - .filter_map(|sample| sample.objects.get(uri)) - .any(|peer_hash| peer_hash != hash) -} - -fn source_future_has_hash( - source: &[SequenceSample], - seq: u32, - uri: &str, - expected_hash: &str, -) -> bool { - source - .iter() - .filter(|sample| sample.raw.seq > seq) - .any(|sample| { - sample - .objects - .get(uri) - .is_some_and(|hash| hash == expected_hash) - }) -} - -fn peer_has_same_ta_identity(peer: &[SequenceSample], key: &str) -> bool { - let Some(identity) = trust_anchor_identity(key) else { - return false; - }; - peer.iter().any(|sample| { - sample - .trust_anchors - .iter() - .filter_map(|peer_key| trust_anchor_identity(peer_key)) - .any(|peer_identity| peer_identity == identity) - }) -} - -fn trust_anchor_identity(key: &str) -> Option { - let parts = key.split('|').collect::>(); - if parts.len() != 4 { - return None; - } - Some(format!("{}|{}", parts[2], parts[3])) -} - -fn split_object_hash_key(key: &str) -> Option<(&str, &str)> { - key.rsplit_once('|') -} - -fn build_uri_timeline_samples( - left: &[SequenceSample], - right: &[SequenceSample], - adjusted: &AdjustedAnalysis, - limit: usize, -) -> Vec { - let mut uris = BTreeSet::new(); - for stats in adjusted.stats.values() { - for sample in &stats.samples { - if sample.event_type == "object_hash" - && let Some((uri, _)) = split_object_hash_key(&sample.key) - { - uris.insert(uri.to_string()); - } - if sample.event_type == "object_uri" { - uris.insert(sample.key.clone()); - } - } - } - uris.into_iter() - .take(limit) - .map(|uri| { - json!({ - "uri": uri, - "left": timeline_for_uri(left, &uri), - "right": timeline_for_uri(right, &uri), - }) - }) - .collect() -} - -fn timeline_for_uri(samples: &[SequenceSample], uri: &str) -> Vec { - samples - .iter() - .filter_map(|sample| { - sample.objects.get(uri).map(|hash| { - json!({ - "seq": sample.raw.seq, - "runId": sample.raw.run_id, - "validationTime": format_time(sample.validation_time), - "hash": hash, - }) - }) - }) - .collect() -} - -fn build_stable_object_groups( - adjusted: &AdjustedAnalysis, - left: &[SequenceSample], - right: &[SequenceSample], - limit: usize, -) -> Vec { - let mut groups: BTreeMap = BTreeMap::new(); - let Some(stats) = adjusted.stats.get("STABLE_OBJECT_SET_DIVERGENCE") else { - return Vec::new(); - }; - for sample in &stats.samples { - let physical_uri = physical_object_uri(sample); - let group_key = format!( - "{}|{}|{}|{}", - sample.source_side.as_str(), - sample.source_seq, - sample.source_run_id, - publication_point_prefix(&physical_uri) - ); - let group = groups - .entry(group_key) - .or_insert_with(|| StableObjectGroup::new(sample, &physical_uri)); - if group.source_cir_path.is_none() - && let Some(source_sample) = sample_by_side_seq_run( - left, - right, - sample.source_side, - sample.source_seq, - &sample.source_run_id, - ) - { - group.source_cir_path = Some(path_string(&source_sample.cir_path)); - } - group.add(sample, physical_uri); - } - groups - .into_values() - .take(limit) - .map(StableObjectGroup::to_json) - .collect() -} - -#[derive(Clone, Debug)] -struct StableObjectGroup { - source_side: Side, - source_seq: u32, - source_run_id: String, - source_cir_path: Option, - publication_point: String, - event_count: usize, - event_types: BTreeMap<&'static str, usize>, - physical_objects: BTreeMap, -} - -impl StableObjectGroup { - fn new(sample: &AdjustedRecord, physical_uri: &str) -> Self { - Self { - source_side: sample.source_side, - source_seq: sample.source_seq, - source_run_id: sample.source_run_id.clone(), - source_cir_path: None, - publication_point: publication_point_prefix(physical_uri), - event_count: 0, - event_types: BTreeMap::new(), - physical_objects: BTreeMap::new(), - } - } - - fn add(&mut self, sample: &AdjustedRecord, physical_uri: String) { - self.event_count += 1; - *self.event_types.entry(sample.event_type).or_default() += 1; - let object = self - .physical_objects - .entry(physical_uri.clone()) - .or_insert_with(|| StablePhysicalObject { - extension: object_extension(&physical_uri).to_string(), - uri: physical_uri, - event_types: BTreeSet::new(), - hashes: BTreeSet::new(), - }); - object.event_types.insert(sample.event_type); - if let Some(hash) = event_hash(sample) { - object.hashes.insert(hash.to_string()); - } - } - - fn to_json(self) -> Value { - json!({ - "sourceSide": self.source_side.as_str(), - "sourceSeq": self.source_seq, - "sourceRunId": self.source_run_id, - "sourceCirPath": self.source_cir_path, - "publicationPoint": self.publication_point, - "eventCount": self.event_count, - "eventTypes": self.event_types, - "physicalObjectCount": self.physical_objects.len(), - "physicalObjects": self.physical_objects.into_values().map(StablePhysicalObject::to_json).collect::>(), - }) - } -} - -#[derive(Clone, Debug)] -struct StablePhysicalObject { - uri: String, - extension: String, - event_types: BTreeSet<&'static str>, - hashes: BTreeSet, -} - -impl StablePhysicalObject { - fn to_json(self) -> Value { - json!({ - "uri": self.uri, - "extension": self.extension, - "eventTypes": self.event_types, - "hashes": self.hashes, - }) - } -} - -fn physical_object_uri(sample: &AdjustedRecord) -> String { - if sample.event_type == "object_hash" - && let Some((uri, _)) = split_object_hash_key(&sample.key) - { - return uri.to_string(); - } - sample.key.clone() -} - -fn event_hash(sample: &AdjustedRecord) -> Option<&str> { - if sample.event_type == "object_hash" { - split_object_hash_key(&sample.key).map(|(_, hash)| hash) - } else { - None - } -} - -fn publication_point_prefix(uri: &str) -> String { - uri.rsplit_once('/') - .map(|(prefix, _)| format!("{prefix}/")) - .unwrap_or_else(|| uri.to_string()) -} - -fn object_extension(uri: &str) -> &str { - uri.rsplit_once('.') - .map(|(_, extension)| extension) - .unwrap_or("") -} - -fn sample_by_side_seq_run<'a>( - left: &'a [SequenceSample], - right: &'a [SequenceSample], - side: Side, - seq: u32, - run_id: &str, -) -> Option<&'a SequenceSample> { - samples_for_side(side, left, right) - .iter() - .find(|sample| sample.raw.seq == seq && sample.raw.run_id == run_id) -} diff --git a/src/tools/sequence_triage_ccr_cir/analysis.rs b/src/tools/sequence_triage_ccr_cir/analysis.rs deleted file mode 100644 index fa02859..0000000 --- a/src/tools/sequence_triage_ccr_cir/analysis.rs +++ /dev/null @@ -1,364 +0,0 @@ -use std::collections::BTreeSet; - -use super::args::Args; -use super::io::object_hash_key; -use super::model::{ - AnalysisResult, DiffEvent, EventOccurrence, SampleRecord, SequenceSample, Side, -}; - -pub(super) fn analyze_set( - result: &mut AnalysisResult, - event_type: &'static str, - left: &[SequenceSample], - right: &[SequenceSample], - extract: F, - resolved_class: &'static str, - persistent_class: &'static str, - args: &Args, -) where - F: for<'a> Fn(&'a SequenceSample) -> &'a BTreeSet, -{ - analyze_direction( - result, - event_type, - Side::Left, - left, - right, - &extract, - resolved_class, - persistent_class, - args, - ); - analyze_direction( - result, - event_type, - Side::Right, - right, - left, - &extract, - resolved_class, - persistent_class, - args, - ); -} - -fn analyze_direction( - result: &mut AnalysisResult, - event_type: &'static str, - source_side: Side, - source: &[SequenceSample], - peer: &[SequenceSample], - extract: &F, - resolved_class: &'static str, - persistent_class: &'static str, - args: &Args, -) where - F: for<'a> Fn(&'a SequenceSample) -> &'a BTreeSet, -{ - for sample in source { - let source_set = extract(sample); - for key in source_set { - if peer_sample_at_seq(peer, sample.raw.seq) - .is_some_and(|peer_sample| extract(peer_sample).contains(key)) - { - continue; - } - if let Some(matched) = find_future_match(peer, sample, key, extract, args) { - result.add( - resolved_class, - SampleRecord { - classification: resolved_class, - event_type, - key: key.clone(), - source_side, - source_seq: sample.raw.seq, - source_run_id: sample.raw.run_id.clone(), - matched_seq: Some(matched.seq), - matched_run_id: Some(matched.run_id), - note: format!( - "matched in {} sequence within alignment window", - matched.side.as_str() - ), - }, - args.sample_limit, - ); - } else { - result.add( - persistent_class, - SampleRecord { - classification: persistent_class, - event_type, - key: key.clone(), - source_side, - source_seq: sample.raw.seq, - source_run_id: sample.raw.run_id.clone(), - matched_seq: None, - matched_run_id: None, - note: "no matching event in peer sequence alignment window".to_string(), - }, - args.sample_limit, - ); - } - } - } -} - -pub(super) fn analyze_hash_rollover( - result: &mut AnalysisResult, - left: &[SequenceSample], - right: &[SequenceSample], - args: &Args, -) { - for (source_side, source, peer) in [(Side::Left, left, right), (Side::Right, right, left)] { - for sample in source { - for (uri, hash) in &sample.objects { - if peer_sample_at_seq(peer, sample.raw.seq) - .and_then(|peer_sample| peer_sample.objects.get(uri)) - .is_some_and(|peer_hash| peer_hash == hash) - { - continue; - } - if let Some(peer_sample) = peer_sample_at_seq(peer, sample.raw.seq) - && peer_sample.objects.contains_key(uri) - && find_future_hash_match(peer, sample, uri, hash, args).is_some() - { - let matched = - find_future_hash_match(peer, sample, uri, hash, args).expect("match"); - result.add( - "CONTENT_ROLLOVER_RESOLVED", - SampleRecord { - classification: "CONTENT_ROLLOVER_RESOLVED", - event_type: "object_content_rollover", - key: object_hash_key(uri, hash), - source_side, - source_seq: sample.raw.seq, - source_run_id: sample.raw.run_id.clone(), - matched_seq: Some(matched.seq), - matched_run_id: Some(matched.run_id), - note: "same URI hash appeared in peer sequence later".to_string(), - }, - args.sample_limit, - ); - } - } - } - } -} - -pub(super) fn collect_persistent_events( - left: &[SequenceSample], - right: &[SequenceSample], - args: &Args, -) -> Vec { - let mut events = Vec::new(); - collect_persistent_set( - &mut events, - "object_uri", - "PERSISTENT_OBJECT_SET_DIVERGENCE", - left, - right, - |sample| &sample.object_uris, - args, - ); - collect_persistent_set( - &mut events, - "object_hash", - "PERSISTENT_CONTENT_DIVERGENCE", - left, - right, - |sample| &sample.object_hashes, - args, - ); - collect_persistent_set( - &mut events, - "reject_uri", - "PERSISTENT_REJECT_DIVERGENCE", - left, - right, - |sample| &sample.rejects, - args, - ); - collect_persistent_set( - &mut events, - "trust_anchor", - "PERSISTENT_TA_DIFFERENCE", - left, - right, - |sample| &sample.trust_anchors, - args, - ); - collect_persistent_set( - &mut events, - "vrp_output", - "PERSISTENT_OUTPUT_DIVERGENCE", - left, - right, - |sample| &sample.vrps, - args, - ); - collect_persistent_set( - &mut events, - "vap_output", - "PERSISTENT_OUTPUT_DIVERGENCE", - left, - right, - |sample| &sample.vaps, - args, - ); - events -} - -fn collect_persistent_set( - events: &mut Vec, - event_type: &'static str, - raw_class: &'static str, - left: &[SequenceSample], - right: &[SequenceSample], - extract: F, - args: &Args, -) where - F: for<'a> Fn(&'a SequenceSample) -> &'a BTreeSet, -{ - collect_persistent_direction( - events, - event_type, - raw_class, - Side::Left, - left, - right, - &extract, - args, - ); - collect_persistent_direction( - events, - event_type, - raw_class, - Side::Right, - right, - left, - &extract, - args, - ); -} - -fn collect_persistent_direction( - events: &mut Vec, - event_type: &'static str, - raw_class: &'static str, - source_side: Side, - source: &[SequenceSample], - peer: &[SequenceSample], - extract: &F, - args: &Args, -) where - F: for<'a> Fn(&'a SequenceSample) -> &'a BTreeSet, -{ - for sample in source { - for key in extract(sample) { - if peer_sample_at_seq(peer, sample.raw.seq) - .is_some_and(|peer_sample| extract(peer_sample).contains(key)) - { - continue; - } - if find_future_match(peer, sample, key, extract, args).is_some() { - continue; - } - events.push(DiffEvent { - event_type, - raw_class, - key: key.clone(), - source_side, - source_seq: sample.raw.seq, - source_run_id: sample.raw.run_id.clone(), - }); - } - } -} - -impl AnalysisResult { - fn add(&mut self, class: &'static str, record: SampleRecord, sample_limit: usize) { - let stats = self.stats.entry(class).or_default(); - stats.total += 1; - if stats.samples.len() < sample_limit { - stats.samples.push(record); - } - } -} - -pub(super) fn peer_sample_at_seq(peer: &[SequenceSample], seq: u32) -> Option<&SequenceSample> { - peer.iter().find(|sample| sample.raw.seq == seq) -} - -fn find_future_match( - peer: &[SequenceSample], - source: &SequenceSample, - key: &str, - extract: &F, - args: &Args, -) -> Option -where - F: for<'a> Fn(&'a SequenceSample) -> &'a BTreeSet, -{ - peer.iter() - .filter(|candidate| is_in_alignment_window(source, candidate, args)) - .find(|candidate| extract(candidate).contains(key)) - .map(|candidate| { - occurrence( - candidate, - if candidate.raw.side.as_deref() == Some("left") { - Side::Left - } else { - Side::Right - }, - ) - }) -} - -fn find_future_hash_match( - peer: &[SequenceSample], - source: &SequenceSample, - uri: &str, - hash: &str, - args: &Args, -) -> Option { - peer.iter() - .filter(|candidate| is_in_alignment_window(source, candidate, args)) - .find(|candidate| { - candidate - .objects - .get(uri) - .is_some_and(|peer_hash| peer_hash == hash) - }) - .map(|candidate| { - occurrence( - candidate, - if candidate.raw.side.as_deref() == Some("left") { - Side::Left - } else { - Side::Right - }, - ) - }) -} - -fn is_in_alignment_window( - source: &SequenceSample, - candidate: &SequenceSample, - args: &Args, -) -> bool { - if candidate.raw.seq < source.raw.seq { - return false; - } - let run_delta = candidate.raw.seq.saturating_sub(source.raw.seq); - let time_delta = candidate.validation_time - source.validation_time; - let secs = time_delta.whole_seconds().abs(); - run_delta <= args.align_window_runs || secs <= args.align_window_secs -} - -fn occurrence(sample: &SequenceSample, side: Side) -> EventOccurrence { - EventOccurrence { - side, - seq: sample.raw.seq, - run_id: sample.raw.run_id.clone(), - } -} diff --git a/src/tools/sequence_triage_ccr_cir/churn.rs b/src/tools/sequence_triage_ccr_cir/churn.rs new file mode 100644 index 0000000..f65cb14 --- /dev/null +++ b/src/tools/sequence_triage_ccr_cir/churn.rs @@ -0,0 +1,185 @@ +use std::collections::{BTreeMap, BTreeSet}; + +use super::model::{ChurnRecord, ChurnSummaryRecord, IntraRpChurn, SequenceSample, Side}; + +pub(super) fn build_intra_rp_churn( + left: &[SequenceSample], + right: &[SequenceSample], +) -> IntraRpChurn { + let left_records = build_side_records(Side::Left, left); + let right_records = build_side_records(Side::Right, right); + let mut summary = summarize_records(&left_records); + summary.extend(summarize_records(&right_records)); + IntraRpChurn { + left: left_records, + right: right_records, + summary, + } +} + +fn build_side_records(side: Side, samples: &[SequenceSample]) -> Vec { + let mut records = Vec::new(); + for pair in samples.windows(2) { + let from = &pair[0]; + let to = &pair[1]; + records.push(record_churn( + side, + from, + to, + "object", + &from.object_hashes, + &to.object_hashes, + )); + let from_output = output_keys(from); + let to_output = output_keys(to); + records.push(record_churn( + side, + from, + to, + "output", + &from_output, + &to_output, + )); + records.push(record_churn( + side, + from, + to, + "vrp_output", + &from.vrps, + &to.vrps, + )); + records.push(record_churn( + side, + from, + to, + "vap_output", + &from.vaps, + &to.vaps, + )); + records.push(record_churn( + side, + from, + to, + "reject", + &from.rejects, + &to.rejects, + )); + } + records +} + +fn output_keys(sample: &SequenceSample) -> BTreeSet { + sample + .vrps + .iter() + .map(|key| format!("vrp|{key}")) + .chain(sample.vaps.iter().map(|key| format!("vap|{key}"))) + .collect() +} + +fn record_churn( + side: Side, + from: &SequenceSample, + to: &SequenceSample, + set_type: &'static str, + from_set: &BTreeSet, + to_set: &BTreeSet, +) -> ChurnRecord { + let added_count = to_set.difference(from_set).count(); + let removed_count = from_set.difference(to_set).count(); + let changed_abs = added_count + removed_count; + let union_count = from_set.union(to_set).count(); + let changed_ratio_from = ratio(changed_abs, from_set.len()); + let changed_ratio_union = ratio(changed_abs, union_count); + ChurnRecord { + side, + rp_id: from.raw.rp_id.clone(), + from_seq: from.raw.seq, + to_seq: to.raw.seq, + from_run_id: from.raw.run_id.clone(), + to_run_id: to.raw.run_id.clone(), + set_type, + from_count: from_set.len(), + to_count: to_set.len(), + added_count, + removed_count, + changed_abs, + union_count, + changed_ratio_from, + changed_ratio_union, + } +} + +fn ratio(numerator: usize, denominator: usize) -> f64 { + if denominator == 0 { + 0.0 + } else { + numerator as f64 / denominator as f64 + } +} + +fn summarize_records(records: &[ChurnRecord]) -> Vec { + let mut grouped: BTreeMap<(Side, &'static str), Vec<&ChurnRecord>> = BTreeMap::new(); + for record in records { + grouped + .entry((record.side, record.set_type)) + .or_default() + .push(record); + } + grouped + .into_iter() + .map(|((side, set_type), group)| { + let count = group.len() as f64; + let max_changed_abs = group + .iter() + .map(|record| record.changed_abs) + .max() + .unwrap_or(0); + let avg_changed_abs = if count == 0.0 { + 0.0 + } else { + group + .iter() + .map(|record| record.changed_abs as f64) + .sum::() + / count + }; + let max_changed_ratio_from = group + .iter() + .map(|record| record.changed_ratio_from) + .fold(0.0, f64::max); + let avg_changed_ratio_from = if count == 0.0 { + 0.0 + } else { + group + .iter() + .map(|record| record.changed_ratio_from) + .sum::() + / count + }; + let max_changed_ratio_union = group + .iter() + .map(|record| record.changed_ratio_union) + .fold(0.0, f64::max); + let avg_changed_ratio_union = if count == 0.0 { + 0.0 + } else { + group + .iter() + .map(|record| record.changed_ratio_union) + .sum::() + / count + }; + ChurnSummaryRecord { + side, + set_type, + max_changed_abs, + avg_changed_abs, + max_changed_ratio_from, + avg_changed_ratio_from, + max_changed_ratio_union, + avg_changed_ratio_union, + } + }) + .collect() +} diff --git a/src/tools/sequence_triage_ccr_cir/model.rs b/src/tools/sequence_triage_ccr_cir/model.rs index b88b6bb..2280e1c 100644 --- a/src/tools/sequence_triage_ccr_cir/model.rs +++ b/src/tools/sequence_triage_ccr_cir/model.rs @@ -2,7 +2,6 @@ use std::collections::{BTreeMap, BTreeSet}; use std::path::PathBuf; use serde::Deserialize; -use serde_json::Value; use time::OffsetDateTime; #[derive(Clone, Debug, Deserialize)] @@ -43,8 +42,9 @@ pub(super) struct SequenceSample { pub(super) vaps: BTreeSet, } -#[derive(Clone, Copy, Debug, PartialEq, Eq)] +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] pub(super) enum Side { + #[default] Left, Right, } @@ -58,85 +58,6 @@ impl Side { } } -#[derive(Clone, Debug)] -pub(super) struct EventOccurrence { - pub(super) side: Side, - pub(super) seq: u32, - pub(super) run_id: String, -} - -#[derive(Clone, Debug)] -pub(super) struct SampleRecord { - pub(super) classification: &'static str, - pub(super) event_type: &'static str, - pub(super) key: String, - pub(super) source_side: Side, - pub(super) source_seq: u32, - pub(super) source_run_id: String, - pub(super) matched_seq: Option, - pub(super) matched_run_id: Option, - pub(super) note: String, -} - -#[derive(Clone, Debug, Default)] -pub(super) struct ClassStats { - pub(super) total: usize, - pub(super) samples: Vec, -} - -#[derive(Clone, Debug, Default)] -pub(super) struct AnalysisResult { - pub(super) stats: BTreeMap<&'static str, ClassStats>, -} - -#[derive(Clone, Debug)] -pub(super) struct DiffEvent { - pub(super) event_type: &'static str, - pub(super) raw_class: &'static str, - pub(super) key: String, - pub(super) source_side: Side, - pub(super) source_seq: u32, - pub(super) source_run_id: String, -} - -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub(super) enum EdgePosition { - Leading, - Stable, - Trailing, -} - -#[derive(Clone, Debug)] -pub(super) struct AdjustedRecord { - pub(super) classification: &'static str, - pub(super) event_type: &'static str, - pub(super) key: String, - pub(super) source_side: Side, - pub(super) source_seq: u32, - pub(super) source_run_id: String, - pub(super) note: String, -} - -#[derive(Clone, Debug, Default)] -pub(super) struct AdjustedClassStats { - pub(super) total: usize, - pub(super) unique_keys: BTreeSet, - pub(super) samples: Vec, -} - -#[derive(Clone, Debug, Default)] -pub(super) struct AdjustedAnalysis { - pub(super) raw_persistent_occurrences: usize, - pub(super) raw_persistent_unique_keys: usize, - pub(super) edge_filtered_occurrences: usize, - pub(super) edge_filtered_unique_keys: usize, - pub(super) adjusted_stable_occurrences: usize, - pub(super) adjusted_stable_unique_keys: usize, - pub(super) stats: BTreeMap<&'static str, AdjustedClassStats>, - pub(super) uri_timeline_samples: Vec, - pub(super) stable_object_groups: Vec, -} - #[derive(Clone, Debug)] pub(super) struct SandwichRecord { pub(super) classification: &'static str, @@ -164,10 +85,66 @@ pub(super) struct SandwichClassStats { pub(super) samples: Vec, } +#[derive(Clone, Debug, Default)] +pub(super) struct SandwichHeatmapRow { + pub(super) window: String, + pub(super) source_side: Side, + pub(super) source_start_seq: u32, + pub(super) source_start_run_id: String, + pub(super) source_end_seq: u32, + pub(super) source_end_run_id: String, + pub(super) peer_seq: u32, + pub(super) peer_run_id: String, + pub(super) source_start_time: String, + pub(super) peer_time: String, + pub(super) source_end_time: String, + pub(super) total: usize, + pub(super) class_counts: BTreeMap<&'static str, usize>, +} + #[derive(Clone, Debug, Default)] pub(super) struct SandwichAnalysis { pub(super) total_occurrences: usize, pub(super) unique_keys: BTreeSet, pub(super) by_set_type: BTreeMap<&'static str, usize>, pub(super) stats: BTreeMap<&'static str, SandwichClassStats>, + pub(super) heatmap: BTreeMap, +} + +#[derive(Clone, Debug)] +pub(super) struct ChurnRecord { + pub(super) side: Side, + pub(super) rp_id: String, + pub(super) from_seq: u32, + pub(super) to_seq: u32, + pub(super) from_run_id: String, + pub(super) to_run_id: String, + pub(super) set_type: &'static str, + pub(super) from_count: usize, + pub(super) to_count: usize, + pub(super) added_count: usize, + pub(super) removed_count: usize, + pub(super) changed_abs: usize, + pub(super) union_count: usize, + pub(super) changed_ratio_from: f64, + pub(super) changed_ratio_union: f64, +} + +#[derive(Clone, Debug)] +pub(super) struct ChurnSummaryRecord { + pub(super) side: Side, + pub(super) set_type: &'static str, + pub(super) max_changed_abs: usize, + pub(super) avg_changed_abs: f64, + pub(super) max_changed_ratio_from: f64, + pub(super) avg_changed_ratio_from: f64, + pub(super) max_changed_ratio_union: f64, + pub(super) avg_changed_ratio_union: f64, +} + +#[derive(Clone, Debug, Default)] +pub(super) struct IntraRpChurn { + pub(super) left: Vec, + pub(super) right: Vec, + pub(super) summary: Vec, } diff --git a/src/tools/sequence_triage_ccr_cir/output.rs b/src/tools/sequence_triage_ccr_cir/output.rs index c311d77..25a2721 100644 --- a/src/tools/sequence_triage_ccr_cir/output.rs +++ b/src/tools/sequence_triage_ccr_cir/output.rs @@ -6,7 +6,7 @@ use serde_json::{Value, json}; use super::args::Args; use super::io::{format_time, path_string}; use super::model::{ - AdjustedAnalysis, AdjustedRecord, AnalysisResult, SampleRecord, SandwichAnalysis, + ChurnRecord, ChurnSummaryRecord, IntraRpChurn, SandwichAnalysis, SandwichHeatmapRow, SandwichRecord, SequenceSample, }; @@ -14,49 +14,13 @@ pub(super) fn build_output( args: &Args, left: &[SequenceSample], right: &[SequenceSample], - result: &AnalysisResult, - adjusted: &AdjustedAnalysis, sandwich: &SandwichAnalysis, + churn: &IntraRpChurn, ) -> Value { - let classifications = result - .stats - .iter() - .map(|(class, stats)| { - json!({ - "classification": class, - "count": stats.total, - "samples": stats.samples.iter().map(sample_to_json).collect::>(), - }) - }) - .collect::>(); - let adjusted_classifications = adjusted - .stats - .iter() - .map(|(class, stats)| { - json!({ - "classification": class, - "occurrences": stats.total, - "uniqueKeys": stats.unique_keys.len(), - "samples": stats.samples.iter().map(adjusted_sample_to_json).collect::>(), - }) - }) - .collect::>(); - let sandwich_classifications = sandwich - .stats - .iter() - .map(|(class, stats)| { - json!({ - "classification": class, - "occurrences": stats.total, - "uniqueKeys": stats.unique_keys.len(), - "samples": stats.samples.iter().map(sandwich_sample_to_json).collect::>(), - }) - }) - .collect::>(); json!({ - "schemaVersion": 1, + "schemaVersion": 3, "generatedBy": "sequence_triage_ccr_cir", - "inputContract": "left-right-sequence-jsonl-with-ccr-cir-artifacts", + "inputContract": "sequence-jsonl-cir-ccr-sandwich-only", "parameters": { "leftSequence": path_string(&args.left_sequence), "rightSequence": path_string(&args.right_sequence), @@ -69,37 +33,6 @@ pub(super) fn build_output( }, "left": sequence_summary(left), "right": sequence_summary(right), - "classificationCounts": classifications, - "totals": { - "resolvedTemporalLike": count_class(result, "TEMPORAL_LAG_RESOLVED") + count_class(result, "CONTENT_ROLLOVER_RESOLVED"), - "persistent": count_prefix(result, "PERSISTENT_"), - "unclassified": count_class(result, "UNCLASSIFIED_INSUFFICIENT_WINDOW"), - }, - "adjusted": { - "warmupSamples": args.warmup_samples, - "cooldownSamples": args.cooldown_samples, - "rawPersistent": { - "occurrences": adjusted.raw_persistent_occurrences, - "uniqueKeys": adjusted.raw_persistent_unique_keys, - }, - "edgeFilteredPersistent": { - "occurrences": adjusted.edge_filtered_occurrences, - "uniqueKeys": adjusted.edge_filtered_unique_keys, - }, - "adjustedStablePersistent": { - "occurrences": adjusted.adjusted_stable_occurrences, - "uniqueKeys": adjusted.adjusted_stable_unique_keys, - }, - "classificationCounts": adjusted_classifications, - "uriTimelineSamples": adjusted.uri_timeline_samples, - "stableObjectGroups": adjusted.stable_object_groups, - "interpretation": { - "rawPersistentMeaning": "raw persistent keeps the original #043 single-event matching semantics.", - "edgeFilteredMeaning": "edge-filtered persistent keeps only non-warmup and non-cooldown source occurrences.", - "adjustedStableMeaning": "adjusted stable persistent keeps non-edge findings after URI-level rollover and TA projection filtering.", - "stableObjectGroupsMeaning": "STABLE_OBJECT_SET_DIVERGENCE events are additionally collapsed by source CIR, publication point, and physical object URI so object_uri/object_hash duplicate event views do not inflate physical-object counts.", - } - }, "sandwich": { "strictTimeWindow": true, "method": "For each side, use two adjacent source samples as a stable interval. If source_start.time < peer.time < source_end.time and the source value is identical at both interval endpoints, the peer sample is expected to contain the same value.", @@ -108,7 +41,8 @@ pub(super) fn build_output( "uniqueKeys": sandwich.unique_keys.len(), }, "bySetType": sandwich.by_set_type, - "classificationCounts": sandwich_classifications, + "classificationCounts": sandwich_classifications_to_json(sandwich), + "heatmap": sandwich_heatmap_to_json(sandwich), "interpretation": { "missingStableObject": "The source side proves a URI/hash is stable across an interval that contains the peer sample, but the peer sample has no such URI.", "hashMismatchStableObject": "The source side proves a URI/hash is stable across an interval that contains the peer sample, but the peer sample has the same URI with a different hash.", @@ -116,13 +50,19 @@ pub(super) fn build_output( "missingStableOutput": "The source side consistently outputs a VRP/VAP key across an interval that contains the peer sample, but the peer sample does not output it." } }, + "intraRpChurn": { + "method": "For each side, compare adjacent samples independently. Object keys use object_uri|sha256, output keys use typed VRP/VAP canonical keys, and reject keys use rejected object URI without reason.", + "left": churn_records_to_json(&churn.left), + "right": churn_records_to_json(&churn.right), + "summary": churn_summary_to_json(&churn.summary), + }, "interpretation": { - "temporalResolvedMeaning": "Event appeared only on one side at one sample but aligned in the peer sequence later.", - "persistentMeaning": "Event did not align within the configured run/time window; inspect as a candidate RP behavior difference or persistent sync/input difference.", + "sandwichOnlyMeaning": "This schema intentionally exposes only sandwich anomalies for cross-RP triage. Older raw persistent, adjusted, edge, and rollover classifications are not emitted.", + "intraRpChurnMeaning": "Adjacent-run churn measures how much each RP's own observed object/output/reject sets changed during the sequence. It is a baseline for judging whether cross-RP anomalies exceed natural sequence movement.", "limits": [ "Sequence triage only reads sequence JSONL plus referenced CCR/CIR files.", "It does not read report.json, logs, repo-bytes DB, cache, mirror, or raw objects for root cause proof.", - "Resolved temporal findings reduce false positives but do not prove the exact external repository update time." + "Sandwich anomalies indicate stable-interval contradictions, not final root cause. Manual evidence is still required for network, repository, or implementation attribution." ] } }) @@ -153,35 +93,51 @@ fn sequence_summary(samples: &[SequenceSample]) -> Value { "vrps": sample.raw.vrps.or(Some(sample.vrps.len() as u64)), "vaps": sample.raw.vaps.or(Some(sample.vaps.len() as u64)), "objectCount": sample.object_uris.len(), + "objectHashCount": sample.object_hashes.len(), "rejectCount": sample.rejects.len(), "trustAnchorCount": sample.trust_anchors.len(), })).collect::>(), }) } -fn sample_to_json(sample: &SampleRecord) -> Value { - json!({ - "classification": sample.classification, - "eventType": sample.event_type, - "key": sample.key, - "sourceSide": sample.source_side.as_str(), - "sourceSeq": sample.source_seq, - "sourceRunId": sample.source_run_id, - "matchedSeq": sample.matched_seq, - "matchedRunId": sample.matched_run_id, - "note": sample.note, - }) +fn sandwich_classifications_to_json(sandwich: &SandwichAnalysis) -> Vec { + sandwich + .stats + .iter() + .map(|(class, stats)| { + json!({ + "classification": class, + "occurrences": stats.total, + "uniqueKeys": stats.unique_keys.len(), + "samples": stats.samples.iter().map(sandwich_sample_to_json).collect::>(), + }) + }) + .collect() } -fn adjusted_sample_to_json(sample: &AdjustedRecord) -> Value { +fn sandwich_heatmap_to_json(sandwich: &SandwichAnalysis) -> Vec { + sandwich + .heatmap + .values() + .map(sandwich_heatmap_row_to_json) + .collect() +} + +fn sandwich_heatmap_row_to_json(row: &SandwichHeatmapRow) -> Value { json!({ - "classification": sample.classification, - "eventType": sample.event_type, - "key": sample.key, - "sourceSide": sample.source_side.as_str(), - "sourceSeq": sample.source_seq, - "sourceRunId": sample.source_run_id, - "note": sample.note, + "window": row.window, + "sourceSide": row.source_side.as_str(), + "sourceStartSeq": row.source_start_seq, + "sourceStartRunId": row.source_start_run_id, + "peerSeq": row.peer_seq, + "peerRunId": row.peer_run_id, + "sourceEndSeq": row.source_end_seq, + "sourceEndRunId": row.source_end_run_id, + "sourceStartTime": row.source_start_time, + "peerTime": row.peer_time, + "sourceEndTime": row.source_end_time, + "total": row.total, + "counts": row.class_counts, }) } @@ -206,21 +162,47 @@ fn sandwich_sample_to_json(sample: &SandwichRecord) -> Value { }) } -fn count_class(result: &AnalysisResult, class: &'static str) -> usize { - result - .stats - .get(class) - .map(|stats| stats.total) - .unwrap_or(0) +fn churn_records_to_json(records: &[ChurnRecord]) -> Vec { + records + .iter() + .map(|record| { + json!({ + "side": record.side.as_str(), + "rpId": record.rp_id, + "fromSeq": record.from_seq, + "toSeq": record.to_seq, + "fromRunId": record.from_run_id, + "toRunId": record.to_run_id, + "setType": record.set_type, + "fromCount": record.from_count, + "toCount": record.to_count, + "addedCount": record.added_count, + "removedCount": record.removed_count, + "changedAbs": record.changed_abs, + "unionCount": record.union_count, + "changedRatioFrom": record.changed_ratio_from, + "changedRatioUnion": record.changed_ratio_union, + }) + }) + .collect() } -fn count_prefix(result: &AnalysisResult, prefix: &str) -> usize { - result - .stats +fn churn_summary_to_json(summary: &[ChurnSummaryRecord]) -> Vec { + summary .iter() - .filter(|(class, _)| class.starts_with(prefix)) - .map(|(_, stats)| stats.total) - .sum() + .map(|record| { + json!({ + "side": record.side.as_str(), + "setType": record.set_type, + "maxChangedAbs": record.max_changed_abs, + "avgChangedAbs": record.avg_changed_abs, + "maxChangedRatioFrom": record.max_changed_ratio_from, + "avgChangedRatioFrom": record.avg_changed_ratio_from, + "maxChangedRatioUnion": record.max_changed_ratio_union, + "avgChangedRatioUnion": record.avg_changed_ratio_union, + }) + }) + .collect() } pub(super) fn write_json(path: &Path, value: &Value) -> Result<(), String> { @@ -235,142 +217,22 @@ pub(super) fn write_markdown(path: &Path, output: &Value) -> Result<(), String> let mut lines = vec![ "# CCR/CIR Sequence Triage Summary".to_string(), "".to_string(), - format!( - "- `generatedBy`: `{}`", - output["generatedBy"].as_str().unwrap_or("") - ), - format!( - "- `leftSamples`: `{}`", - output["left"]["sampleCount"].as_u64().unwrap_or(0) - ), - format!( - "- `rightSamples`: `{}`", - output["right"]["sampleCount"].as_u64().unwrap_or(0) - ), - format!( - "- `alignWindowRuns`: `{}`", - output["parameters"]["alignWindowRuns"] - .as_u64() - .unwrap_or(0) - ), - format!( - "- `alignWindowSecs`: `{}`", - output["parameters"]["alignWindowSecs"] - .as_i64() - .unwrap_or(0) - ), - format!( - "- `warmupSamples`: `{}`", - output["adjusted"]["warmupSamples"].as_u64().unwrap_or(0) - ), - format!( - "- `cooldownSamples`: `{}`", - output["adjusted"]["cooldownSamples"].as_u64().unwrap_or(0) - ), - "".to_string(), - "## Classification Counts".to_string(), - "".to_string(), - "| Classification | Count |".to_string(), - "|---|---:|".to_string(), - ]; - if let Some(classes) = output["classificationCounts"].as_array() { - for item in classes { - lines.push(format!( - "| `{}` | {} |", - item["classification"].as_str().unwrap_or(""), - item["count"].as_u64().unwrap_or(0) - )); - } - } - lines.extend([ - "".to_string(), - "## Adjusted Boundary / Rollover Classification".to_string(), - "".to_string(), - format!( - "- `rawPersistent`: `{}` occurrences / `{}` unique keys", - output["adjusted"]["rawPersistent"]["occurrences"] - .as_u64() - .unwrap_or(0), - output["adjusted"]["rawPersistent"]["uniqueKeys"] - .as_u64() - .unwrap_or(0) - ), - format!( - "- `edgeFilteredPersistent`: `{}` occurrences / `{}` unique keys", - output["adjusted"]["edgeFilteredPersistent"]["occurrences"] - .as_u64() - .unwrap_or(0), - output["adjusted"]["edgeFilteredPersistent"]["uniqueKeys"] - .as_u64() - .unwrap_or(0) - ), - format!( - "- `adjustedStablePersistent`: `{}` occurrences / `{}` unique keys", - output["adjusted"]["adjustedStablePersistent"]["occurrences"] - .as_u64() - .unwrap_or(0), - output["adjusted"]["adjustedStablePersistent"]["uniqueKeys"] - .as_u64() - .unwrap_or(0) - ), - "".to_string(), - "| Adjusted Classification | Occurrences | Unique Keys |".to_string(), - "|---|---:|---:|".to_string(), - ]); - if let Some(classes) = output["adjusted"]["classificationCounts"].as_array() { - for item in classes { - lines.push(format!( - "| `{}` | {} | {} |", - item["classification"].as_str().unwrap_or(""), - item["occurrences"].as_u64().unwrap_or(0), - item["uniqueKeys"].as_u64().unwrap_or(0) - )); - } - } - if let Some(groups) = output["adjusted"]["stableObjectGroups"].as_array() - && !groups.is_empty() - { - lines.extend([ - "".to_string(), - "## Stable Object Groups".to_string(), - "".to_string(), - "| Source | CIR | Publication Point | Events | Physical Objects |".to_string(), - "|---|---|---|---:|---:|".to_string(), - ]); - for group in groups { - lines.push(format!( - "| `{}/seq{}/{}` | `{}` | `{}` | {} | {} |", - group["sourceSide"].as_str().unwrap_or(""), - group["sourceSeq"].as_u64().unwrap_or(0), - group["sourceRunId"].as_str().unwrap_or(""), - group["sourceCirPath"].as_str().unwrap_or(""), - group["publicationPoint"].as_str().unwrap_or(""), - group["eventCount"].as_u64().unwrap_or(0), - group["physicalObjectCount"].as_u64().unwrap_or(0), - )); - } - } - lines.extend([ + format!("- `schemaVersion`: `{}`", output["schemaVersion"].as_u64().unwrap_or(0)), + format!("- `generatedBy`: `{}`", output["generatedBy"].as_str().unwrap_or("")), + format!("- `inputContract`: `{}`", output["inputContract"].as_str().unwrap_or("")), + format!("- `leftSamples`: `{}`", output["left"]["sampleCount"].as_u64().unwrap_or(0)), + format!("- `rightSamples`: `{}`", output["right"]["sampleCount"].as_u64().unwrap_or(0)), + format!("- `alignWindowSecs`: `{}`", output["parameters"]["alignWindowSecs"].as_i64().unwrap_or(0)), "".to_string(), "## Sandwich Anomaly Check".to_string(), "".to_string(), - format!( - "- `occurrences`: `{}`", - output["sandwich"]["totals"]["occurrences"] - .as_u64() - .unwrap_or(0) - ), - format!( - "- `uniqueKeys`: `{}`", - output["sandwich"]["totals"]["uniqueKeys"] - .as_u64() - .unwrap_or(0) - ), + format!("- `occurrences`: `{}`", output["sandwich"]["totals"]["occurrences"].as_u64().unwrap_or(0)), + format!("- `uniqueKeys`: `{}`", output["sandwich"]["totals"]["uniqueKeys"].as_u64().unwrap_or(0)), "- Rule: source side has two adjacent stable samples, and peer sample timestamp is strictly between them.".to_string(), "".to_string(), "| Sandwich Classification | Occurrences | Unique Keys |".to_string(), "|---|---:|---:|".to_string(), - ]); + ]; if let Some(classes) = output["sandwich"]["classificationCounts"].as_array() { for item in classes { lines.push(format!( @@ -381,28 +243,86 @@ pub(super) fn write_markdown(path: &Path, output: &Value) -> Result<(), String> )); } } + lines.extend([ + "".to_string(), + "## Sandwich Heatmap".to_string(), + "".to_string(), + "| Window | Missing Object | Hash Mismatch Object | Missing Output | Missing Reject | Total |".to_string(), + "|---|---:|---:|---:|---:|---:|".to_string(), + ]); + if let Some(rows) = output["sandwich"]["heatmap"].as_array() { + for row in rows { + let counts = &row["counts"]; + lines.push(format!( + "| `{}` | {} | {} | {} | {} | {} |", + row["window"].as_str().unwrap_or(""), + counts["PEER_MISSING_STABLE_OBJECT"].as_u64().unwrap_or(0), + counts["PEER_HASH_MISMATCH_STABLE_OBJECT"] + .as_u64() + .unwrap_or(0), + counts["PEER_MISSING_STABLE_OUTPUT"].as_u64().unwrap_or(0), + counts["PEER_MISSING_STABLE_REJECT"].as_u64().unwrap_or(0), + row["total"].as_u64().unwrap_or(0) + )); + } + } + lines.extend([ + "".to_string(), + "## Intra-RP Adjacent Churn Summary".to_string(), + "".to_string(), + "| Side | Set Type | Max Changed | Avg Changed | Max Ratio From | Max Ratio Union |" + .to_string(), + "|---|---|---:|---:|---:|---:|".to_string(), + ]); + if let Some(summary) = output["intraRpChurn"]["summary"].as_array() { + for item in summary { + lines.push(format!( + "| `{}` | `{}` | {} | {:.2} | {:.6} | {:.6} |", + item["side"].as_str().unwrap_or(""), + item["setType"].as_str().unwrap_or(""), + item["maxChangedAbs"].as_u64().unwrap_or(0), + item["avgChangedAbs"].as_f64().unwrap_or(0.0), + item["maxChangedRatioFrom"].as_f64().unwrap_or(0.0), + item["maxChangedRatioUnion"].as_f64().unwrap_or(0.0) + )); + } + } + lines.extend([ + "".to_string(), + "## Intra-RP Adjacent Churn Details".to_string(), + "".to_string(), + "| Side | From -> To | Set Type | From | To | Added | Removed | Changed | Ratio From | Ratio Union |".to_string(), + "|---|---|---|---:|---:|---:|---:|---:|---:|---:|".to_string(), + ]); + for side_key in ["left", "right"] { + if let Some(records) = output["intraRpChurn"][side_key].as_array() { + for item in records { + lines.push(format!( + "| `{}` | `{}` -> `{}` | `{}` | {} | {} | {} | {} | {} | {:.6} | {:.6} |", + item["side"].as_str().unwrap_or(""), + item["fromRunId"].as_str().unwrap_or(""), + item["toRunId"].as_str().unwrap_or(""), + item["setType"].as_str().unwrap_or(""), + item["fromCount"].as_u64().unwrap_or(0), + item["toCount"].as_u64().unwrap_or(0), + item["addedCount"].as_u64().unwrap_or(0), + item["removedCount"].as_u64().unwrap_or(0), + item["changedAbs"].as_u64().unwrap_or(0), + item["changedRatioFrom"].as_f64().unwrap_or(0.0), + item["changedRatioUnion"].as_f64().unwrap_or(0.0) + )); + } + } + } lines.extend([ "".to_string(), "## Interpretation".to_string(), "".to_string(), - "- `TEMPORAL_LAG_RESOLVED` / `CONTENT_ROLLOVER_RESOLVED` 表示差异在后续采样中对齐,优先视为采样时刻或仓库滚动窗口差异。".to_string(), - "- `PERSISTENT_*` 表示配置窗口内仍未对齐,才是后续人工排查的高价值候选。".to_string(), - "- `EDGE_*` 表示差异落在序列首尾,缺少前置或后续观察,不能直接当作实现差异。".to_string(), - "- `STABLE_*` 表示经过首尾边界过滤和 URI-level rollover 过滤后仍存在的稳定候选。".to_string(), + "- `PEER_MISSING_STABLE_OBJECT` / `PEER_HASH_MISMATCH_STABLE_OBJECT` 表示输入对象在一个 RP 的稳定夹层内,与另一个 RP 的中间采样点不一致。".to_string(), + "- `PEER_MISSING_STABLE_OUTPUT` 表示输出 VRP/VAP 在稳定夹层内缺失,是更接近最终产物差异的信号。".to_string(), + "- `PEER_MISSING_STABLE_REJECT` 表示拒绝列表在稳定夹层内缺失,用于发现验证失败捕获差异。".to_string(), + "- `intraRpChurn` 是同一个 RP 自身相邻 run 的自然变化基线,不代表跨 RP 异常。".to_string(), ]); std::fs::write(path, lines.join("\n") + "\n") .map_err(|e| format!("write markdown failed: {}: {e}", path.display())) } - -pub(super) fn write_samples_jsonl(path: &Path, result: &AnalysisResult) -> Result<(), String> { - let mut body = String::new(); - for stats in result.stats.values() { - for sample in &stats.samples { - body.push_str( - &serde_json::to_string(&sample_to_json(sample)).map_err(|e| e.to_string())?, - ); - body.push('\n'); - } - } - std::fs::write(path, body).map_err(|e| format!("write samples failed: {}: {e}", path.display())) -} diff --git a/src/tools/sequence_triage_ccr_cir/sandwich.rs b/src/tools/sequence_triage_ccr_cir/sandwich.rs index de5a26c..1c27e2e 100644 --- a/src/tools/sequence_triage_ccr_cir/sandwich.rs +++ b/src/tools/sequence_triage_ccr_cir/sandwich.rs @@ -2,7 +2,7 @@ use std::collections::BTreeSet; use super::args::Args; use super::io::format_time; -use super::model::{SandwichAnalysis, SandwichRecord, SequenceSample, Side}; +use super::model::{SandwichAnalysis, SandwichHeatmapRow, SandwichRecord, SequenceSample, Side}; pub(super) fn build_sandwich_analysis( args: &Args, @@ -241,6 +241,7 @@ impl SandwichAnalysis { self.total_occurrences += 1; self.unique_keys.insert(sandwich_unique_key(&record)); *self.by_set_type.entry(record.set_type).or_default() += 1; + self.add_heatmap_record(class, &record); let stats = self.stats.entry(class).or_default(); stats.total += 1; stats.unique_keys.insert(sandwich_unique_key(&record)); @@ -248,6 +249,37 @@ impl SandwichAnalysis { stats.samples.push(record); } } + + fn add_heatmap_record(&mut self, class: &'static str, record: &SandwichRecord) { + let key = format!( + "{}|{}|{}", + record.source_start_run_id, record.peer_run_id, record.source_end_run_id + ); + let window = format!( + "{} - {} - {}", + record.source_start_run_id, record.peer_run_id, record.source_end_run_id + ); + let row = self + .heatmap + .entry(key) + .or_insert_with(|| SandwichHeatmapRow { + window, + source_side: record.source_side, + source_start_seq: record.source_start_seq, + source_start_run_id: record.source_start_run_id.clone(), + source_end_seq: record.source_end_seq, + source_end_run_id: record.source_end_run_id.clone(), + peer_seq: record.peer_seq, + peer_run_id: record.peer_run_id.clone(), + source_start_time: record.source_start_time.clone(), + peer_time: record.peer_time.clone(), + source_end_time: record.source_end_time.clone(), + total: 0, + class_counts: Default::default(), + }); + row.total += 1; + *row.class_counts.entry(class).or_default() += 1; + } } fn sandwich_unique_key(record: &SandwichRecord) -> String {