From b3b44d50c6f9462810734cf289bb6c3a8df54a53 Mon Sep 17 00:00:00 2001 From: yuyr Date: Sun, 3 May 2026 08:26:18 +0800 Subject: [PATCH] =?UTF-8?q?20260501=20=E4=BF=AE=E5=A4=8DCurrentRepoIndex?= =?UTF-8?q?=E5=AE=8C=E6=95=B4=E6=80=A7=E5=B9=B6=E4=BC=98=E5=8C=96replay=20?= =?UTF-8?q?guard?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/bin/db_stats.rs | 11 +- src/cir/export.rs | 38 +++ src/cli.rs | 35 ++ src/parallel/config.rs | 3 + src/progress_log.rs | 7 + src/storage.rs | 175 +++++++++- src/sync/rrdp.rs | 193 ++++++++++- src/validation/manifest.rs | 201 ++++++++++- src/validation/objects.rs | 39 ++- src/validation/tree_parallel.rs | 589 ++++++++++++++++++++++++++++++-- 10 files changed, 1230 insertions(+), 61 deletions(-) diff --git a/src/bin/db_stats.rs b/src/bin/db_stats.rs index ac50f22..5a8096e 100644 --- a/src/bin/db_stats.rs +++ b/src/bin/db_stats.rs @@ -4,8 +4,9 @@ use std::path::{Path, PathBuf}; use rocksdb::{DB, IteratorMode, Options}; use rpki::storage::{ - ALL_COLUMN_FAMILY_NAMES, CF_AUDIT_RULE_INDEX, CF_RAW_BY_HASH, CF_REPOSITORY_VIEW, - CF_RRDP_SOURCE, CF_RRDP_SOURCE_MEMBER, CF_RRDP_URI_OWNER, CF_VCIR, column_family_descriptors, + ALL_COLUMN_FAMILY_NAMES, CF_AUDIT_RULE_INDEX, CF_MANIFEST_REPLAY_META, CF_RAW_BY_HASH, + CF_REPOSITORY_VIEW, CF_RRDP_SOURCE, CF_RRDP_SOURCE_MEMBER, CF_RRDP_URI_OWNER, CF_VCIR, + column_family_descriptors, }; #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -182,7 +183,7 @@ fn collect_db_file_stats(db_path: &Path) -> Result CfGroup { match cf_name { CF_REPOSITORY_VIEW | CF_RAW_BY_HASH => CfGroup::CurrentRepositoryView, - CF_VCIR | CF_AUDIT_RULE_INDEX => CfGroup::CurrentValidationState, + CF_VCIR | CF_MANIFEST_REPLAY_META | CF_AUDIT_RULE_INDEX => CfGroup::CurrentValidationState, CF_RRDP_SOURCE | CF_RRDP_SOURCE_MEMBER | CF_RRDP_URI_OWNER => CfGroup::CurrentRrdpState, _ => CfGroup::LegacyCompatibility, } @@ -370,6 +371,10 @@ mod tests { assert_eq!(cf_group(CF_REPOSITORY_VIEW), CfGroup::CurrentRepositoryView); assert_eq!(cf_group(CF_RAW_BY_HASH), CfGroup::CurrentRepositoryView); assert_eq!(cf_group(CF_VCIR), CfGroup::CurrentValidationState); + assert_eq!( + cf_group(CF_MANIFEST_REPLAY_META), + CfGroup::CurrentValidationState + ); assert_eq!( cf_group(CF_AUDIT_RULE_INDEX), CfGroup::CurrentValidationState diff --git a/src/cir/export.rs b/src/cir/export.rs index 9746596..4770bd9 100644 --- a/src/cir/export.rs +++ b/src/cir/export.rs @@ -178,6 +178,7 @@ pub fn build_cir_from_run_multi( tal_bytes: binding.trust_anchor.tal.raw.clone(), }); } + tals.sort_by(|a, b| a.tal_uri.cmp(&b.tal_uri)); let cir = CanonicalInputRepresentation { version: CIR_VERSION_V1, @@ -700,6 +701,43 @@ mod tests { ); } + #[test] + fn build_cir_from_run_multi_sorts_tals_by_tal_uri() { + let td = tempfile::tempdir().unwrap(); + let store = RocksStore::open(td.path()).unwrap(); + let apnic = sample_trust_anchor(); + let arin = sample_arin_trust_anchor(); + + let cir = build_cir_from_run_multi( + &store, + &[ + CirTalBinding { + trust_anchor: &apnic, + tal_uri: "https://rpki.apnic.net/repository/apnic-rpki-root-iana-origin.cer", + }, + CirTalBinding { + trust_anchor: &arin, + tal_uri: "https://rrdp.arin.net/arin-rpki-ta.cer", + }, + ], + sample_time(), + &[], + Some(&[]), + ) + .expect("build cir with unsorted input bindings"); + + assert_eq!( + cir.tals + .iter() + .map(|tal| tal.tal_uri.as_str()) + .collect::>(), + vec![ + "https://rpki.apnic.net/repository/apnic-rpki-root-iana-origin.cer", + "https://rrdp.arin.net/arin-rpki-ta.cer", + ] + ); + } + #[test] fn build_cir_from_run_multi_rejects_invalid_tal_uri_and_missing_rsync_ta_uri() { let td = tempfile::tempdir().unwrap(); diff --git a/src/cli.rs b/src/cli.rs index 588b0fb..d19b6cc 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -154,6 +154,8 @@ Options: Phase 2 object worker count (default: 8) --parallel-phase2-worker-queue-capacity Phase 2 per-worker object queue capacity (default: 256) + --parallel-phase2-ready-batch-size + Phase 2 ready publication points processed per scheduler turn (default: 256) --rsync-local-dir Use LocalDirRsyncFetcher rooted at this directory (offline tests) --disable-rrdp Disable RRDP and synchronize only via rsync @@ -282,6 +284,15 @@ pub fn parse_args(argv: &[String]) -> Result { .parse::() .map_err(|_| format!("invalid --parallel-phase2-worker-queue-capacity: {v}"))?; } + "--parallel-phase2-ready-batch-size" => { + i += 1; + let v = argv + .get(i) + .ok_or("--parallel-phase2-ready-batch-size requires a value")?; + parallel_phase2_cfg.ready_batch_size = v + .parse::() + .map_err(|_| format!("invalid --parallel-phase2-ready-batch-size: {v}"))?; + } "--db" => { i += 1; let v = argv.get(i).ok_or("--db requires a value")?; @@ -496,6 +507,12 @@ pub fn parse_args(argv: &[String]) -> Result { usage() )); } + if parallel_phase2_cfg.ready_batch_size == 0 { + return Err(format!( + "--parallel-phase2-ready-batch-size must be > 0\n\n{}", + usage() + )); + } if !tal_urls.is_empty() && !ta_paths.is_empty() { return Err(format!( "--ta-path cannot be used with --tal-url mode\n\n{}", @@ -2172,13 +2189,31 @@ mod tests { "3".to_string(), "--parallel-phase2-worker-queue-capacity".to_string(), "17".to_string(), + "--parallel-phase2-ready-batch-size".to_string(), + "31".to_string(), ]; let args = parse_args(&argv).expect("parse args"); assert_eq!(args.parallel_phase2_config.object_workers, 3); assert_eq!(args.parallel_phase2_config.worker_queue_capacity, 17); + assert_eq!(args.parallel_phase2_config.ready_batch_size, 31); assert_eq!(args.parallel_phase1_config, ParallelPhase1Config::default()); } + #[test] + fn parse_rejects_zero_phase2_ready_batch_size() { + let argv = vec![ + "rpki".to_string(), + "--db".to_string(), + "db".to_string(), + "--tal-url".to_string(), + "https://example.test/root.tal".to_string(), + "--parallel-phase2-ready-batch-size".to_string(), + "0".to_string(), + ]; + let err = parse_args(&argv).expect_err("zero ready batch must fail"); + assert!(err.contains("--parallel-phase2-ready-batch-size"), "{err}"); + } + #[test] fn parse_rejects_removed_parallel_enable_flags() { let argv = vec![ diff --git a/src/parallel/config.rs b/src/parallel/config.rs index 138909a..cda3684 100644 --- a/src/parallel/config.rs +++ b/src/parallel/config.rs @@ -9,6 +9,7 @@ pub struct ParallelPhase1Config { pub struct ParallelPhase2Config { pub object_workers: usize, pub worker_queue_capacity: usize, + pub ready_batch_size: usize, } impl Default for ParallelPhase2Config { @@ -16,6 +17,7 @@ impl Default for ParallelPhase2Config { Self { object_workers: 8, worker_queue_capacity: 256, + ready_batch_size: 256, } } } @@ -47,5 +49,6 @@ mod tests { let cfg = ParallelPhase2Config::default(); assert!(cfg.object_workers > 0); assert!(cfg.worker_queue_capacity > 0); + assert!(cfg.ready_batch_size > 0); } } diff --git a/src/progress_log.rs b/src/progress_log.rs index 4f23ca1..da8a548 100644 --- a/src/progress_log.rs +++ b/src/progress_log.rs @@ -16,6 +16,13 @@ pub fn slow_threshold_secs() -> f64 { .unwrap_or(30.0) } +pub fn stage_fresh_slow_threshold_ms() -> u64 { + std::env::var("RPKI_PROGRESS_STAGE_FRESH_SLOW_MS") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(1_000) +} + pub fn emit(kind: &str, payload: Value) { if !progress_enabled() { return; diff --git a/src/storage.rs b/src/storage.rs index 3875ae0..4fd3349 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -16,6 +16,7 @@ pub const CF_REPOSITORY_VIEW: &str = "repository_view"; pub const CF_RAW_BY_HASH: &str = "raw_by_hash"; pub const CF_RAW_BLOB: &str = "raw_blob"; pub const CF_VCIR: &str = "vcir"; +pub const CF_MANIFEST_REPLAY_META: &str = "manifest_replay_meta"; pub const CF_AUDIT_RULE_INDEX: &str = "audit_rule_index"; pub const CF_RRDP_SOURCE: &str = "rrdp_source"; pub const CF_RRDP_SOURCE_MEMBER: &str = "rrdp_source_member"; @@ -26,6 +27,7 @@ pub const ALL_COLUMN_FAMILY_NAMES: &[&str] = &[ CF_RAW_BY_HASH, CF_RAW_BLOB, CF_VCIR, + CF_MANIFEST_REPLAY_META, CF_AUDIT_RULE_INDEX, CF_RRDP_SOURCE, CF_RRDP_SOURCE_MEMBER, @@ -36,6 +38,7 @@ const REPOSITORY_VIEW_KEY_PREFIX: &str = "repo_view:"; const RAW_BY_HASH_KEY_PREFIX: &str = "rawbyhash:"; const RAW_BLOB_KEY_PREFIX: &str = "rawblob:"; const VCIR_KEY_PREFIX: &str = "vcir:"; +const MANIFEST_REPLAY_META_KEY_PREFIX: &str = "manifest_replay_meta:"; const AUDIT_ROA_RULE_KEY_PREFIX: &str = "audit:roa_rule:"; const AUDIT_ASPA_RULE_KEY_PREFIX: &str = "audit:aspa_rule:"; const AUDIT_ROUTER_KEY_RULE_KEY_PREFIX: &str = "audit:router_key_rule:"; @@ -290,6 +293,57 @@ impl ValidatedManifestMeta { } } +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct ManifestReplayMeta { + pub manifest_rsync_uri: String, + pub manifest_number_be: Vec, + pub manifest_this_update: PackTime, + pub manifest_sha256: Vec, + pub updated_at_validation_time: PackTime, +} + +impl ManifestReplayMeta { + pub fn from_vcir(vcir: &ValidatedCaInstanceResult) -> Self { + Self { + manifest_rsync_uri: vcir.manifest_rsync_uri.clone(), + manifest_number_be: vcir + .validated_manifest_meta + .validated_manifest_number + .clone(), + manifest_this_update: vcir + .validated_manifest_meta + .validated_manifest_this_update + .clone(), + manifest_sha256: vcir.ccr_manifest_projection.manifest_sha256.clone(), + updated_at_validation_time: vcir.last_successful_validation_time.clone(), + } + } + + pub fn validate_internal(&self) -> StorageResult<()> { + validate_non_empty( + "manifest_replay_meta.manifest_rsync_uri", + &self.manifest_rsync_uri, + )?; + validate_manifest_number_be( + "manifest_replay_meta.manifest_number_be", + &self.manifest_number_be, + )?; + parse_time( + "manifest_replay_meta.manifest_this_update", + &self.manifest_this_update, + )?; + validate_sha256_digest_bytes( + "manifest_replay_meta.manifest_sha256", + &self.manifest_sha256, + )?; + parse_time( + "manifest_replay_meta.updated_at_validation_time", + &self.updated_at_validation_time, + )?; + Ok(()) + } +} + #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct VcirCcrManifestProjection { pub manifest_rsync_uri: String, @@ -1270,13 +1324,18 @@ impl RocksStore { pub fn put_vcir(&self, vcir: &ValidatedCaInstanceResult) -> StorageResult<()> { vcir.validate_internal()?; - let cf = self.cf(CF_VCIR)?; + let vcir_cf = self.cf(CF_VCIR)?; + let replay_cf = self.cf(CF_MANIFEST_REPLAY_META)?; + let replay_meta = ManifestReplayMeta::from_vcir(vcir); + replay_meta.validate_internal()?; + let mut batch = WriteBatch::default(); let key = vcir_key(&vcir.manifest_rsync_uri); let value = encode_cbor(vcir, "vcir")?; - self.db - .put_cf(cf, key.as_bytes(), value) - .map_err(|e| StorageError::RocksDb(e.to_string()))?; - Ok(()) + batch.put_cf(vcir_cf, key.as_bytes(), value); + let replay_key = manifest_replay_meta_key(&replay_meta.manifest_rsync_uri); + let replay_value = encode_cbor(&replay_meta, "manifest_replay_meta")?; + batch.put_cf(replay_cf, replay_key.as_bytes(), replay_value); + self.write_batch(batch) } pub fn replace_vcir_and_audit_rule_indexes( @@ -1286,12 +1345,18 @@ impl RocksStore { ) -> StorageResult<()> { vcir.validate_internal()?; let vcir_cf = self.cf(CF_VCIR)?; + let replay_cf = self.cf(CF_MANIFEST_REPLAY_META)?; let audit_cf = self.cf(CF_AUDIT_RULE_INDEX)?; let mut batch = WriteBatch::default(); let vcir_key = vcir_key(&vcir.manifest_rsync_uri); let vcir_value = encode_cbor(vcir, "vcir")?; batch.put_cf(vcir_cf, vcir_key.as_bytes(), vcir_value); + let replay_meta = ManifestReplayMeta::from_vcir(vcir); + replay_meta.validate_internal()?; + let replay_key = manifest_replay_meta_key(&replay_meta.manifest_rsync_uri); + let replay_value = encode_cbor(&replay_meta, "manifest_replay_meta")?; + batch.put_cf(replay_cf, replay_key.as_bytes(), replay_value); if let Some(previous) = previous { for output in &previous.local_outputs { @@ -1343,6 +1408,24 @@ impl RocksStore { Ok(Some(vcir)) } + pub fn get_manifest_replay_meta( + &self, + manifest_rsync_uri: &str, + ) -> StorageResult> { + let cf = self.cf(CF_MANIFEST_REPLAY_META)?; + let key = manifest_replay_meta_key(manifest_rsync_uri); + let Some(bytes) = self + .db + .get_cf(cf, key.as_bytes()) + .map_err(|e| StorageError::RocksDb(e.to_string()))? + else { + return Ok(None); + }; + let meta = decode_cbor::(&bytes, "manifest_replay_meta")?; + meta.validate_internal()?; + Ok(Some(meta)) + } + pub fn list_vcirs(&self) -> StorageResult> { let cf = self.cf(CF_VCIR)?; let mode = IteratorMode::Start; @@ -1357,12 +1440,14 @@ impl RocksStore { } pub fn delete_vcir(&self, manifest_rsync_uri: &str) -> StorageResult<()> { - let cf = self.cf(CF_VCIR)?; + let vcir_cf = self.cf(CF_VCIR)?; + let replay_cf = self.cf(CF_MANIFEST_REPLAY_META)?; + let mut batch = WriteBatch::default(); let key = vcir_key(manifest_rsync_uri); - self.db - .delete_cf(cf, key.as_bytes()) - .map_err(|e| StorageError::RocksDb(e.to_string()))?; - Ok(()) + batch.delete_cf(vcir_cf, key.as_bytes()); + let replay_key = manifest_replay_meta_key(manifest_rsync_uri); + batch.delete_cf(replay_cf, replay_key.as_bytes()); + self.write_batch(batch) } pub fn put_audit_rule_index_entry(&self, entry: &AuditRuleIndexEntry) -> StorageResult<()> { @@ -1622,6 +1707,10 @@ fn vcir_key(manifest_rsync_uri: &str) -> String { format!("{VCIR_KEY_PREFIX}{manifest_rsync_uri}") } +fn manifest_replay_meta_key(manifest_rsync_uri: &str) -> String { + format!("{MANIFEST_REPLAY_META_KEY_PREFIX}{manifest_rsync_uri}") +} + fn audit_rule_kind_for_output_type(output_type: VcirOutputType) -> Option { match output_type { VcirOutputType::Vrp => Some(AuditRuleKind::Roa), @@ -2704,6 +2793,26 @@ mod tests { .expect("get vcir") .expect("vcir exists"); assert_eq!(got, vcir); + let replay_meta = store + .get_manifest_replay_meta(&vcir.manifest_rsync_uri) + .expect("get manifest replay meta") + .expect("manifest replay meta exists"); + assert_eq!( + replay_meta, + ManifestReplayMeta { + manifest_rsync_uri: vcir.manifest_rsync_uri.clone(), + manifest_number_be: vcir + .validated_manifest_meta + .validated_manifest_number + .clone(), + manifest_this_update: vcir + .validated_manifest_meta + .validated_manifest_this_update + .clone(), + manifest_sha256: vcir.ccr_manifest_projection.manifest_sha256.clone(), + updated_at_validation_time: vcir.last_successful_validation_time.clone(), + } + ); let mut invalid = sample_vcir("rsync://example.test/repo/invalid.mft"); invalid.summary.local_vrp_count = 9; @@ -2728,6 +2837,37 @@ mod tests { .expect("get deleted vcir") .is_none() ); + assert!( + store + .get_manifest_replay_meta(&vcir.manifest_rsync_uri) + .expect("get deleted manifest replay meta") + .is_none() + ); + } + + #[test] + fn manifest_replay_meta_validation_reports_invalid_fields() { + let mut meta = ManifestReplayMeta { + manifest_rsync_uri: "rsync://example.test/repo/current.mft".to_string(), + manifest_number_be: vec![3], + manifest_this_update: pack_time(0), + manifest_sha256: vec![0x11; 32], + updated_at_validation_time: pack_time(1), + }; + meta.validate_internal().expect("valid replay meta"); + + meta.manifest_sha256 = vec![0x11; 31]; + let err = meta + .validate_internal() + .expect_err("short manifest sha must fail"); + assert!(err.to_string().contains("must be 32 bytes")); + + meta.manifest_sha256 = vec![0x11; 32]; + meta.manifest_number_be = vec![0, 3]; + let err = meta + .validate_internal() + .expect_err("non-minimal manifest number must fail"); + assert!(err.to_string().contains("minimal big-endian")); } #[test] @@ -2855,6 +2995,18 @@ mod tests { .expect("get replaced vcir") .expect("vcir exists"); assert_eq!(got, current); + let replay_meta = store + .get_manifest_replay_meta(¤t.manifest_rsync_uri) + .expect("get replaced replay meta") + .expect("replay meta exists"); + assert_eq!( + replay_meta.manifest_number_be, + current.validated_manifest_meta.validated_manifest_number + ); + assert_eq!( + replay_meta.manifest_sha256, + current.ccr_manifest_projection.manifest_sha256 + ); assert!( store .get_audit_rule_index_entry( @@ -2877,9 +3029,6 @@ mod tests { #[test] fn storage_helpers_cover_optional_validation_paths() { - let td = tempfile::tempdir().expect("tempdir"); - let store = RocksStore::open(td.path()).expect("open rocksdb"); - let withdrawn = RepositoryViewEntry { rsync_uri: "rsync://example.test/repo/withdrawn.cer".to_string(), current_hash: Some(sha256_hex(b"withdrawn")), diff --git a/src/sync/rrdp.rs b/src/sync/rrdp.rs index 6693856..4e7dacd 100644 --- a/src/sync/rrdp.rs +++ b/src/sync/rrdp.rs @@ -2,7 +2,9 @@ use crate::analysis::timing::TimingHandle; use crate::audit::AuditDownloadKind; use crate::audit_downloads::DownloadLogHandle; use crate::current_repo_index::CurrentRepoIndexHandle; -use crate::storage::{RocksStore, RrdpDeltaOp, RrdpSourceSyncState}; +use crate::storage::{ + RepositoryViewEntry, RepositoryViewState, RocksStore, RrdpDeltaOp, RrdpSourceSyncState, +}; use crate::sync::store_projection::{ build_repository_view_present_entry, build_repository_view_withdrawn_entry, build_rrdp_source_member_present_record, build_rrdp_source_member_withdrawn_record, @@ -735,6 +737,22 @@ fn sync_from_notification_inner( if let Some(s) = same_session_state { if s.serial == notif.serial { + let _hydrate_step = timing + .as_ref() + .map(|t| t.span_rrdp_repo_step(notification_uri, "hydrate_current_index")); + let _hydrate_total = timing + .as_ref() + .map(|t| t.span_phase("rrdp_hydrate_current_index_total")); + let hydrated = hydrate_current_repo_index_from_rrdp_members( + store, + notification_uri, + current_repo_index, + )?; + drop(_hydrate_step); + drop(_hydrate_total); + if let Some(t) = timing.as_ref() { + t.record_count("rrdp_current_index_hydrated_objects_total", hydrated as u64); + } return Ok(0); } if s.serial > notif.serial { @@ -865,6 +883,25 @@ fn sync_from_notification_inner( if let Some(t) = timing.as_ref() { t.record_count("rrdp_delta_ops_applied_total", applied_total as u64); } + let _hydrate_step = timing.as_ref().map(|t| { + t.span_rrdp_repo_step(notification_uri, "hydrate_current_index") + }); + let _hydrate_total = timing + .as_ref() + .map(|t| t.span_phase("rrdp_hydrate_current_index_total")); + let hydrated = hydrate_current_repo_index_from_rrdp_members( + store, + notification_uri, + current_repo_index, + )?; + drop(_hydrate_step); + drop(_hydrate_total); + if let Some(t) = timing.as_ref() { + t.record_count( + "rrdp_current_index_hydrated_objects_total", + hydrated as u64, + ); + } return Ok(applied_total); } } @@ -960,6 +997,47 @@ fn sync_from_notification_inner( Ok(published) } +fn hydrate_current_repo_index_from_rrdp_members( + store: &RocksStore, + notification_uri: &str, + current_repo_index: Option<&CurrentRepoIndexHandle>, +) -> Result { + let Some(index) = current_repo_index else { + return Ok(0); + }; + + let members = store + .list_current_rrdp_source_members(notification_uri) + .map_err(|e| RrdpSyncError::Storage(e.to_string()))?; + if members.is_empty() { + return Ok(0); + } + + let mut entries = Vec::with_capacity(members.len()); + for member in members { + let current_hash = member.current_hash.ok_or_else(|| { + RrdpSyncError::Storage(format!( + "rrdp source member missing current_hash for current object {}", + member.rsync_uri + )) + })?; + entries.push(RepositoryViewEntry { + rsync_uri: member.rsync_uri, + current_hash: Some(current_hash), + repository_source: Some(notification_uri.to_string()), + object_type: member.object_type, + state: RepositoryViewState::Present, + }); + } + + index + .lock() + .map_err(|_| RrdpSyncError::Storage("current repo index lock poisoned".to_string()))? + .apply_repository_view_entries(&entries) + .map_err(RrdpSyncError::Storage)?; + Ok(entries.len()) +} + fn apply_delta( store: &RocksStore, notification_uri: &str, @@ -1704,6 +1782,7 @@ fn strip_all_ascii_whitespace(s: &str) -> String { mod tests { use super::*; use crate::analysis::timing::{TimingHandle, TimingMeta}; + use crate::current_repo_index::CurrentRepoIndex; use crate::storage::RocksStore; use std::collections::HashMap; use std::time::Duration; @@ -2625,6 +2704,118 @@ mod tests { assert_eq!(state.serial, 3); } + #[test] + fn sync_from_notification_same_serial_hydrates_current_repo_index() { + let tmp = tempfile::tempdir().expect("tempdir"); + let store = RocksStore::open(tmp.path()).expect("open rocksdb"); + + let sid = "550e8400-e29b-41d4-a716-446655440000"; + let notif_uri = "https://example.net/notification.xml"; + let snapshot_uri = "https://example.net/snapshot.xml"; + let uri_a = "rsync://example.net/repo/a.mft"; + let uri_b = "rsync://example.net/repo/b.roa"; + + let snapshot = snapshot_xml(sid, 1, &[(uri_a, b"a1"), (uri_b, b"b1")]); + let snapshot_hash = hex::encode(sha2::Sha256::digest(&snapshot)); + let notif = notification_xml(sid, 1, snapshot_uri, &snapshot_hash); + let fetcher_1 = MapFetcher { + map: HashMap::from([(snapshot_uri.to_string(), snapshot)]), + }; + sync_from_notification_snapshot(&store, notif_uri, ¬if, &fetcher_1).expect("seed"); + + let index = CurrentRepoIndex::shared(); + let no_fetcher = MapFetcher { + map: HashMap::new(), + }; + let applied = sync_from_notification_with_timing_and_download_log( + &store, + notif_uri, + Some(&index), + ¬if, + &no_fetcher, + None, + None, + ) + .expect("same serial no-op"); + assert_eq!(applied, 0); + + let index = index.lock().expect("lock index"); + assert_eq!(index.active_uri_count(), 2); + assert!(index.get_by_uri(uri_a).is_some()); + assert!(index.get_by_uri(uri_b).is_some()); + } + + #[test] + fn sync_from_notification_delta_hydrates_unchanged_current_repo_entries() { + let tmp = tempfile::tempdir().expect("tempdir"); + let store = RocksStore::open(tmp.path()).expect("open rocksdb"); + + let sid = "550e8400-e29b-41d4-a716-446655440000"; + let notif_uri = "https://example.net/notification.xml"; + let snapshot_uri_1 = "https://example.net/snapshot-1.xml"; + let uri_a = "rsync://example.net/repo/a.mft"; + let uri_b = "rsync://example.net/repo/b.roa"; + let uri_c = "rsync://example.net/repo/c.crl"; + + let snapshot_1 = snapshot_xml(sid, 1, &[(uri_a, b"a1"), (uri_b, b"b1")]); + let snapshot_hash_1 = hex::encode(sha2::Sha256::digest(&snapshot_1)); + let notif_1 = notification_xml(sid, 1, snapshot_uri_1, &snapshot_hash_1); + let fetcher_1 = MapFetcher { + map: HashMap::from([(snapshot_uri_1.to_string(), snapshot_1)]), + }; + sync_from_notification_snapshot(&store, notif_uri, ¬if_1, &fetcher_1).expect("seed"); + + let publish_c_b64 = base64::engine::general_purpose::STANDARD.encode(b"c2"); + let delta_2 = delta_xml( + sid, + 2, + &[&format!( + r#"{publish_c_b64}"# + )], + ); + let delta_2_hash_hex = hex::encode(sha2::Sha256::digest(&delta_2)); + let notif_2 = notification_xml_with_deltas( + sid, + 2, + "https://example.net/snapshot-2.xml", + &"00".repeat(32), + &[( + "d2", + 2, + "https://example.net/delta-2.xml", + &delta_2_hash_hex, + )], + ); + let fetcher_2 = MapFetcher { + map: HashMap::from([("https://example.net/delta-2.xml".to_string(), delta_2)]), + }; + + let index = CurrentRepoIndex::shared(); + let applied = sync_from_notification_with_timing_and_download_log( + &store, + notif_uri, + Some(&index), + ¬if_2, + &fetcher_2, + None, + None, + ) + .expect("delta sync"); + assert_eq!(applied, 1); + + let index = index.lock().expect("lock index"); + assert_eq!(index.active_uri_count(), 3); + assert!( + index.get_by_uri(uri_a).is_some(), + "unchanged object from the previous serial must be visible" + ); + assert!( + index.get_by_uri(uri_b).is_some(), + "unchanged object from the previous serial must be visible" + ); + assert!(index.get_by_uri(uri_c).is_some(), "delta publish visible"); + } + #[test] fn load_rrdp_local_state_uses_source_record_only() { let tmp = tempfile::tempdir().expect("tempdir"); diff --git a/src/validation/manifest.rs b/src/validation/manifest.rs index 667b9c2..1009d22 100644 --- a/src/validation/manifest.rs +++ b/src/validation/manifest.rs @@ -327,12 +327,20 @@ pub fn process_manifest_publication_point_fresh_after_repo_sync( #[derive(Clone, Debug, Default)] pub struct FreshPublicationPointTimingBreakdown { + pub current_index_lock_ms: u64, pub manifest_load_ms: u64, + pub manifest_index_lookup_ms: u64, + pub manifest_blob_load_ms: u64, pub manifest_decode_ms: u64, pub replay_guard_ms: u64, + pub replay_meta_hit: bool, + pub replay_meta_miss: bool, pub manifest_entries_ms: u64, pub pack_files_ms: u64, + pub pack_files_index_lookup_ms: u64, + pub pack_files_blob_load_ms: u64, pub ee_path_validate_ms: u64, + pub manifest_file_count: usize, } pub fn process_manifest_publication_point_fresh_after_repo_sync_with_timing( @@ -620,7 +628,9 @@ pub(crate) fn try_build_fresh_publication_point_with_timing( ManifestFreshError, > { let mut timing = FreshPublicationPointTimingBreakdown::default(); + let current_index_lock_started = std::time::Instant::now(); let current_index_guard = current_repo_index.and_then(|handle| handle.lock().ok()); + timing.current_index_lock_ms = current_index_lock_started.elapsed().as_millis() as u64; if !rsync_uri_is_under_publication_point(manifest_rsync_uri, publication_point_rsync_uri) { return Err(ManifestFreshError::ManifestOutsidePublicationPoint { @@ -631,11 +641,14 @@ pub(crate) fn try_build_fresh_publication_point_with_timing( let manifest_load_started = std::time::Instant::now(); let manifest_bytes = if let Some(index) = current_index_guard.as_ref() { + let manifest_lookup_started = std::time::Instant::now(); let current = index.get_by_uri(manifest_rsync_uri).ok_or_else(|| { ManifestFreshError::MissingManifest { manifest_rsync_uri: manifest_rsync_uri.to_string(), } })?; + timing.manifest_index_lookup_ms = manifest_lookup_started.elapsed().as_millis() as u64; + let manifest_blob_load_started = std::time::Instant::now(); store .get_blob_bytes(¤t.current_hash_hex) .map_err(|e| ManifestFreshError::MissingManifest { @@ -643,8 +656,13 @@ pub(crate) fn try_build_fresh_publication_point_with_timing( })? .ok_or_else(|| ManifestFreshError::MissingManifest { manifest_rsync_uri: manifest_rsync_uri.to_string(), + }) + .inspect(|_| { + timing.manifest_blob_load_ms = + manifest_blob_load_started.elapsed().as_millis() as u64; })? } else { + let manifest_blob_load_started = std::time::Instant::now(); store .load_current_object_bytes_by_uri(manifest_rsync_uri) .map_err(|e| ManifestFreshError::MissingManifest { @@ -652,6 +670,10 @@ pub(crate) fn try_build_fresh_publication_point_with_timing( })? .ok_or_else(|| ManifestFreshError::MissingManifest { manifest_rsync_uri: manifest_rsync_uri.to_string(), + }) + .inspect(|_| { + timing.manifest_blob_load_ms = + manifest_blob_load_started.elapsed().as_millis() as u64; })? }; timing.manifest_load_ms = manifest_load_started.elapsed().as_millis() as u64; @@ -684,20 +706,21 @@ pub(crate) fn try_build_fresh_publication_point_with_timing( // - If manifestNumber is higher, require thisUpdate to be more recent than the previously // validated thisUpdate. let replay_guard_started = std::time::Instant::now(); - if let Some(old_vcir) = store.get_vcir(manifest_rsync_uri).ok().flatten() { - if old_vcir.manifest_rsync_uri == manifest_rsync_uri { + if let Some(old_meta) = store + .get_manifest_replay_meta(manifest_rsync_uri) + .ok() + .flatten() + { + timing.replay_meta_hit = true; + if old_meta.manifest_rsync_uri == manifest_rsync_uri { let new_num = manifest.manifest.manifest_number.bytes_be.as_slice(); - let old_num = old_vcir - .validated_manifest_meta - .validated_manifest_number - .as_slice(); + let old_num = old_meta.manifest_number_be.as_slice(); match cmp_minimal_be_unsigned(new_num, old_num) { Ordering::Greater => { - let old_this_update = old_vcir - .validated_manifest_meta - .validated_manifest_this_update + let old_this_update = old_meta + .manifest_this_update .parse() - .expect("vcir internal validation ensures thisUpdate parses"); + .expect("manifest replay meta validation ensures thisUpdate parses"); if this_update <= old_this_update { use time::format_description::well_known::Rfc3339; return Err(ManifestFreshError::ThisUpdateNotIncreasing { @@ -712,14 +735,8 @@ pub(crate) fn try_build_fresh_publication_point_with_timing( } } Ordering::Equal => { - let old_manifest_hash = - old_vcir.related_artifacts.iter().find_map(|artifact| { - (artifact.artifact_role == VcirArtifactRole::Manifest - && artifact.uri.as_deref() == Some(manifest_rsync_uri)) - .then_some(artifact.sha256.as_str()) - }); - let new_manifest_hash = hex::encode(sha2::Sha256::digest(&manifest_bytes)); - if old_manifest_hash != Some(new_manifest_hash.as_str()) { + let new_manifest_hash = sha2::Sha256::digest(&manifest_bytes); + if old_meta.manifest_sha256.as_slice() != new_manifest_hash.as_slice() { return Err(ManifestFreshError::ManifestNumberNotIncreasing { old_hex: hex::encode_upper(old_num), new_hex: hex::encode_upper(new_num), @@ -734,6 +751,8 @@ pub(crate) fn try_build_fresh_publication_point_with_timing( } } } + } else { + timing.replay_meta_miss = true; } timing.replay_guard_ms = replay_guard_started.elapsed().as_millis() as u64; @@ -743,6 +762,7 @@ pub(crate) fn try_build_fresh_publication_point_with_timing( .parse_files() .map_err(ManifestDecodeError::Validate)?; timing.manifest_entries_ms = manifest_entries_started.elapsed().as_millis() as u64; + timing.manifest_file_count = entries.len(); let mut files = Vec::with_capacity(manifest.manifest.file_count()); let pack_files_started = std::time::Instant::now(); let external_raw_store = store @@ -753,22 +773,27 @@ pub(crate) fn try_build_fresh_publication_point_with_timing( .external_repo_bytes_ref() .cloned() .map(std::sync::Arc::new); + let mut pack_files_index_lookup_duration = std::time::Duration::ZERO; + let mut pack_files_blob_load_duration = std::time::Duration::ZERO; for entry in &entries { let rsync_uri = join_rsync_dir_and_file(publication_point_rsync_uri, entry.file_name.as_str()); let current_object = if let Some(index) = current_index_guard.as_ref() { + let index_lookup_started = std::time::Instant::now(); let current = index .get_by_uri(&rsync_uri) .ok_or_else(|| ManifestFreshError::MissingFile { rsync_uri: rsync_uri.clone(), })?; + pack_files_index_lookup_duration += index_lookup_started.elapsed(); crate::storage::CurrentObjectWithHash { current_hash_hex: current.current_hash_hex.clone(), current_hash: current.current_hash, bytes: Vec::new(), } } else { + let blob_load_started = std::time::Instant::now(); store .load_current_object_with_hash_by_uri(&rsync_uri) .map_err(|_e| ManifestFreshError::MissingFile { @@ -776,6 +801,9 @@ pub(crate) fn try_build_fresh_publication_point_with_timing( })? .ok_or_else(|| ManifestFreshError::MissingFile { rsync_uri: rsync_uri.clone(), + }) + .inspect(|_| { + pack_files_blob_load_duration += blob_load_started.elapsed(); })? }; @@ -803,6 +831,7 @@ pub(crate) fn try_build_fresh_publication_point_with_timing( )); } else { let bytes = if current_object.bytes.is_empty() { + let blob_load_started = std::time::Instant::now(); store .get_blob_bytes(¤t_object.current_hash_hex) .map_err(|_e| ManifestFreshError::MissingFile { @@ -810,6 +839,9 @@ pub(crate) fn try_build_fresh_publication_point_with_timing( })? .ok_or_else(|| ManifestFreshError::MissingFile { rsync_uri: rsync_uri.clone(), + }) + .inspect(|_| { + pack_files_blob_load_duration += blob_load_started.elapsed(); })? } else { current_object.bytes @@ -821,6 +853,8 @@ pub(crate) fn try_build_fresh_publication_point_with_timing( )); } } + timing.pack_files_index_lookup_ms = pack_files_index_lookup_duration.as_millis() as u64; + timing.pack_files_blob_load_ms = pack_files_blob_load_duration.as_millis() as u64; timing.pack_files_ms = pack_files_started.elapsed().as_millis() as u64; // RFC 6488 ยง3: manifest (signed object) validity includes a valid EE cert path. @@ -1044,12 +1078,67 @@ mod tests { .expect("put repository view entry"); } + fn put_complete_publication_point_current_objects( + store: &RocksStore, + manifest: &ManifestObject, + manifest_rsync_uri: &str, + manifest_bytes: Vec, + publication_point_rsync_uri: &str, + ) { + put_current_object(store, manifest_rsync_uri, manifest_bytes, "mft"); + for entry in manifest.manifest.parse_files().expect("parse files") { + let file_path = manifest_fixture_path() + .parent() + .unwrap() + .join(entry.file_name.as_str()); + let bytes = std::fs::read(&file_path).expect("read fixture file"); + let rsync_uri = format!("{publication_point_rsync_uri}{}", entry.file_name); + let object_type = rsync_uri.rsplit('.').next().unwrap_or("bin"); + put_current_object(store, &rsync_uri, bytes, object_type); + } + } + fn put_raw_only(store: &RocksStore, rsync_uri: &str, bytes: Vec, object_type: &str) { store .put_raw_by_hash_entry(&raw_by_hash_entry(rsync_uri, bytes, object_type)) .expect("put raw_by_hash entry"); } + fn sample_vcir_for_manifest_replay_meta( + manifest: &ManifestObject, + manifest_rsync_uri: &str, + publication_point_rsync_uri: &str, + manifest_bytes: &[u8], + validation_time: time::OffsetDateTime, + ) -> ValidatedCaInstanceResult { + let manifest_hash = hex::encode(sha2::Sha256::digest(manifest_bytes)); + let this_update = manifest + .manifest + .this_update + .to_offset(time::UtcOffset::UTC); + let mut vcir = sample_current_instance_vcir( + manifest_rsync_uri, + publication_point_rsync_uri, + &manifest_hash, + "rsync://example.test/repo/object.roa", + &hex::encode(sha2::Sha256::digest(b"object")), + validation_time, + true, + ); + vcir.validated_manifest_meta.validated_manifest_number = + manifest.manifest.manifest_number.bytes_be.clone(); + vcir.validated_manifest_meta.validated_manifest_this_update = + PackTime::from_utc_offset_datetime(this_update); + vcir.ccr_manifest_projection.manifest_number_be = + manifest.manifest.manifest_number.bytes_be.clone(); + vcir.ccr_manifest_projection.manifest_this_update = + PackTime::from_utc_offset_datetime(this_update); + vcir.ccr_manifest_projection.manifest_sha256 = + hex::decode(manifest_hash).expect("decode manifest hash"); + vcir.ccr_manifest_projection.manifest_size = manifest_bytes.len() as u64; + vcir + } + fn sample_current_instance_vcir( manifest_rsync_uri: &str, publication_point_rsync_uri: &str, @@ -1371,6 +1460,82 @@ mod tests { assert_eq!(fresh.files.len(), manifest.manifest.file_count()); } + #[test] + fn try_build_fresh_publication_point_records_replay_meta_miss() { + let temp = tempfile::tempdir().expect("tempdir"); + let store = RocksStore::open(temp.path()).expect("open rocksdb"); + let ( + manifest, + manifest_bytes, + manifest_rsync_uri, + publication_point_rsync_uri, + validation_time, + ) = load_manifest_fixture(); + put_complete_publication_point_current_objects( + &store, + &manifest, + &manifest_rsync_uri, + manifest_bytes, + &publication_point_rsync_uri, + ); + + let (_fresh, timing) = try_build_fresh_publication_point_with_timing( + &store, + &manifest_rsync_uri, + &publication_point_rsync_uri, + None, + &issuer_ca_fixture_der(), + Some(issuer_ca_rsync_uri()), + validation_time, + ) + .expect("fresh publication point without replay meta"); + + assert!(timing.replay_meta_miss); + assert!(!timing.replay_meta_hit); + } + + #[test] + fn try_build_fresh_publication_point_uses_replay_meta_hit_for_same_manifest() { + let temp = tempfile::tempdir().expect("tempdir"); + let store = RocksStore::open(temp.path()).expect("open rocksdb"); + let ( + manifest, + manifest_bytes, + manifest_rsync_uri, + publication_point_rsync_uri, + validation_time, + ) = load_manifest_fixture(); + let previous_vcir = sample_vcir_for_manifest_replay_meta( + &manifest, + &manifest_rsync_uri, + &publication_point_rsync_uri, + &manifest_bytes, + validation_time, + ); + store.put_vcir(&previous_vcir).expect("put previous vcir"); + put_complete_publication_point_current_objects( + &store, + &manifest, + &manifest_rsync_uri, + manifest_bytes, + &publication_point_rsync_uri, + ); + + let (_fresh, timing) = try_build_fresh_publication_point_with_timing( + &store, + &manifest_rsync_uri, + &publication_point_rsync_uri, + None, + &issuer_ca_fixture_der(), + Some(issuer_ca_rsync_uri()), + validation_time, + ) + .expect("fresh publication point with matching replay meta"); + + assert!(timing.replay_meta_hit); + assert!(!timing.replay_meta_miss); + } + #[test] fn validate_manifest_embedded_ee_cert_path_rejects_missing_crl_files() { let (manifest, _, _, publication_point_rsync_uri, validation_time) = diff --git a/src/validation/objects.rs b/src/validation/objects.rs index a47b21d..3d9aaa1 100644 --- a/src/validation/objects.rs +++ b/src/validation/objects.rs @@ -19,7 +19,7 @@ use crate::validation::cert_path::{CertPathError, validate_signed_object_ee_cert use crate::validation::manifest::PublicationPointData; use crate::validation::publication_point::PublicationPointSnapshot; use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::time::{Duration, Instant}; use x509_parser::prelude::FromDer; use x509_parser::x509::SubjectPublicKeyInfo; @@ -118,6 +118,9 @@ pub(crate) struct RoaTaskOk { pub(crate) struct RoaTaskResult { pub(crate) publication_point_id: u64, pub(crate) index: usize, + pub(crate) worker_index: usize, + pub(crate) queue_wait_ms: u64, + pub(crate) worker_ms: u64, pub(crate) rsync_uri: String, pub(crate) sha256_hex: String, pub(crate) outcome: Result, @@ -736,14 +739,15 @@ pub(crate) struct OwnedRoaTask { issuer_effective_as: Option, validation_time: time::OffsetDateTime, collect_vcir_local_outputs: bool, + pub(crate) submitted_at: Option, } #[derive(Clone)] struct RoaTaskExecutor; impl ObjectTaskExecutor for RoaTaskExecutor { - fn execute(&self, _worker_index: usize, task: OwnedRoaTask) -> RoaTaskResult { - validate_owned_roa_task(task) + fn execute(&self, worker_index: usize, task: OwnedRoaTask) -> RoaTaskResult { + validate_owned_roa_task(worker_index, task) } } @@ -787,7 +791,13 @@ impl ParallelRoaWorkerPool { } } -fn validate_owned_roa_task(task: OwnedRoaTask) -> RoaTaskResult { +fn validate_owned_roa_task(worker_index: usize, task: OwnedRoaTask) -> RoaTaskResult { + let worker_started = Instant::now(); + let queue_wait_ms = task + .submitted_at + .map(|submitted_at| worker_started.saturating_duration_since(submitted_at)) + .map(|duration| duration.as_millis() as u64) + .unwrap_or(0); let sha256_hex = sha256_hex_from_32(&task.file.sha256); let issuer_spki = match SubjectPublicKeyInfo::from_der(task.issuer_spki_der.as_ref()) { Ok((rem, spki)) if rem.is_empty() => spki, @@ -795,6 +805,9 @@ fn validate_owned_roa_task(task: OwnedRoaTask) -> RoaTaskResult { return RoaTaskResult { publication_point_id: task.publication_point_id, index: task.index, + worker_index, + queue_wait_ms, + worker_ms: worker_started.elapsed().as_millis() as u64, rsync_uri: task.file.rsync_uri, sha256_hex, outcome: Err(ObjectValidateError::CertPath( @@ -806,6 +819,9 @@ fn validate_owned_roa_task(task: OwnedRoaTask) -> RoaTaskResult { return RoaTaskResult { publication_point_id: task.publication_point_id, index: task.index, + worker_index, + queue_wait_ms, + worker_ms: worker_started.elapsed().as_millis() as u64, rsync_uri: task.file.rsync_uri, sha256_hex, outcome: Err(ObjectValidateError::CertPath( @@ -837,6 +853,9 @@ fn validate_owned_roa_task(task: OwnedRoaTask) -> RoaTaskResult { RoaTaskResult { publication_point_id: task.publication_point_id, index: task.index, + worker_index, + queue_wait_ms, + worker_ms: worker_started.elapsed().as_millis() as u64, rsync_uri: task.file.rsync_uri, sha256_hex, outcome, @@ -888,6 +907,7 @@ impl ParallelObjectsStage { issuer_effective_as: self.issuer_effective_as.clone(), validation_time: self.validation_time, collect_vcir_local_outputs: self.collect_vcir_local_outputs, + submitted_at: None, }) .collect() } @@ -895,6 +915,14 @@ impl ParallelObjectsStage { pub(crate) fn roa_task_count(&self) -> usize { self.stats.roa_total } + + pub(crate) fn aspa_task_count(&self) -> usize { + self.stats.aspa_total + } + + pub(crate) fn locked_file_count(&self) -> usize { + self.locked_files.len() + } } pub(crate) fn prepare_publication_point_for_parallel_roa( @@ -1408,6 +1436,9 @@ pub(crate) fn validate_roa_task_serial( RoaTaskResult { publication_point_id: 0, index: task.index, + worker_index: 0, + queue_wait_ms: 0, + worker_ms: 0, rsync_uri: task.file.rsync_uri.clone(), sha256_hex, outcome, diff --git a/src/validation/tree_parallel.rs b/src/validation/tree_parallel.rs index 0439840..58089ea 100644 --- a/src/validation/tree_parallel.rs +++ b/src/validation/tree_parallel.rs @@ -41,6 +41,15 @@ struct InflightPublicationPoint { started_at: Instant, objects_started_at: Instant, task_count: usize, + tasks_submitted: usize, + first_task_submitted_at: Option, + last_task_submitted_at: Option, + first_result_at: Option, + last_result_at: Option, + worker_ms_total: u64, + worker_ms_max: u64, + queue_wait_ms_total: u64, + queue_wait_ms_max: u64, results: Vec, } @@ -49,6 +58,220 @@ struct FinishedPublicationPoint { result: Result, } +#[derive(Default)] +struct ReadyStageMetrics { + manifest_rsync_uri: Option, + publication_point_rsync_uri: Option, + ready_count: usize, + fallback_count: usize, + complete_count: usize, + staged_count: usize, + zero_task_count: usize, + error_count: usize, + discovered_children: usize, + locked_files: usize, + roa_tasks: usize, + aspa_objects: usize, + stage_fresh_ms: u64, + snapshot_prepare_ms: u64, + snapshot_current_index_lock_ms: u64, + snapshot_manifest_load_ms: u64, + snapshot_manifest_index_lookup_ms: u64, + snapshot_manifest_blob_load_ms: u64, + snapshot_manifest_decode_ms: u64, + snapshot_replay_guard_ms: u64, + replay_meta_hit_count: usize, + replay_meta_miss_count: usize, + snapshot_manifest_entries_ms: u64, + snapshot_pack_files_ms: u64, + snapshot_pack_files_index_lookup_ms: u64, + snapshot_pack_files_blob_load_ms: u64, + snapshot_ee_path_validate_ms: u64, + snapshot_manifest_file_count: usize, + child_discovery_ms: u64, + child_enqueue_ms: u64, + prepare_ms: u64, + build_roa_tasks_ms: u64, + total_ms: u64, +} + +#[derive(Default)] +struct ReadyStageBatchMetrics { + ready_count: usize, + fallback_count: usize, + complete_count: usize, + staged_count: usize, + zero_task_count: usize, + error_count: usize, + discovered_children: usize, + locked_files: usize, + roa_tasks: usize, + aspa_objects: usize, + stage_fresh_ms_total: u64, + stage_fresh_ms_max: u64, + stage_fresh_ms_max_manifest_rsync_uri: Option, + stage_fresh_ms_max_publication_point_rsync_uri: Option, + snapshot_prepare_ms_total: u64, + snapshot_prepare_ms_max: u64, + snapshot_current_index_lock_ms_total: u64, + snapshot_current_index_lock_ms_max: u64, + snapshot_manifest_load_ms_total: u64, + snapshot_manifest_load_ms_max: u64, + snapshot_manifest_index_lookup_ms_total: u64, + snapshot_manifest_index_lookup_ms_max: u64, + snapshot_manifest_blob_load_ms_total: u64, + snapshot_manifest_blob_load_ms_max: u64, + snapshot_manifest_decode_ms_total: u64, + snapshot_manifest_decode_ms_max: u64, + snapshot_replay_guard_ms_total: u64, + snapshot_replay_guard_ms_max: u64, + replay_meta_hit_count: usize, + replay_meta_miss_count: usize, + snapshot_manifest_entries_ms_total: u64, + snapshot_manifest_entries_ms_max: u64, + snapshot_pack_files_ms_total: u64, + snapshot_pack_files_ms_max: u64, + snapshot_pack_files_index_lookup_ms_total: u64, + snapshot_pack_files_index_lookup_ms_max: u64, + snapshot_pack_files_blob_load_ms_total: u64, + snapshot_pack_files_blob_load_ms_max: u64, + snapshot_ee_path_validate_ms_total: u64, + snapshot_ee_path_validate_ms_max: u64, + snapshot_manifest_file_count_total: usize, + snapshot_manifest_file_count_max: usize, + child_discovery_ms_total: u64, + child_discovery_ms_max: u64, + child_enqueue_ms_total: u64, + child_enqueue_ms_max: u64, + prepare_ms_total: u64, + prepare_ms_max: u64, + build_roa_tasks_ms_total: u64, + build_roa_tasks_ms_max: u64, + total_ms: u64, +} + +impl ReadyStageBatchMetrics { + fn record(&mut self, metrics: ReadyStageMetrics) { + self.ready_count += metrics.ready_count; + self.fallback_count += metrics.fallback_count; + self.complete_count += metrics.complete_count; + self.staged_count += metrics.staged_count; + self.zero_task_count += metrics.zero_task_count; + self.error_count += metrics.error_count; + self.discovered_children += metrics.discovered_children; + self.locked_files += metrics.locked_files; + self.roa_tasks += metrics.roa_tasks; + self.aspa_objects += metrics.aspa_objects; + if metrics.stage_fresh_ms >= self.stage_fresh_ms_max { + self.stage_fresh_ms_max_manifest_rsync_uri = metrics.manifest_rsync_uri.clone(); + self.stage_fresh_ms_max_publication_point_rsync_uri = + metrics.publication_point_rsync_uri.clone(); + } + self.stage_fresh_ms_total += metrics.stage_fresh_ms; + self.stage_fresh_ms_max = self.stage_fresh_ms_max.max(metrics.stage_fresh_ms); + self.snapshot_prepare_ms_total += metrics.snapshot_prepare_ms; + self.snapshot_prepare_ms_max = self + .snapshot_prepare_ms_max + .max(metrics.snapshot_prepare_ms); + self.snapshot_current_index_lock_ms_total += metrics.snapshot_current_index_lock_ms; + self.snapshot_current_index_lock_ms_max = self + .snapshot_current_index_lock_ms_max + .max(metrics.snapshot_current_index_lock_ms); + self.snapshot_manifest_load_ms_total += metrics.snapshot_manifest_load_ms; + self.snapshot_manifest_load_ms_max = self + .snapshot_manifest_load_ms_max + .max(metrics.snapshot_manifest_load_ms); + self.snapshot_manifest_index_lookup_ms_total += metrics.snapshot_manifest_index_lookup_ms; + self.snapshot_manifest_index_lookup_ms_max = self + .snapshot_manifest_index_lookup_ms_max + .max(metrics.snapshot_manifest_index_lookup_ms); + self.snapshot_manifest_blob_load_ms_total += metrics.snapshot_manifest_blob_load_ms; + self.snapshot_manifest_blob_load_ms_max = self + .snapshot_manifest_blob_load_ms_max + .max(metrics.snapshot_manifest_blob_load_ms); + self.snapshot_manifest_decode_ms_total += metrics.snapshot_manifest_decode_ms; + self.snapshot_manifest_decode_ms_max = self + .snapshot_manifest_decode_ms_max + .max(metrics.snapshot_manifest_decode_ms); + self.snapshot_replay_guard_ms_total += metrics.snapshot_replay_guard_ms; + self.snapshot_replay_guard_ms_max = self + .snapshot_replay_guard_ms_max + .max(metrics.snapshot_replay_guard_ms); + self.replay_meta_hit_count += metrics.replay_meta_hit_count; + self.replay_meta_miss_count += metrics.replay_meta_miss_count; + self.snapshot_manifest_entries_ms_total += metrics.snapshot_manifest_entries_ms; + self.snapshot_manifest_entries_ms_max = self + .snapshot_manifest_entries_ms_max + .max(metrics.snapshot_manifest_entries_ms); + self.snapshot_pack_files_ms_total += metrics.snapshot_pack_files_ms; + self.snapshot_pack_files_ms_max = self + .snapshot_pack_files_ms_max + .max(metrics.snapshot_pack_files_ms); + self.snapshot_pack_files_index_lookup_ms_total += + metrics.snapshot_pack_files_index_lookup_ms; + self.snapshot_pack_files_index_lookup_ms_max = self + .snapshot_pack_files_index_lookup_ms_max + .max(metrics.snapshot_pack_files_index_lookup_ms); + self.snapshot_pack_files_blob_load_ms_total += metrics.snapshot_pack_files_blob_load_ms; + self.snapshot_pack_files_blob_load_ms_max = self + .snapshot_pack_files_blob_load_ms_max + .max(metrics.snapshot_pack_files_blob_load_ms); + self.snapshot_ee_path_validate_ms_total += metrics.snapshot_ee_path_validate_ms; + self.snapshot_ee_path_validate_ms_max = self + .snapshot_ee_path_validate_ms_max + .max(metrics.snapshot_ee_path_validate_ms); + self.snapshot_manifest_file_count_total += metrics.snapshot_manifest_file_count; + self.snapshot_manifest_file_count_max = self + .snapshot_manifest_file_count_max + .max(metrics.snapshot_manifest_file_count); + self.child_discovery_ms_total += metrics.child_discovery_ms; + self.child_discovery_ms_max = self.child_discovery_ms_max.max(metrics.child_discovery_ms); + self.child_enqueue_ms_total += metrics.child_enqueue_ms; + self.child_enqueue_ms_max = self.child_enqueue_ms_max.max(metrics.child_enqueue_ms); + self.prepare_ms_total += metrics.prepare_ms; + self.prepare_ms_max = self.prepare_ms_max.max(metrics.prepare_ms); + self.build_roa_tasks_ms_total += metrics.build_roa_tasks_ms; + self.build_roa_tasks_ms_max = self.build_roa_tasks_ms_max.max(metrics.build_roa_tasks_ms); + self.total_ms += metrics.total_ms; + } +} + +#[derive(Default)] +struct RoaDispatchMetrics { + attempted: usize, + submitted: usize, + queue_full: bool, + pending_remaining: usize, + duration_ms: u64, +} + +#[derive(Default)] +struct ObjectDrainMetrics { + results_drained: usize, + publication_points_finalized: usize, + reduce_ms_total: u64, + reduce_ms_max: u64, + finalize_ms_total: u64, + finalize_ms_max: u64, + worker_ms_total: u64, + worker_ms_max: u64, + queue_wait_ms_total: u64, + queue_wait_ms_max: u64, + duration_ms: u64, +} + +#[derive(Default)] +struct RepoDrainMetrics { + event_count: usize, + completions: usize, + ready_enqueued: usize, + duration_ms: u64, +} + +fn elapsed_ms(started: Instant) -> u64 { + started.elapsed().as_millis() as u64 +} + fn compact_phase2_finished_result( mut result: PublicationPointRunResult, compact_audit: bool, @@ -109,6 +332,12 @@ pub fn run_tree_parallel_phase2_audit_multi_root( let mut pending_roa_dispatch: VecDeque = VecDeque::new(); let mut finished: Vec = Vec::new(); let mut instances_started = 0usize; + let ready_batch_size = runner + .parallel_phase2_config + .as_ref() + .map(|cfg| cfg.ready_batch_size) + .unwrap_or(256) + .max(1); loop { while can_start_more(instances_started, config) { @@ -149,8 +378,13 @@ pub fn run_tree_parallel_phase2_audit_multi_root( } } - while let Some(ready) = ready_queue.pop_front() { - stage_ready_publication_point( + let ready_batch_started = Instant::now(); + let mut ready_batch_metrics = ReadyStageBatchMetrics::default(); + while ready_batch_metrics.ready_count < ready_batch_size { + let Some(ready) = ready_queue.pop_front() else { + break; + }; + let metrics = stage_ready_publication_point( runner, &mut next_id, &mut ca_queue, @@ -160,15 +394,129 @@ pub fn run_tree_parallel_phase2_audit_multi_root( ready, config.compact_audit, ); + ready_batch_metrics.record(metrics); + } + if ready_batch_metrics.ready_count > 0 { + ready_batch_metrics.total_ms = elapsed_ms(ready_batch_started); + crate::progress_log::emit( + "phase2_ready_queue_batch", + serde_json::json!({ + "ready_count": ready_batch_metrics.ready_count, + "fallback_count": ready_batch_metrics.fallback_count, + "complete_count": ready_batch_metrics.complete_count, + "staged_count": ready_batch_metrics.staged_count, + "zero_task_count": ready_batch_metrics.zero_task_count, + "error_count": ready_batch_metrics.error_count, + "discovered_children": ready_batch_metrics.discovered_children, + "locked_files": ready_batch_metrics.locked_files, + "roa_tasks": ready_batch_metrics.roa_tasks, + "aspa_objects": ready_batch_metrics.aspa_objects, + "stage_fresh_ms_total": ready_batch_metrics.stage_fresh_ms_total, + "stage_fresh_ms_max": ready_batch_metrics.stage_fresh_ms_max, + "stage_fresh_ms_max_manifest_rsync_uri": ready_batch_metrics.stage_fresh_ms_max_manifest_rsync_uri, + "stage_fresh_ms_max_publication_point_rsync_uri": ready_batch_metrics.stage_fresh_ms_max_publication_point_rsync_uri, + "child_enqueue_ms_total": ready_batch_metrics.child_enqueue_ms_total, + "child_enqueue_ms_max": ready_batch_metrics.child_enqueue_ms_max, + "prepare_ms_total": ready_batch_metrics.prepare_ms_total, + "prepare_ms_max": ready_batch_metrics.prepare_ms_max, + "build_roa_tasks_ms_total": ready_batch_metrics.build_roa_tasks_ms_total, + "build_roa_tasks_ms_max": ready_batch_metrics.build_roa_tasks_ms_max, + "batch_duration_ms": ready_batch_metrics.total_ms, + "ready_batch_size": ready_batch_size, + "ready_queue_len_after_batch": ready_queue.len(), + "ready_queue_budget_exhausted": !ready_queue.is_empty(), + "ca_queue_len_after_batch": ca_queue.len(), + "pending_roa_dispatch_len_after_batch": pending_roa_dispatch.len(), + "inflight_publication_points_after_batch": inflight_publication_points.len(), + }), + ); + crate::progress_log::emit( + "phase2_ready_queue_stage_fresh_breakdown", + serde_json::json!({ + "ready_count": ready_batch_metrics.ready_count, + "stage_fresh_ms_total": ready_batch_metrics.stage_fresh_ms_total, + "stage_fresh_ms_max": ready_batch_metrics.stage_fresh_ms_max, + "stage_fresh_ms_max_manifest_rsync_uri": ready_batch_metrics.stage_fresh_ms_max_manifest_rsync_uri, + "stage_fresh_ms_max_publication_point_rsync_uri": ready_batch_metrics.stage_fresh_ms_max_publication_point_rsync_uri, + "snapshot_prepare_ms_total": ready_batch_metrics.snapshot_prepare_ms_total, + "snapshot_prepare_ms_max": ready_batch_metrics.snapshot_prepare_ms_max, + "snapshot_current_index_lock_ms_total": ready_batch_metrics.snapshot_current_index_lock_ms_total, + "snapshot_current_index_lock_ms_max": ready_batch_metrics.snapshot_current_index_lock_ms_max, + "snapshot_manifest_load_ms_total": ready_batch_metrics.snapshot_manifest_load_ms_total, + "snapshot_manifest_load_ms_max": ready_batch_metrics.snapshot_manifest_load_ms_max, + "snapshot_manifest_index_lookup_ms_total": ready_batch_metrics.snapshot_manifest_index_lookup_ms_total, + "snapshot_manifest_index_lookup_ms_max": ready_batch_metrics.snapshot_manifest_index_lookup_ms_max, + "snapshot_manifest_blob_load_ms_total": ready_batch_metrics.snapshot_manifest_blob_load_ms_total, + "snapshot_manifest_blob_load_ms_max": ready_batch_metrics.snapshot_manifest_blob_load_ms_max, + "snapshot_manifest_decode_ms_total": ready_batch_metrics.snapshot_manifest_decode_ms_total, + "snapshot_manifest_decode_ms_max": ready_batch_metrics.snapshot_manifest_decode_ms_max, + "snapshot_replay_guard_ms_total": ready_batch_metrics.snapshot_replay_guard_ms_total, + "snapshot_replay_guard_ms_max": ready_batch_metrics.snapshot_replay_guard_ms_max, + "replay_meta_hit_count": ready_batch_metrics.replay_meta_hit_count, + "replay_meta_miss_count": ready_batch_metrics.replay_meta_miss_count, + "snapshot_manifest_entries_ms_total": ready_batch_metrics.snapshot_manifest_entries_ms_total, + "snapshot_manifest_entries_ms_max": ready_batch_metrics.snapshot_manifest_entries_ms_max, + "snapshot_pack_files_ms_total": ready_batch_metrics.snapshot_pack_files_ms_total, + "snapshot_pack_files_ms_max": ready_batch_metrics.snapshot_pack_files_ms_max, + "snapshot_pack_files_index_lookup_ms_total": ready_batch_metrics.snapshot_pack_files_index_lookup_ms_total, + "snapshot_pack_files_index_lookup_ms_max": ready_batch_metrics.snapshot_pack_files_index_lookup_ms_max, + "snapshot_pack_files_blob_load_ms_total": ready_batch_metrics.snapshot_pack_files_blob_load_ms_total, + "snapshot_pack_files_blob_load_ms_max": ready_batch_metrics.snapshot_pack_files_blob_load_ms_max, + "snapshot_ee_path_validate_ms_total": ready_batch_metrics.snapshot_ee_path_validate_ms_total, + "snapshot_ee_path_validate_ms_max": ready_batch_metrics.snapshot_ee_path_validate_ms_max, + "snapshot_manifest_file_count_total": ready_batch_metrics.snapshot_manifest_file_count_total, + "snapshot_manifest_file_count_max": ready_batch_metrics.snapshot_manifest_file_count_max, + "child_discovery_ms_total": ready_batch_metrics.child_discovery_ms_total, + "child_discovery_ms_max": ready_batch_metrics.child_discovery_ms_max, + "batch_duration_ms": ready_batch_metrics.total_ms, + }), + ); } - flush_pending_roa_dispatch(runner, &mut pending_roa_dispatch)?; - drain_object_results( + let dispatch_metrics = flush_pending_roa_dispatch( + runner, + &mut pending_roa_dispatch, + &mut inflight_publication_points, + )?; + if dispatch_metrics.attempted > 0 || dispatch_metrics.queue_full { + crate::progress_log::emit( + "phase2_roa_dispatch_batch", + serde_json::json!({ + "attempted": dispatch_metrics.attempted, + "submitted": dispatch_metrics.submitted, + "queue_full": dispatch_metrics.queue_full, + "pending_remaining": dispatch_metrics.pending_remaining, + "duration_ms": dispatch_metrics.duration_ms, + "inflight_publication_points": inflight_publication_points.len(), + }), + ); + } + let drain_metrics = drain_object_results( runner, &mut inflight_publication_points, &mut finished, config.compact_audit, )?; + if drain_metrics.results_drained > 0 { + crate::progress_log::emit( + "phase2_object_results_drain", + serde_json::json!({ + "results_drained": drain_metrics.results_drained, + "publication_points_finalized": drain_metrics.publication_points_finalized, + "reduce_ms_total": drain_metrics.reduce_ms_total, + "reduce_ms_max": drain_metrics.reduce_ms_max, + "finalize_ms_total": drain_metrics.finalize_ms_total, + "finalize_ms_max": drain_metrics.finalize_ms_max, + "worker_ms_total": drain_metrics.worker_ms_total, + "worker_ms_max": drain_metrics.worker_ms_max, + "queue_wait_ms_total": drain_metrics.queue_wait_ms_total, + "queue_wait_ms_max": drain_metrics.queue_wait_ms_max, + "duration_ms": drain_metrics.duration_ms, + "pending_roa_dispatch_len": pending_roa_dispatch.len(), + "inflight_publication_points": inflight_publication_points.len(), + }), + ); + } let repo_poll_timeout = event_poll_timeout( &ca_queue, &ready_queue, @@ -177,12 +525,25 @@ pub fn run_tree_parallel_phase2_audit_multi_root( instances_started, config, ); - drain_repo_events( + let repo_metrics = drain_repo_events( repo_runtime.as_ref(), &mut ca_waiting_repo_by_identity, &mut ready_queue, repo_poll_timeout, )?; + if repo_metrics.event_count > 0 { + crate::progress_log::emit( + "phase2_repo_events_drain", + serde_json::json!({ + "event_count": repo_metrics.event_count, + "completions": repo_metrics.completions, + "ready_enqueued": repo_metrics.ready_enqueued, + "duration_ms": repo_metrics.duration_ms, + "ready_queue_len": ready_queue.len(), + "ca_waiting_repo_identities": ca_waiting_repo_by_identity.len(), + }), + ); + } if is_complete( &ca_queue, @@ -219,21 +580,48 @@ fn stage_ready_publication_point( finished: &mut Vec, ready: ReadyCaInstance, compact_audit: bool, -) { +) -> ReadyStageMetrics { let publication_point_started = Instant::now(); + let mut metrics = ReadyStageMetrics { + ready_count: 1, + manifest_rsync_uri: Some(ready.node.handle.manifest_rsync_uri.clone()), + publication_point_rsync_uri: Some(ready.node.handle.publication_point_rsync_uri.clone()), + ..ReadyStageMetrics::default() + }; let mut warnings = ready.repo_outcome.warnings.clone(); let repo_outcome = ready.repo_outcome.clone(); + let stage_fresh_started = Instant::now(); let stage = runner.stage_fresh_publication_point_after_repo_ready( &ready.node.handle, repo_outcome.repo_sync_ok, repo_outcome.repo_sync_err.as_deref(), ); + metrics.stage_fresh_ms = elapsed_ms(stage_fresh_started); let fresh_stage = match stage { Ok(stage) => stage, - Err(_) => { + Err(err) => { + if metrics.stage_fresh_ms >= crate::progress_log::stage_fresh_slow_threshold_ms() { + crate::progress_log::emit( + "phase2_stage_fresh_slow", + serde_json::json!({ + "manifest_rsync_uri": ready.node.handle.manifest_rsync_uri.as_str(), + "publication_point_rsync_uri": ready.node.handle.publication_point_rsync_uri.as_str(), + "status": "error", + "error": err.error.to_string(), + "stage_fresh_ms": metrics.stage_fresh_ms, + "snapshot_prepare_ms": err.snapshot_prepare_ms, + "repo_sync_source": repo_outcome.repo_sync_source.as_deref(), + "repo_sync_phase": repo_outcome.repo_sync_phase.as_deref(), + "repo_sync_duration_ms": repo_outcome.repo_sync_duration_ms, + }), + ); + } + metrics.fallback_count = 1; let fallback = runner.run_publication_point(&ready.node.handle); if let Ok(result) = fallback.as_ref() { + metrics.discovered_children = result.discovered_children.len(); + let child_enqueue_started = Instant::now(); enqueue_discovered_children( runner, next_id, @@ -241,16 +629,73 @@ fn stage_ready_publication_point( &ready.node, result.discovered_children.clone(), ); + metrics.child_enqueue_ms = elapsed_ms(child_enqueue_started); } finished.push(FinishedPublicationPoint { node: ready.node, result: compact_phase2_finished_result_result(fallback, compact_audit), }); - return; + metrics.total_ms = elapsed_ms(publication_point_started); + return metrics; } }; + metrics.snapshot_prepare_ms = fresh_stage.snapshot_prepare_ms; + metrics.snapshot_current_index_lock_ms = + fresh_stage.snapshot_prepare_timing.current_index_lock_ms; + metrics.snapshot_manifest_load_ms = fresh_stage.snapshot_prepare_timing.manifest_load_ms; + metrics.snapshot_manifest_index_lookup_ms = + fresh_stage.snapshot_prepare_timing.manifest_index_lookup_ms; + metrics.snapshot_manifest_blob_load_ms = + fresh_stage.snapshot_prepare_timing.manifest_blob_load_ms; + metrics.snapshot_manifest_decode_ms = fresh_stage.snapshot_prepare_timing.manifest_decode_ms; + metrics.snapshot_replay_guard_ms = fresh_stage.snapshot_prepare_timing.replay_guard_ms; + metrics.replay_meta_hit_count = fresh_stage.snapshot_prepare_timing.replay_meta_hit as usize; + metrics.replay_meta_miss_count = fresh_stage.snapshot_prepare_timing.replay_meta_miss as usize; + metrics.snapshot_manifest_entries_ms = fresh_stage.snapshot_prepare_timing.manifest_entries_ms; + metrics.snapshot_pack_files_ms = fresh_stage.snapshot_prepare_timing.pack_files_ms; + metrics.snapshot_pack_files_index_lookup_ms = fresh_stage + .snapshot_prepare_timing + .pack_files_index_lookup_ms; + metrics.snapshot_pack_files_blob_load_ms = + fresh_stage.snapshot_prepare_timing.pack_files_blob_load_ms; + metrics.snapshot_ee_path_validate_ms = fresh_stage.snapshot_prepare_timing.ee_path_validate_ms; + metrics.snapshot_manifest_file_count = fresh_stage.snapshot_prepare_timing.manifest_file_count; + metrics.child_discovery_ms = fresh_stage.child_discovery_ms; + if metrics.stage_fresh_ms >= crate::progress_log::stage_fresh_slow_threshold_ms() { + crate::progress_log::emit( + "phase2_stage_fresh_slow", + serde_json::json!({ + "manifest_rsync_uri": ready.node.handle.manifest_rsync_uri.as_str(), + "publication_point_rsync_uri": ready.node.handle.publication_point_rsync_uri.as_str(), + "status": "ok", + "stage_fresh_ms": metrics.stage_fresh_ms, + "snapshot_prepare_ms": fresh_stage.snapshot_prepare_ms, + "snapshot_current_index_lock_ms": fresh_stage.snapshot_prepare_timing.current_index_lock_ms, + "snapshot_manifest_load_ms": fresh_stage.snapshot_prepare_timing.manifest_load_ms, + "snapshot_manifest_index_lookup_ms": fresh_stage.snapshot_prepare_timing.manifest_index_lookup_ms, + "snapshot_manifest_blob_load_ms": fresh_stage.snapshot_prepare_timing.manifest_blob_load_ms, + "snapshot_manifest_decode_ms": fresh_stage.snapshot_prepare_timing.manifest_decode_ms, + "snapshot_replay_guard_ms": fresh_stage.snapshot_prepare_timing.replay_guard_ms, + "replay_meta_hit": fresh_stage.snapshot_prepare_timing.replay_meta_hit, + "replay_meta_miss": fresh_stage.snapshot_prepare_timing.replay_meta_miss, + "snapshot_manifest_entries_ms": fresh_stage.snapshot_prepare_timing.manifest_entries_ms, + "snapshot_pack_files_ms": fresh_stage.snapshot_prepare_timing.pack_files_ms, + "snapshot_pack_files_index_lookup_ms": fresh_stage.snapshot_prepare_timing.pack_files_index_lookup_ms, + "snapshot_pack_files_blob_load_ms": fresh_stage.snapshot_prepare_timing.pack_files_blob_load_ms, + "snapshot_ee_path_validate_ms": fresh_stage.snapshot_prepare_timing.ee_path_validate_ms, + "snapshot_manifest_file_count": fresh_stage.snapshot_prepare_timing.manifest_file_count, + "child_discovery_ms": fresh_stage.child_discovery_ms, + "child_count": fresh_stage.discovered_children.len(), + "repo_sync_source": repo_outcome.repo_sync_source.as_deref(), + "repo_sync_phase": repo_outcome.repo_sync_phase.as_deref(), + "repo_sync_duration_ms": repo_outcome.repo_sync_duration_ms, + }), + ); + } warnings.extend(fresh_stage.warnings.clone()); + metrics.discovered_children = fresh_stage.discovered_children.len(); + let child_enqueue_started = Instant::now(); enqueue_discovered_children( runner, next_id, @@ -258,7 +703,9 @@ fn stage_ready_publication_point( &ready.node, fresh_stage.discovered_children.clone(), ); + metrics.child_enqueue_ms = elapsed_ms(child_enqueue_started); + let prepare_started = Instant::now(); match prepare_publication_point_for_parallel_roa( ready.node.id, &fresh_stage.fresh_point, @@ -270,6 +717,10 @@ fn stage_ready_publication_point( runner.persist_vcir, ) { ParallelObjectsPrepare::Complete(mut objects) => { + metrics.prepare_ms = elapsed_ms(prepare_started); + metrics.complete_count = 1; + metrics.roa_tasks = objects.stats.roa_total; + metrics.aspa_objects = objects.stats.aspa_total; objects .router_keys .extend(fresh_stage.discovered_router_keys.clone()); @@ -291,12 +742,20 @@ fn stage_ready_publication_point( ); } ParallelObjectsPrepare::Staged(objects_stage) => { + metrics.prepare_ms = elapsed_ms(prepare_started); + metrics.staged_count = 1; + metrics.locked_files = objects_stage.locked_file_count(); + metrics.aspa_objects = objects_stage.aspa_task_count(); + let build_tasks_started = Instant::now(); let tasks = objects_stage.build_roa_tasks(); + metrics.build_roa_tasks_ms = elapsed_ms(build_tasks_started); let task_count = objects_stage.roa_task_count(); + metrics.roa_tasks = task_count; for task in tasks { pending_roa_dispatch.push_back(task); } if task_count == 0 { + metrics.zero_task_count = 1; match reduce_parallel_roa_stage(objects_stage, Vec::new(), runner.timing.as_ref()) { Ok(mut objects) => { objects @@ -336,12 +795,23 @@ fn stage_ready_publication_point( started_at: publication_point_started, objects_started_at: Instant::now(), task_count, + tasks_submitted: 0, + first_task_submitted_at: None, + last_task_submitted_at: None, + first_result_at: None, + last_result_at: None, + worker_ms_total: 0, + worker_ms_max: 0, + queue_wait_ms_total: 0, + queue_wait_ms_max: 0, results: Vec::with_capacity(task_count), }, ); } } } + metrics.total_ms = elapsed_ms(publication_point_started); + metrics } fn enqueue_discovered_children( @@ -410,15 +880,32 @@ fn finalize_ready_objects( fn flush_pending_roa_dispatch( runner: &Rpkiv1PublicationPointRunner<'_>, pending_roa_dispatch: &mut VecDeque, -) -> Result<(), TreeRunError> { + inflight_publication_points: &mut HashMap, +) -> Result { + let started = Instant::now(); + let mut metrics = RoaDispatchMetrics::default(); let Some(pool) = runner.parallel_roa_worker_pool.as_ref() else { - return Ok(()); + return Ok(metrics); }; - while let Some(task) = pending_roa_dispatch.pop_front() { + while let Some(mut task) = pending_roa_dispatch.pop_front() { + metrics.attempted += 1; + let pp_id = task.publication_point_id; + task.submitted_at = Some(Instant::now()); match pool.try_submit_round_robin(task) { - Ok(_) => {} + Ok(_) => { + metrics.submitted += 1; + if let Some(state) = inflight_publication_points.get_mut(&pp_id) { + let now = Instant::now(); + state.tasks_submitted += 1; + if state.first_task_submitted_at.is_none() { + state.first_task_submitted_at = Some(now); + } + state.last_task_submitted_at = Some(now); + } + } Err(ObjectWorkerSubmitError::QueueFull { task, .. }) => { pending_roa_dispatch.push_front(task); + metrics.queue_full = true; break; } Err(ObjectWorkerSubmitError::Disconnected { .. }) => { @@ -428,7 +915,9 @@ fn flush_pending_roa_dispatch( } } } - Ok(()) + metrics.pending_remaining = pending_roa_dispatch.len(); + metrics.duration_ms = elapsed_ms(started); + Ok(metrics) } fn drain_object_results( @@ -436,9 +925,11 @@ fn drain_object_results( inflight_publication_points: &mut HashMap, finished: &mut Vec, compact_audit: bool, -) -> Result<(), TreeRunError> { +) -> Result { + let started = Instant::now(); + let mut metrics = ObjectDrainMetrics::default(); let Some(pool) = runner.parallel_roa_worker_pool.as_ref() else { - return Ok(()); + return Ok(metrics); }; loop { let Some(result) = pool @@ -447,8 +938,23 @@ fn drain_object_results( else { break; }; + metrics.results_drained += 1; let pp_id = result.publication_point_id; + let _worker_index = result.worker_index; + metrics.worker_ms_total += result.worker_ms; + metrics.worker_ms_max = metrics.worker_ms_max.max(result.worker_ms); + metrics.queue_wait_ms_total += result.queue_wait_ms; + metrics.queue_wait_ms_max = metrics.queue_wait_ms_max.max(result.queue_wait_ms); let should_finalize = if let Some(state) = inflight_publication_points.get_mut(&pp_id) { + let now = Instant::now(); + if state.first_result_at.is_none() { + state.first_result_at = Some(now); + } + state.last_result_at = Some(now); + state.worker_ms_total += result.worker_ms; + state.worker_ms_max = state.worker_ms_max.max(result.worker_ms); + state.queue_wait_ms_total += result.queue_wait_ms; + state.queue_wait_ms_max = state.queue_wait_ms_max.max(result.queue_wait_ms); state.results.push(result); state.results.len() == state.task_count } else { @@ -459,12 +965,19 @@ fn drain_object_results( .remove(&pp_id) .expect("inflight publication point must exist"); let objects_processing_ms = state.objects_started_at.elapsed().as_millis() as u64; - match reduce_parallel_roa_stage( + let reduce_started = Instant::now(); + let reduce_result = reduce_parallel_roa_stage( state.objects_stage, state.results, runner.timing.as_ref(), - ) { + ); + let reduce_ms = elapsed_ms(reduce_started); + metrics.reduce_ms_total += reduce_ms; + metrics.reduce_ms_max = metrics.reduce_ms_max.max(reduce_ms); + metrics.publication_points_finalized += 1; + match reduce_result { Ok(mut objects) => { + let finalize_started = Instant::now(); objects .router_keys .extend(state.fresh_stage.discovered_router_keys.clone()); @@ -488,12 +1001,37 @@ fn drain_object_results( state.repo_outcome.repo_sync_err.as_deref(), ) .map(|out| out.result); + let finalize_ms = elapsed_ms(finalize_started); + metrics.finalize_ms_total += finalize_ms; + metrics.finalize_ms_max = metrics.finalize_ms_max.max(finalize_ms); crate::progress_log::emit( "phase2_publication_point_reduced", serde_json::json!({ - "manifest_rsync_uri": state.node.handle.manifest_rsync_uri, - "publication_point_rsync_uri": state.node.handle.publication_point_rsync_uri, + "manifest_rsync_uri": state.node.handle.manifest_rsync_uri.as_str(), + "publication_point_rsync_uri": state.node.handle.publication_point_rsync_uri.as_str(), "objects_processing_ms": objects_processing_ms, + "task_count": state.task_count, + "tasks_submitted": state.tasks_submitted, + "first_task_submitted_ms": state.first_task_submitted_at.map(|t| t.saturating_duration_since(state.objects_started_at).as_millis() as u64), + "last_task_submitted_ms": state.last_task_submitted_at.map(|t| t.saturating_duration_since(state.objects_started_at).as_millis() as u64), + "task_submit_span_ms": match (state.first_task_submitted_at, state.last_task_submitted_at) { + (Some(first), Some(last)) => Some(last.saturating_duration_since(first).as_millis() as u64), + _ => None, + }, + "first_result_ms": state.first_result_at.map(|t| t.saturating_duration_since(state.objects_started_at).as_millis() as u64), + "last_result_ms": state.last_result_at.map(|t| t.saturating_duration_since(state.objects_started_at).as_millis() as u64), + "result_span_ms": match (state.first_result_at, state.last_result_at) { + (Some(first), Some(last)) => Some(last.saturating_duration_since(first).as_millis() as u64), + _ => None, + }, + "worker_ms_total": state.worker_ms_total, + "worker_ms_max": state.worker_ms_max, + "worker_ms_avg": if state.task_count > 0 { state.worker_ms_total / state.task_count as u64 } else { 0 }, + "queue_wait_ms_total": state.queue_wait_ms_total, + "queue_wait_ms_max": state.queue_wait_ms_max, + "queue_wait_ms_avg": if state.task_count > 0 { state.queue_wait_ms_total / state.task_count as u64 } else { 0 }, + "reduce_ms": reduce_ms, + "finalize_ms": finalize_ms, "total_duration_ms": state.started_at.elapsed().as_millis() as u64, }), ); @@ -509,7 +1047,8 @@ fn drain_object_results( } } } - Ok(()) + metrics.duration_ms = elapsed_ms(started); + Ok(metrics) } fn drain_repo_events( @@ -517,11 +1056,15 @@ fn drain_repo_events( ca_waiting_repo_by_identity: &mut HashMap>, ready_queue: &mut VecDeque, timeout: Duration, -) -> Result<(), TreeRunError> { +) -> Result { + let started = Instant::now(); + let mut metrics = RepoDrainMetrics::default(); if let Some(event) = repo_runtime .recv_repo_result_timeout(timeout) .map_err(TreeRunError::Runner)? { + metrics.event_count += 1; + metrics.completions += event.completions.len(); for completion in event.completions { let mut outcome = completion.outcome; if completion.identity != event.transport_identity { @@ -530,6 +1073,7 @@ fn drain_repo_events( outcome.repo_sync_duration_ms = 0; } if let Some(waiters) = ca_waiting_repo_by_identity.remove(&completion.identity) { + metrics.ready_enqueued += waiters.len(); for node in waiters { ready_queue.push_back(ReadyCaInstance { node, @@ -539,7 +1083,8 @@ fn drain_repo_events( } } } - Ok(()) + metrics.duration_ms = elapsed_ms(started); + Ok(metrics) } fn event_poll_timeout(