diff --git a/scripts/soak/portable-soak.env.example b/scripts/soak/portable-soak.env.example index e54f36a..ecfef11 100644 --- a/scripts/soak/portable-soak.env.example +++ b/scripts/soak/portable-soak.env.example @@ -44,11 +44,18 @@ RPKI_PROGRESS_LOG=1 # progress log 慢步骤阈值,单位秒。 RPKI_PROGRESS_SLOW_SECS=10 +# phase2 stage fresh 慢发布点阈值,单位毫秒。 +RPKI_PROGRESS_STAGE_FRESH_SLOW_MS=1000 + +# phase2 PP 控制面慢发布点阈值,单位毫秒。 +RPKI_PROGRESS_PP_CONTROL_SLOW_MS=100 + # 是否在运行前尝试禁用 rpki-client timer 并杀掉竞争 RP 进程。 DISABLE_COMPETING_RPS=1 # 传给 rpki 子进程的额外参数。多个参数用空格分隔。 # 示例:RPKI_EXTRA_ARGS="--enable-roa-validation-cache" +# 实验性 transport 预热:RPKI_EXTRA_ARGS="--enable-transport-request-prefetch --enable-roa-validation-cache" RPKI_EXTRA_ARGS="" # 是否为每轮输出 timing profile 到 runs/run_xxxx/analyze/timing.json。 diff --git a/scripts/soak/run_soak.sh b/scripts/soak/run_soak.sh index 658ce14..3e67589 100755 --- a/scripts/soak/run_soak.sh +++ b/scripts/soak/run_soak.sh @@ -22,6 +22,8 @@ FAILURE_SNAPSHOT_RESET="${FAILURE_SNAPSHOT_RESET:-1}" DB_STATS_EXACT_EVERY="${DB_STATS_EXACT_EVERY:-3}" RPKI_PROGRESS_LOG="${RPKI_PROGRESS_LOG:-1}" RPKI_PROGRESS_SLOW_SECS="${RPKI_PROGRESS_SLOW_SECS:-10}" +RPKI_PROGRESS_STAGE_FRESH_SLOW_MS="${RPKI_PROGRESS_STAGE_FRESH_SLOW_MS:-1000}" +RPKI_PROGRESS_PP_CONTROL_SLOW_MS="${RPKI_PROGRESS_PP_CONTROL_SLOW_MS:-100}" DISABLE_COMPETING_RPS="${DISABLE_COMPETING_RPS:-1}" RPKI_EXTRA_ARGS="${RPKI_EXTRA_ARGS:-}" RPKI_ANALYZE="${RPKI_ANALYZE:-0}" @@ -571,6 +573,8 @@ run_one_round() { env \ RPKI_PROGRESS_LOG="$RPKI_PROGRESS_LOG" \ RPKI_PROGRESS_SLOW_SECS="$RPKI_PROGRESS_SLOW_SECS" \ + RPKI_PROGRESS_STAGE_FRESH_SLOW_MS="$RPKI_PROGRESS_STAGE_FRESH_SLOW_MS" \ + RPKI_PROGRESS_PP_CONTROL_SLOW_MS="$RPKI_PROGRESS_PP_CONTROL_SLOW_MS" \ "$RPKI_DAEMON_BIN" "${daemon_args[@]}" -- "${CHILD_ARGS[@]}" \ > "$run_dir/daemon-stdout.log" 2> "$run_dir/daemon-stderr.log" daemon_exit_code=$? diff --git a/src/bin/db_stats.rs b/src/bin/db_stats.rs index 5330e1c..4be7ef8 100644 --- a/src/bin/db_stats.rs +++ b/src/bin/db_stats.rs @@ -5,7 +5,8 @@ use std::path::{Path, PathBuf}; use rocksdb::{DB, IteratorMode, Options}; use rpki::storage::{ ALL_COLUMN_FAMILY_NAMES, CF_MANIFEST_REPLAY_META, CF_RAW_BY_HASH, CF_REPOSITORY_VIEW, - CF_RRDP_SOURCE, CF_RRDP_SOURCE_MEMBER, CF_RRDP_URI_OWNER, CF_VCIR, column_family_descriptors, + CF_ROA_CACHE_PROJECTION, CF_RRDP_SOURCE, CF_RRDP_SOURCE_MEMBER, CF_RRDP_URI_OWNER, + CF_TRANSPORT_PREFETCH, CF_VCIR, column_family_descriptors, }; #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -98,7 +99,7 @@ Output: Output groups: - current_repository_view: repository_view + raw_by_hash - - current_validation_state: vcir + manifest_replay_meta + - current_validation_state: vcir + manifest_replay_meta + roa_cache_projection + transport_prefetch - current_rrdp_state: rrdp_source + rrdp_source_member + rrdp_uri_owner " ) @@ -182,7 +183,9 @@ fn collect_db_file_stats(db_path: &Path) -> Result CfGroup { match cf_name { CF_REPOSITORY_VIEW | CF_RAW_BY_HASH => CfGroup::CurrentRepositoryView, - CF_VCIR | CF_MANIFEST_REPLAY_META => CfGroup::CurrentValidationState, + CF_VCIR | CF_MANIFEST_REPLAY_META | CF_ROA_CACHE_PROJECTION | CF_TRANSPORT_PREFETCH => { + CfGroup::CurrentValidationState + } CF_RRDP_SOURCE | CF_RRDP_SOURCE_MEMBER | CF_RRDP_URI_OWNER => CfGroup::CurrentRrdpState, _ => CfGroup::LegacyCompatibility, } @@ -374,6 +377,14 @@ mod tests { cf_group(CF_MANIFEST_REPLAY_META), CfGroup::CurrentValidationState ); + assert_eq!( + cf_group(CF_ROA_CACHE_PROJECTION), + CfGroup::CurrentValidationState + ); + assert_eq!( + cf_group(CF_TRANSPORT_PREFETCH), + CfGroup::CurrentValidationState + ); assert_eq!(cf_group(CF_RRDP_SOURCE), CfGroup::CurrentRrdpState); assert_eq!(cf_group(CF_RRDP_URI_OWNER), CfGroup::CurrentRrdpState); assert_eq!(cf_group("unknown_legacy"), CfGroup::LegacyCompatibility); @@ -386,11 +397,12 @@ mod tests { (CF_RAW_BY_HASH, 7), (CF_VCIR, 11), (CF_MANIFEST_REPLAY_META, 13), + (CF_ROA_CACHE_PROJECTION, 17), (CF_RRDP_SOURCE_MEMBER, 19), ]); assert_eq!(grouped.get(&CfGroup::CurrentRepositoryView), Some(&12)); - assert_eq!(grouped.get(&CfGroup::CurrentValidationState), Some(&24)); + assert_eq!(grouped.get(&CfGroup::CurrentValidationState), Some(&41)); assert_eq!(grouped.get(&CfGroup::CurrentRrdpState), Some(&19)); assert_eq!(grouped.get(&CfGroup::LegacyCompatibility), None); } diff --git a/src/cli.rs b/src/cli.rs index e80afc1..f0df9a9 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -50,6 +50,7 @@ use std::sync::Arc; struct RunStageTiming { validation_ms: u64, enable_roa_validation_cache: bool, + enable_transport_request_prefetch: bool, report_build_ms: u64, report_write_ms: Option, ccr_build_ms: Option, @@ -123,6 +124,7 @@ pub struct CliArgs { pub skip_report_build: bool, pub skip_vcir_persist: bool, pub enable_roa_validation_cache: bool, + pub enable_transport_request_prefetch: bool, pub ccr_out_path: Option, pub vrps_csv_out_path: Option, pub vaps_csv_out_path: Option, @@ -179,6 +181,8 @@ Options: --skip-vcir-persist Skip VCIR persistence/projection building for compare-only runs --enable-roa-validation-cache Reuse accepted ROA validation outputs from previous VCIR records (default: off) + --enable-transport-request-prefetch + Experimental: prefetch previous run transport repo requests before tree traversal --ccr-out Write CCR DER ContentInfo to this path (optional) --vrps-csv-out Write VRP compare-view CSV directly from validation output (optional; requires --vaps-csv-out) --vaps-csv-out Write VAP compare-view CSV directly from validation output (optional; requires --vrps-csv-out) @@ -259,6 +263,7 @@ pub fn parse_args(argv: &[String]) -> Result { let mut skip_report_build: bool = false; let mut skip_vcir_persist: bool = false; let mut enable_roa_validation_cache: bool = false; + let mut enable_transport_request_prefetch: bool = false; let mut ccr_out_path: Option = None; let mut vrps_csv_out_path: Option = None; let mut vaps_csv_out_path: Option = None; @@ -464,6 +469,9 @@ pub fn parse_args(argv: &[String]) -> Result { "--enable-roa-validation-cache" => { enable_roa_validation_cache = true; } + "--enable-transport-request-prefetch" => { + enable_transport_request_prefetch = true; + } "--ccr-out" => { i += 1; let v = argv.get(i).ok_or("--ccr-out requires a value")?; @@ -894,6 +902,7 @@ pub fn parse_args(argv: &[String]) -> Result { skip_report_build, skip_vcir_persist, enable_roa_validation_cache, + enable_transport_request_prefetch, ccr_out_path, vrps_csv_out_path, vaps_csv_out_path, @@ -1954,6 +1963,7 @@ pub fn run(argv: &[String]) -> Result<(), String> { persist_vcir: !args.skip_vcir_persist, build_ccr_accumulator: args.ccr_out_path.is_some(), enable_roa_validation_cache: args.enable_roa_validation_cache, + enable_transport_request_prefetch: args.enable_transport_request_prefetch, }; let replay_mode = args.payload_replay_archive.is_some(); let delta_replay_mode = args.payload_base_archive.is_some(); @@ -2426,6 +2436,7 @@ pub fn run(argv: &[String]) -> Result<(), String> { let stage_timing = RunStageTiming { validation_ms, enable_roa_validation_cache: args.enable_roa_validation_cache, + enable_transport_request_prefetch: args.enable_transport_request_prefetch, report_build_ms, report_write_ms, ccr_build_ms, diff --git a/src/cli/tests.rs b/src/cli/tests.rs index 5a8a20e..3c67774 100644 --- a/src/cli/tests.rs +++ b/src/cli/tests.rs @@ -139,6 +139,22 @@ fn parse_accepts_enable_roa_validation_cache() { assert!(args.enable_roa_validation_cache); } +#[test] +fn parse_accepts_enable_transport_request_prefetch() { + let argv = vec![ + "rpki".to_string(), + "--db".to_string(), + "db".to_string(), + "--tal-path".to_string(), + "x.tal".to_string(), + "--ta-path".to_string(), + "x.cer".to_string(), + "--enable-transport-request-prefetch".to_string(), + ]; + let args = parse_args(&argv).expect("parse args"); + assert!(args.enable_transport_request_prefetch); +} + #[test] fn parse_disables_roa_validation_cache_by_default() { let argv = vec![ @@ -152,6 +168,7 @@ fn parse_disables_roa_validation_cache_by_default() { ]; let args = parse_args(&argv).expect("parse args"); assert!(!args.enable_roa_validation_cache); + assert!(!args.enable_transport_request_prefetch); } #[test] @@ -1535,6 +1552,7 @@ fn run_report_task_and_stage_timing_work() { let stage_timing = RunStageTiming { validation_ms: 1, enable_roa_validation_cache: false, + enable_transport_request_prefetch: false, report_build_ms: report_output.report_build_ms, report_write_ms: report_output.report_write_ms, ccr_build_ms: Some(2), @@ -1631,6 +1649,7 @@ fn stage_timing_serializes_memory_telemetry() { let stage_timing = RunStageTiming { validation_ms: 1, enable_roa_validation_cache: true, + enable_transport_request_prefetch: true, report_build_ms: 2, report_write_ms: None, ccr_build_ms: None, diff --git a/src/parallel/mod.rs b/src/parallel/mod.rs index 4fe1ab1..109319f 100644 --- a/src/parallel/mod.rs +++ b/src/parallel/mod.rs @@ -6,4 +6,5 @@ pub mod repo_scheduler; pub mod repo_worker; pub mod run_coordinator; pub mod stats; +pub mod transport_prefetch; pub mod types; diff --git a/src/parallel/repo_runtime.rs b/src/parallel/repo_runtime.rs index 9644add..e22f57b 100644 --- a/src/parallel/repo_runtime.rs +++ b/src/parallel/repo_runtime.rs @@ -4,6 +4,9 @@ use std::time::Duration; use crate::parallel::repo_scheduler::TransportRequestAction; use crate::parallel::repo_worker::{RepoTransportExecutor, RepoTransportWorkerPool}; use crate::parallel::run_coordinator::GlobalRunCoordinator; +use crate::parallel::transport_prefetch::{ + TransportPrefetchDispatchStats, TransportPrefetchRecorder, TransportPrefetchSnapshot, +}; use crate::parallel::types::{ RepoIdentity, RepoRequester, RepoRuntimeState, RepoTransportMode, RepoTransportResultEnvelope, RepoTransportResultKind, @@ -70,11 +73,20 @@ pub trait RepoSyncRuntime: Send + Sync { &self, children: &[DiscoveredChildCaInstance], ) -> Result<(), String>; + + fn prefetch_transport_requests( + &self, + snapshot: &TransportPrefetchSnapshot, + validation_time: time::OffsetDateTime, + ) -> Result; + + fn transport_prefetch_snapshot(&self) -> TransportPrefetchSnapshot; } pub struct Phase1RepoSyncRuntime { coordinator: Mutex, worker_pool: Mutex>, + transport_prefetch_recorder: Option>, rsync_scope_resolver: Arc String + Send + Sync>, rsync_failure_scope_resolver: Arc Option + Send + Sync>, sync_preference: SyncPreference, @@ -106,6 +118,26 @@ impl Phase1RepoSyncRuntime { Self { coordinator: Mutex::new(coordinator), worker_pool: Mutex::new(worker_pool), + transport_prefetch_recorder: None, + rsync_scope_resolver, + rsync_failure_scope_resolver, + sync_preference, + } + } + + pub fn new_with_failure_scope_and_prefetch_recording( + coordinator: GlobalRunCoordinator, + worker_pool: RepoTransportWorkerPool, + rsync_scope_resolver: Arc String + Send + Sync>, + rsync_failure_scope_resolver: Arc Option + Send + Sync>, + sync_preference: SyncPreference, + record_transport_prefetch_requests: bool, + ) -> Self { + Self { + coordinator: Mutex::new(coordinator), + worker_pool: Mutex::new(worker_pool), + transport_prefetch_recorder: record_transport_prefetch_requests + .then(|| Mutex::new(TransportPrefetchRecorder::default())), rsync_scope_resolver, rsync_failure_scope_resolver, sync_preference, @@ -136,6 +168,19 @@ impl Phase1RepoSyncRuntime { let requester = Self::build_requester(ca); let rsync_scope_uri = (self.rsync_scope_resolver)(&identity.rsync_base_uri); let rsync_failure_scope_uri = (self.rsync_failure_scope_resolver)(&identity.rsync_base_uri); + if let Some(recorder) = self.transport_prefetch_recorder.as_ref() { + let mut recorder = recorder + .lock() + .expect("transport prefetch recorder lock poisoned"); + recorder.record_registered_request( + &identity, + &requester, + priority, + rsync_scope_uri.clone(), + rsync_failure_scope_uri.clone(), + self.sync_preference, + ); + } let action = { let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned"); coordinator.register_transport_request( @@ -432,6 +477,83 @@ impl RepoSyncRuntime for Phase1RepoSyncRuntime { } Ok(()) } + + fn prefetch_transport_requests( + &self, + snapshot: &TransportPrefetchSnapshot, + validation_time: time::OffsetDateTime, + ) -> Result { + let mut stats = TransportPrefetchDispatchStats { + loaded_requests: snapshot.requests.len() as u64, + ..TransportPrefetchDispatchStats::default() + }; + + for request in &snapshot.requests { + let identity = request.to_identity(); + let current_rsync_scope_uri = (self.rsync_scope_resolver)(&identity.rsync_base_uri); + let current_rsync_failure_scope_uri = + (self.rsync_failure_scope_resolver)(&identity.rsync_base_uri); + if current_rsync_scope_uri != request.rsync_scope_uri + || current_rsync_failure_scope_uri != request.rsync_failure_scope_uri + { + stats.skipped_incompatible += 1; + continue; + } + let action = { + let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned"); + coordinator.register_transport_request( + identity, + request.to_requester(), + validation_time, + request.priority, + current_rsync_scope_uri, + current_rsync_failure_scope_uri, + self.sync_preference, + ) + }; + + match action { + TransportRequestAction::Enqueue(task) => { + stats.enqueued_tasks += 1; + crate::progress_log::emit( + "phase1_repo_prefetch_enqueued", + serde_json::json!({ + "repo_key_rsync_base_uri": task.repo_identity.rsync_base_uri, + "rsync_failure_scope_uri": task.rsync_failure_scope_uri, + "repo_key_notification_uri": task.repo_identity.notification_uri, + "priority": task.priority, + "transport_mode": match task.mode { + RepoTransportMode::Rrdp => "rrdp", + RepoTransportMode::Rsync => "rsync", + }, + }), + ); + } + TransportRequestAction::Waiting { .. } => { + stats.waiting_requests += 1; + } + TransportRequestAction::ReusedSuccess(_) + | TransportRequestAction::ReusedTerminalFailure(_) => { + stats.reused_results += 1; + } + } + } + + self.drain_pending_transport_tasks()?; + Ok(stats) + } + + fn transport_prefetch_snapshot(&self) -> TransportPrefetchSnapshot { + self.transport_prefetch_recorder + .as_ref() + .map(|recorder| { + recorder + .lock() + .expect("transport prefetch recorder lock poisoned") + .snapshot(self.sync_preference) + }) + .unwrap_or_else(|| TransportPrefetchSnapshot::new(self.sync_preference, Vec::new())) + } } fn outcome_from_transport_result( @@ -501,6 +623,7 @@ mod tests { RepoTransportExecutor, RepoTransportWorkerPool, RepoWorkerPoolConfig, }; use crate::parallel::run_coordinator::GlobalRunCoordinator; + use crate::parallel::transport_prefetch::TransportPrefetchSnapshot; use crate::parallel::types::{ RepoRuntimeState, RepoTransportMode, RepoTransportResultEnvelope, RepoTransportResultKind, RepoTransportTask, TalInputSpec, @@ -889,6 +1012,218 @@ mod tests { assert_eq!(count.load(Ordering::SeqCst), 2); } + #[test] + fn phase1_runtime_records_prefetch_snapshot_only_when_enabled() { + let coordinator = GlobalRunCoordinator::new( + ParallelPhase1Config::default(), + vec![TalInputSpec::from_url("https://example.test/arin.tal")], + ); + let pool = RepoTransportWorkerPool::new( + RepoWorkerPoolConfig { max_workers: 1 }, + SuccessTransportExecutor, + ) + .expect("pool"); + let runtime = Phase1RepoSyncRuntime::new_with_failure_scope_and_prefetch_recording( + coordinator, + pool, + Arc::new(|base: &str| base.to_string()), + Arc::new(|_base: &str| Some("rsync://example.test/".to_string())), + SyncPreference::RrdpThenRsync, + true, + ); + let ca = sample_ca("rsync://example.test/repo/root.mft"); + + let _ = runtime + .request_publication_point_repo(&ca, 0) + .expect("request repo"); + let snapshot = runtime.transport_prefetch_snapshot(); + assert_eq!(snapshot.requests.len(), 1); + assert_eq!( + snapshot.requests[0].repo_identity.rsync_base_uri, + "rsync://example.test/repo/" + ); + assert_eq!( + snapshot.requests[0].rsync_failure_scope_uri.as_deref(), + Some("rsync://example.test/") + ); + + let coordinator = GlobalRunCoordinator::new( + ParallelPhase1Config::default(), + vec![TalInputSpec::from_url("https://example.test/arin.tal")], + ); + let pool = RepoTransportWorkerPool::new( + RepoWorkerPoolConfig { max_workers: 1 }, + SuccessTransportExecutor, + ) + .expect("pool"); + let runtime = Phase1RepoSyncRuntime::new( + coordinator, + pool, + Arc::new(|base: &str| base.to_string()), + SyncPreference::RrdpThenRsync, + ); + let _ = runtime + .request_publication_point_repo(&ca, 0) + .expect("request repo without recording"); + assert!(runtime.transport_prefetch_snapshot().requests.is_empty()); + } + + #[test] + fn phase1_runtime_prefetch_dispatches_and_later_request_waits_on_same_task() { + let count = Arc::new(AtomicUsize::new(0)); + let coordinator = GlobalRunCoordinator::new( + ParallelPhase1Config::default(), + vec![TalInputSpec::from_url("https://example.test/arin.tal")], + ); + let pool = RepoTransportWorkerPool::new( + RepoWorkerPoolConfig { max_workers: 1 }, + CountingSuccessTransportExecutor { + count: Arc::clone(&count), + }, + ) + .expect("pool"); + let runtime = Phase1RepoSyncRuntime::new_with_failure_scope_and_prefetch_recording( + coordinator, + pool, + Arc::new(|base: &str| base.to_string()), + Arc::new(|_base: &str| Some("rsync://example.test/".to_string())), + SyncPreference::RrdpThenRsync, + true, + ); + let ca = sample_ca("rsync://example.test/repo/root.mft"); + let mut recorder = + crate::parallel::transport_prefetch::TransportPrefetchRecorder::default(); + recorder.record_registered_request( + &super::Phase1RepoSyncRuntime::::build_identity(&ca), + &super::Phase1RepoSyncRuntime::::build_requester(&ca), + 0, + "rsync://example.test/repo/".to_string(), + Some("rsync://example.test/".to_string()), + SyncPreference::RrdpThenRsync, + ); + let snapshot = recorder.snapshot(SyncPreference::RrdpThenRsync); + + let stats = runtime + .prefetch_transport_requests(&snapshot, time::OffsetDateTime::UNIX_EPOCH) + .expect("prefetch transport requests"); + assert_eq!(stats.loaded_requests, 1); + assert_eq!(stats.enqueued_tasks, 1); + + let status = runtime + .request_publication_point_repo(&ca, 0) + .expect("request after prefetch"); + assert!(matches!( + status, + super::RepoSyncRequestStatus::Pending { + state: RepoRuntimeState::WaitingRrdp, + .. + } + )); + let _ = runtime + .recv_repo_result_timeout(Duration::from_secs(1)) + .expect("repo event") + .expect("event"); + assert_eq!(count.load(Ordering::SeqCst), 1); + } + + #[test] + fn phase1_runtime_does_not_persist_prefetch_only_requests() { + let coordinator = GlobalRunCoordinator::new( + ParallelPhase1Config::default(), + vec![TalInputSpec::from_url("https://example.test/arin.tal")], + ); + let pool = RepoTransportWorkerPool::new( + RepoWorkerPoolConfig { max_workers: 1 }, + SuccessTransportExecutor, + ) + .expect("pool"); + let runtime = Phase1RepoSyncRuntime::new_with_failure_scope_and_prefetch_recording( + coordinator, + pool, + Arc::new(|base: &str| base.to_string()), + Arc::new(|_base: &str| Some("rsync://example.test/".to_string())), + SyncPreference::RrdpThenRsync, + true, + ); + let snapshot = TransportPrefetchSnapshot::new( + SyncPreference::RrdpThenRsync, + vec![crate::parallel::transport_prefetch::TransportPrefetchRequest::from_registered_request( + &crate::parallel::types::RepoIdentity::new( + Some("https://example.test/notify.xml".to_string()), + "rsync://example.test/repo/", + ), + &crate::parallel::types::RepoRequester::with_tal_rir( + "arin", + "arin", + "rsync://example.test/repo/root.mft", + "rsync://example.test/repo/", + "arin:root", + ), + 0, + "rsync://example.test/repo/".to_string(), + Some("rsync://example.test/".to_string()), + SyncPreference::RrdpThenRsync, + )], + ); + + let stats = runtime + .prefetch_transport_requests(&snapshot, time::OffsetDateTime::UNIX_EPOCH) + .expect("prefetch transport requests"); + assert_eq!(stats.enqueued_tasks, 1); + assert!( + runtime.transport_prefetch_snapshot().requests.is_empty(), + "prefetch-only requests should not be carried forward forever" + ); + } + + #[test] + fn phase1_runtime_prefetch_skips_requests_when_scope_resolver_differs() { + let coordinator = GlobalRunCoordinator::new( + ParallelPhase1Config::default(), + vec![TalInputSpec::from_url("https://example.test/arin.tal")], + ); + let pool = RepoTransportWorkerPool::new( + RepoWorkerPoolConfig { max_workers: 1 }, + SuccessTransportExecutor, + ) + .expect("pool"); + let runtime = Phase1RepoSyncRuntime::new_with_failure_scope_and_prefetch_recording( + coordinator, + pool, + Arc::new(|_base: &str| "rsync://example.test/different/".to_string()), + Arc::new(|_base: &str| Some("rsync://example.test/".to_string())), + SyncPreference::RrdpThenRsync, + true, + ); + let snapshot = TransportPrefetchSnapshot::new( + SyncPreference::RrdpThenRsync, + vec![crate::parallel::transport_prefetch::TransportPrefetchRequest::from_registered_request( + &crate::parallel::types::RepoIdentity::new( + Some("https://example.test/notify.xml".to_string()), + "rsync://example.test/repo/", + ), + &crate::parallel::types::RepoRequester::with_tal_rir( + "arin", + "arin", + "rsync://example.test/repo/root.mft", + "rsync://example.test/repo/", + "arin:root", + ), + 0, + "rsync://example.test/repo/".to_string(), + Some("rsync://example.test/".to_string()), + SyncPreference::RrdpThenRsync, + )], + ); + + let stats = runtime + .prefetch_transport_requests(&snapshot, time::OffsetDateTime::UNIX_EPOCH) + .expect("prefetch transport requests"); + assert_eq!(stats.loaded_requests, 1); + assert_eq!(stats.enqueued_tasks, 0); + assert_eq!(stats.skipped_incompatible, 1); + } + #[test] fn phase1_runtime_transitions_rrdp_failure_to_rsync_success() { let rrdp_count = Arc::new(AtomicUsize::new(0)); diff --git a/src/parallel/transport_prefetch.rs b/src/parallel/transport_prefetch.rs new file mode 100644 index 0000000..fa0ce0a --- /dev/null +++ b/src/parallel/transport_prefetch.rs @@ -0,0 +1,400 @@ +use std::collections::BTreeMap; + +use serde::{Deserialize, Serialize}; + +use crate::parallel::types::{ + RepoDedupKey, RepoIdentity, RepoRequester, RepoTransportMode, RepoTransportTask, +}; +use crate::policy::SyncPreference; + +pub const TRANSPORT_PREFETCH_SCHEMA_VERSION: u32 = 1; + +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] +pub struct TransportPrefetchSnapshot { + pub schema_version: u32, + pub generated_at_unix: i64, + pub sync_preference: SyncPreference, + pub requests: Vec, +} + +impl TransportPrefetchSnapshot { + pub fn new(sync_preference: SyncPreference, requests: Vec) -> Self { + Self { + schema_version: TRANSPORT_PREFETCH_SCHEMA_VERSION, + generated_at_unix: time::OffsetDateTime::now_utc().unix_timestamp(), + sync_preference, + requests, + } + } + + pub fn is_compatible_with(&self, sync_preference: SyncPreference) -> bool { + self.schema_version == TRANSPORT_PREFETCH_SCHEMA_VERSION + && self.sync_preference == sync_preference + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct TransportPrefetchRequest { + pub dedup_key: TransportPrefetchDedupKey, + pub rsync_scope_uri: String, + pub rsync_failure_scope_uri: Option, + pub repo_identity: TransportPrefetchRepoIdentity, + pub mode: TransportPrefetchMode, + pub tal_id: String, + pub rir_id: String, + pub priority: u8, + pub requesters: Vec, +} + +impl TransportPrefetchRequest { + pub fn from_registered_request( + identity: &RepoIdentity, + requester: &RepoRequester, + priority: u8, + rsync_scope_uri: String, + rsync_failure_scope_uri: Option, + sync_preference: SyncPreference, + ) -> Self { + let (dedup_key, mode) = if sync_preference == SyncPreference::RrdpThenRsync { + if let Some(notification_uri) = identity.notification_uri.clone() { + ( + TransportPrefetchDedupKey::RrdpNotify { notification_uri }, + TransportPrefetchMode::Rrdp, + ) + } else { + ( + TransportPrefetchDedupKey::RsyncScope { + rsync_scope_uri: rsync_scope_uri.clone(), + }, + TransportPrefetchMode::Rsync, + ) + } + } else { + ( + TransportPrefetchDedupKey::RsyncScope { + rsync_scope_uri: rsync_scope_uri.clone(), + }, + TransportPrefetchMode::Rsync, + ) + }; + + Self { + dedup_key, + rsync_scope_uri, + rsync_failure_scope_uri, + repo_identity: TransportPrefetchRepoIdentity::from_identity(identity), + mode, + tal_id: requester.tal_id.clone(), + rir_id: requester.rir_id.clone(), + priority, + requesters: vec![TransportPrefetchRequester::from_requester(requester)], + } + } + + pub fn from_task(task: &RepoTransportTask, rsync_scope_uri: String) -> Self { + Self { + dedup_key: TransportPrefetchDedupKey::from_repo_key(&task.dedup_key), + rsync_scope_uri, + rsync_failure_scope_uri: task.rsync_failure_scope_uri.clone(), + repo_identity: TransportPrefetchRepoIdentity::from_identity(&task.repo_identity), + mode: TransportPrefetchMode::from_mode(task.mode), + tal_id: task.tal_id.clone(), + rir_id: task.rir_id.clone(), + priority: task.priority, + requesters: task + .requesters + .iter() + .map(TransportPrefetchRequester::from_requester) + .collect(), + } + } + + pub fn to_identity(&self) -> RepoIdentity { + self.repo_identity.to_identity() + } + + pub fn to_requester(&self) -> RepoRequester { + self.requesters + .first() + .map(TransportPrefetchRequester::to_requester) + .unwrap_or_else(|| RepoRequester { + tal_id: self.tal_id.clone(), + rir_id: self.rir_id.clone(), + parent_node_id: None, + ca_instance_handle_id: format!( + "{}:{}", + self.tal_id, self.repo_identity.rsync_base_uri + ), + publication_point_rsync_uri: self.repo_identity.rsync_base_uri.clone(), + manifest_rsync_uri: format!("{}prefetch.mft", self.repo_identity.rsync_base_uri), + }) + } + + fn recorder_key(&self) -> String { + self.dedup_key.stable_key() + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum TransportPrefetchDedupKey { + RrdpNotify { notification_uri: String }, + RsyncScope { rsync_scope_uri: String }, +} + +impl TransportPrefetchDedupKey { + fn from_repo_key(key: &RepoDedupKey) -> Self { + match key { + RepoDedupKey::RrdpNotify { notification_uri } => Self::RrdpNotify { + notification_uri: notification_uri.clone(), + }, + RepoDedupKey::RsyncScope { rsync_scope_uri } => Self::RsyncScope { + rsync_scope_uri: rsync_scope_uri.clone(), + }, + } + } + + fn stable_key(&self) -> String { + match self { + Self::RrdpNotify { notification_uri } => format!("rrdp:{notification_uri}"), + Self::RsyncScope { rsync_scope_uri } => format!("rsync:{rsync_scope_uri}"), + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct TransportPrefetchRepoIdentity { + pub notification_uri: Option, + pub rsync_base_uri: String, +} + +impl TransportPrefetchRepoIdentity { + fn from_identity(identity: &RepoIdentity) -> Self { + Self { + notification_uri: identity.notification_uri.clone(), + rsync_base_uri: identity.rsync_base_uri.clone(), + } + } + + fn to_identity(&self) -> RepoIdentity { + RepoIdentity::new(self.notification_uri.clone(), self.rsync_base_uri.clone()) + } +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum TransportPrefetchMode { + Rrdp, + Rsync, +} + +impl TransportPrefetchMode { + fn from_mode(mode: RepoTransportMode) -> Self { + match mode { + RepoTransportMode::Rrdp => Self::Rrdp, + RepoTransportMode::Rsync => Self::Rsync, + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct TransportPrefetchRequester { + pub tal_id: String, + pub rir_id: String, + pub parent_node_id: Option, + pub ca_instance_handle_id: String, + pub publication_point_rsync_uri: String, + pub manifest_rsync_uri: String, +} + +impl TransportPrefetchRequester { + fn from_requester(requester: &RepoRequester) -> Self { + Self { + tal_id: requester.tal_id.clone(), + rir_id: requester.rir_id.clone(), + parent_node_id: requester.parent_node_id, + ca_instance_handle_id: requester.ca_instance_handle_id.clone(), + publication_point_rsync_uri: requester.publication_point_rsync_uri.clone(), + manifest_rsync_uri: requester.manifest_rsync_uri.clone(), + } + } + + fn to_requester(&self) -> RepoRequester { + RepoRequester { + tal_id: self.tal_id.clone(), + rir_id: self.rir_id.clone(), + parent_node_id: self.parent_node_id, + ca_instance_handle_id: self.ca_instance_handle_id.clone(), + publication_point_rsync_uri: self.publication_point_rsync_uri.clone(), + manifest_rsync_uri: self.manifest_rsync_uri.clone(), + } + } +} + +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub struct TransportPrefetchRecorder { + requests_by_key: BTreeMap, + request_order: Vec, +} + +impl TransportPrefetchRecorder { + pub fn record_registered_request( + &mut self, + identity: &RepoIdentity, + requester: &RepoRequester, + priority: u8, + rsync_scope_uri: String, + rsync_failure_scope_uri: Option, + sync_preference: SyncPreference, + ) { + let request = TransportPrefetchRequest::from_registered_request( + identity, + requester, + priority, + rsync_scope_uri, + rsync_failure_scope_uri, + sync_preference, + ); + self.record_request(request); + } + + pub fn record_task(&mut self, task: &RepoTransportTask, rsync_scope_uri: String) { + let request = TransportPrefetchRequest::from_task(task, rsync_scope_uri); + self.record_request(request); + } + + pub fn snapshot(&self, sync_preference: SyncPreference) -> TransportPrefetchSnapshot { + TransportPrefetchSnapshot::new( + sync_preference, + self.request_order + .iter() + .filter_map(|key| self.requests_by_key.get(key)) + .cloned() + .collect(), + ) + } + + pub fn len(&self) -> usize { + self.requests_by_key.len() + } + + fn record_request(&mut self, request: TransportPrefetchRequest) { + let key = request.recorder_key(); + if !self.requests_by_key.contains_key(&key) { + self.request_order.push(key.clone()); + self.requests_by_key.insert(key, request); + } + } +} + +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub struct TransportPrefetchDispatchStats { + pub loaded_requests: u64, + pub enqueued_tasks: u64, + pub waiting_requests: u64, + pub reused_results: u64, + pub skipped_incompatible: u64, +} + +#[cfg(test)] +mod tests { + use super::*; + + fn requester(uri: &str) -> RepoRequester { + RepoRequester { + tal_id: "apnic".to_string(), + rir_id: "apnic".to_string(), + parent_node_id: None, + ca_instance_handle_id: format!("apnic:{uri}"), + publication_point_rsync_uri: "rsync://example.test/repo/".to_string(), + manifest_rsync_uri: uri.to_string(), + } + } + + fn task(notification_uri: &str, manifest_uri: &str) -> RepoTransportTask { + RepoTransportTask { + dedup_key: RepoDedupKey::RrdpNotify { + notification_uri: notification_uri.to_string(), + }, + rsync_failure_scope_uri: Some("rsync://example.test/".to_string()), + repo_identity: RepoIdentity::new( + Some(notification_uri.to_string()), + "rsync://example.test/repo/", + ), + mode: RepoTransportMode::Rrdp, + tal_id: "apnic".to_string(), + rir_id: "apnic".to_string(), + validation_time: time::OffsetDateTime::UNIX_EPOCH, + priority: 0, + requesters: vec![requester(manifest_uri)], + } + } + + #[test] + fn recorder_deduplicates_by_transport_key() { + let mut recorder = TransportPrefetchRecorder::default(); + recorder.record_task( + &task( + "https://example.test/notification.xml", + "rsync://example.test/repo/a.mft", + ), + "rsync://example.test/repo/".to_string(), + ); + recorder.record_task( + &task( + "https://example.test/notification.xml", + "rsync://example.test/repo/b.mft", + ), + "rsync://example.test/repo/".to_string(), + ); + + let snapshot = recorder.snapshot(SyncPreference::RrdpThenRsync); + assert_eq!(snapshot.requests.len(), 1); + assert_eq!( + snapshot.requests[0].requesters[0].manifest_rsync_uri, + "rsync://example.test/repo/a.mft" + ); + } + + #[test] + fn recorder_preserves_first_discovery_order() { + let mut recorder = TransportPrefetchRecorder::default(); + recorder.record_task( + &task( + "https://z.example.test/notification.xml", + "rsync://z.example.test/repo/root.mft", + ), + "rsync://z.example.test/repo/".to_string(), + ); + recorder.record_task( + &task( + "https://a.example.test/notification.xml", + "rsync://a.example.test/repo/root.mft", + ), + "rsync://a.example.test/repo/".to_string(), + ); + + let snapshot = recorder.snapshot(SyncPreference::RrdpThenRsync); + assert_eq!( + snapshot.requests[0] + .repo_identity + .notification_uri + .as_deref(), + Some("https://z.example.test/notification.xml") + ); + assert_eq!( + snapshot.requests[1] + .repo_identity + .notification_uri + .as_deref(), + Some("https://a.example.test/notification.xml") + ); + } + + #[test] + fn snapshot_checks_schema_and_sync_preference() { + let snapshot = TransportPrefetchSnapshot::new(SyncPreference::RrdpThenRsync, Vec::new()); + assert!(snapshot.is_compatible_with(SyncPreference::RrdpThenRsync)); + assert!(!snapshot.is_compatible_with(SyncPreference::RsyncOnly)); + } +} diff --git a/src/progress_log.rs b/src/progress_log.rs index da8a548..e8323b6 100644 --- a/src/progress_log.rs +++ b/src/progress_log.rs @@ -23,6 +23,13 @@ pub fn stage_fresh_slow_threshold_ms() -> u64 { .unwrap_or(1_000) } +pub fn pp_control_slow_threshold_ms() -> u64 { + std::env::var("RPKI_PROGRESS_PP_CONTROL_SLOW_MS") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(100) +} + pub fn emit(kind: &str, payload: Value) { if !progress_enabled() { return; diff --git a/src/storage.rs b/src/storage.rs index f1d7f07..8e95a69 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -2,7 +2,7 @@ mod config; mod keys; mod pack; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::path::Path; use base64::Engine; @@ -15,7 +15,8 @@ use crate::data_model::rc::{AsResourceSet, IpResourceSet}; use config::*; pub use config::{ ALL_COLUMN_FAMILY_NAMES, CF_MANIFEST_REPLAY_META, CF_RAW_BY_HASH, CF_REPOSITORY_VIEW, - CF_RRDP_SOURCE, CF_RRDP_SOURCE_MEMBER, CF_RRDP_URI_OWNER, CF_VCIR, column_family_descriptors, + CF_ROA_CACHE_PROJECTION, CF_RRDP_SOURCE, CF_RRDP_SOURCE_MEMBER, CF_RRDP_URI_OWNER, + CF_TRANSPORT_PREFETCH, CF_VCIR, column_family_descriptors, }; use keys::*; use pack::compute_sha256_32; @@ -865,6 +866,224 @@ fn validate_local_output_type_matches_payload(output: &VcirLocalOutput) -> Stora Ok(()) } +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RoaCacheCrlProjection { + #[serde(rename = "u")] + pub uri: String, + #[serde(rename = "h")] + pub sha256: String, +} + +impl RoaCacheCrlProjection { + pub fn validate_internal(&self) -> StorageResult<()> { + validate_non_empty("roa_cache_projection.crls[].uri", &self.uri)?; + validate_sha256_hex("roa_cache_projection.crls[].sha256", &self.sha256)?; + Ok(()) + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RoaCacheLocalOutputProjection { + #[serde(rename = "e")] + pub item_effective_until: PackTime, + #[serde(rename = "c")] + #[serde(with = "serde_bytes_32")] + pub source_ee_cert_hash: [u8; 32], + #[serde(rename = "p")] + pub payload: VcirLocalOutputPayload, + #[serde(rename = "r")] + #[serde(with = "serde_bytes_32")] + pub rule_hash: [u8; 32], +} + +impl RoaCacheLocalOutputProjection { + fn from_local_output(output: &VcirLocalOutput) -> Option { + if output.output_type != VcirOutputType::Vrp + || output.source_object_type != VcirSourceObjectType::Roa + { + return None; + } + Some(Self { + item_effective_until: output.item_effective_until.clone(), + source_ee_cert_hash: output.source_ee_cert_hash, + payload: output.payload.clone(), + rule_hash: output.rule_hash, + }) + } + + pub fn validate_internal(&self) -> StorageResult<()> { + parse_time( + "roa_cache_projection.entries[].outputs[].item_effective_until", + &self.item_effective_until, + )?; + if !matches!(self.payload, VcirLocalOutputPayload::Vrp { .. }) { + return Err(StorageError::InvalidData { + entity: "roa_cache_projection.entries[].outputs[]", + detail: "payload must be VRP".to_string(), + }); + } + Ok(()) + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RoaCacheObjectProjection { + #[serde(rename = "u")] + pub source_object_uri: String, + #[serde(rename = "h")] + #[serde(with = "serde_bytes_32")] + pub source_object_hash: [u8; 32], + #[serde(rename = "o")] + pub outputs: Vec, +} + +impl RoaCacheObjectProjection { + pub fn validate_internal(&self) -> StorageResult<()> { + validate_non_empty( + "roa_cache_projection.entries[].source_object_uri", + &self.source_object_uri, + )?; + if self.outputs.is_empty() { + return Err(StorageError::InvalidData { + entity: "roa_cache_projection.entries[]", + detail: "outputs must not be empty".to_string(), + }); + } + for output in &self.outputs { + output.validate_internal()?; + } + Ok(()) + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RoaCacheProjection { + #[serde(rename = "m")] + pub manifest_rsync_uri: String, + #[serde(rename = "e")] + pub instance_effective_until: PackTime, + #[serde(rename = "i")] + pub issuer_ca_sha256_hex: Option, + #[serde(rename = "c")] + pub crl_sha256_by_uri: Vec, + #[serde(rename = "r")] + pub entries: Vec, +} + +impl RoaCacheProjection { + pub fn from_vcir(vcir: &ValidatedCaInstanceResult) -> StorageResult> { + let mut issuer_ca_sha256_hex = None; + let mut crl_sha256_by_uri = Vec::new(); + for artifact in &vcir.related_artifacts { + if artifact.validation_status != VcirArtifactValidationStatus::Accepted { + continue; + } + match (artifact.artifact_role, artifact.artifact_kind) { + ( + VcirArtifactRole::IssuerCert | VcirArtifactRole::TrustAnchorCert, + VcirArtifactKind::Cer, + ) => { + issuer_ca_sha256_hex = Some(artifact.sha256.clone()); + } + (_, VcirArtifactKind::Crl) => { + if let Some(uri) = artifact.uri.as_ref() { + crl_sha256_by_uri.push(RoaCacheCrlProjection { + uri: uri.clone(), + sha256: artifact.sha256.clone(), + }); + } + } + _ => {} + } + } + crl_sha256_by_uri.sort_by(|left, right| left.uri.cmp(&right.uri)); + + let mut entries: Vec = Vec::new(); + let mut entry_index_by_uri: HashMap = HashMap::new(); + for output in &vcir.local_outputs { + let Some(projected_output) = RoaCacheLocalOutputProjection::from_local_output(output) + else { + continue; + }; + if let Some(entry_index) = entry_index_by_uri.get(output.source_object_uri.as_str()) { + let entry = &mut entries[*entry_index]; + if entry.source_object_hash != output.source_object_hash { + return Err(StorageError::InvalidData { + entity: "roa_cache_projection.entries[]", + detail: format!( + "source object hash mismatch for {}", + output.source_object_uri + ), + }); + } + entry.outputs.push(projected_output); + } else { + entry_index_by_uri.insert(output.source_object_uri.clone(), entries.len()); + entries.push(RoaCacheObjectProjection { + source_object_uri: output.source_object_uri.clone(), + source_object_hash: output.source_object_hash, + outputs: vec![projected_output], + }); + } + } + if entries.is_empty() { + return Ok(None); + } + entries.sort_by(|left, right| left.source_object_uri.cmp(&right.source_object_uri)); + + let projection = Self { + manifest_rsync_uri: vcir.manifest_rsync_uri.clone(), + instance_effective_until: vcir.instance_gate.instance_effective_until.clone(), + issuer_ca_sha256_hex, + crl_sha256_by_uri, + entries, + }; + projection.validate_internal()?; + Ok(Some(projection)) + } + + pub fn validate_internal(&self) -> StorageResult<()> { + validate_non_empty( + "roa_cache_projection.manifest_rsync_uri", + &self.manifest_rsync_uri, + )?; + parse_time( + "roa_cache_projection.instance_effective_until", + &self.instance_effective_until, + )?; + if let Some(hash) = &self.issuer_ca_sha256_hex { + validate_sha256_hex("roa_cache_projection.issuer_ca_sha256_hex", hash)?; + } + let mut seen_crls = HashSet::with_capacity(self.crl_sha256_by_uri.len()); + for crl in &self.crl_sha256_by_uri { + crl.validate_internal()?; + if !seen_crls.insert(crl.uri.as_str()) { + return Err(StorageError::InvalidData { + entity: "roa_cache_projection.crls[]", + detail: format!("duplicate CRL URI: {}", crl.uri), + }); + } + } + if self.entries.is_empty() { + return Err(StorageError::InvalidData { + entity: "roa_cache_projection.entries", + detail: "must not be empty".to_string(), + }); + } + let mut seen_entries = HashSet::with_capacity(self.entries.len()); + for entry in &self.entries { + entry.validate_internal()?; + if !seen_entries.insert(entry.source_object_uri.as_str()) { + return Err(StorageError::InvalidData { + entity: "roa_cache_projection.entries[]", + detail: format!("duplicate ROA URI: {}", entry.source_object_uri), + }); + } + } + Ok(()) + } +} + #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum VcirArtifactRole { @@ -1126,6 +1345,8 @@ pub struct VcirReplaceTimingBreakdown { pub vcir_value_bytes: u64, pub replay_meta_encode_ms: u64, pub replay_meta_value_bytes: u64, + pub roa_cache_projection_encode_ms: u64, + pub roa_cache_projection_value_bytes: u64, pub batch_build_ms: u64, pub write_batch_ms: u64, pub total_encoded_bytes: u64, @@ -1134,6 +1355,7 @@ pub struct VcirReplaceTimingBreakdown { pub rss_after_validate_kb: Option, pub rss_after_vcir_encode_kb: Option, pub rss_after_replay_meta_encode_kb: Option, + pub rss_after_roa_cache_projection_encode_kb: Option, pub rss_after_write_batch_kb: Option, } @@ -1583,6 +1805,29 @@ impl RrdpUriOwnerRecord { } } +fn write_roa_cache_projection_to_batch( + projection_cf: &ColumnFamily, + batch: &mut WriteBatch, + vcir: &ValidatedCaInstanceResult, + timing: Option<&mut VcirReplaceTimingBreakdown>, +) -> StorageResult<()> { + let projection_key = roa_cache_projection_key(&vcir.manifest_rsync_uri); + let projection = RoaCacheProjection::from_vcir(vcir)?; + match projection { + Some(projection) => { + let projection_value = encode_cbor(&projection, "roa_cache_projection")?; + if let Some(timing) = timing { + timing.roa_cache_projection_value_bytes = projection_value.len() as u64; + } + batch.put_cf(projection_cf, projection_key.as_bytes(), projection_value); + } + None => { + batch.delete_cf(projection_cf, projection_key.as_bytes()); + } + } + Ok(()) +} + impl RocksStore { pub fn open(path: &Path) -> StorageResult { let mut base_opts = Options::default(); @@ -1979,6 +2224,7 @@ impl RocksStore { vcir.validate_internal()?; let vcir_cf = self.cf(CF_VCIR)?; let replay_cf = self.cf(CF_MANIFEST_REPLAY_META)?; + let projection_cf = self.cf(CF_ROA_CACHE_PROJECTION)?; let replay_meta = ManifestReplayMeta::from_vcir(vcir); replay_meta.validate_internal()?; let mut batch = WriteBatch::default(); @@ -1988,6 +2234,7 @@ impl RocksStore { let replay_key = manifest_replay_meta_key(&replay_meta.manifest_rsync_uri); let replay_value = encode_cbor(&replay_meta, "manifest_replay_meta")?; batch.put_cf(replay_cf, replay_key.as_bytes(), replay_value); + write_roa_cache_projection_to_batch(projection_cf, &mut batch, vcir, None)?; self.write_batch(batch) } @@ -2009,6 +2256,7 @@ impl RocksStore { let batch_build_started = std::time::Instant::now(); let vcir_cf = self.cf(CF_VCIR)?; let replay_cf = self.cf(CF_MANIFEST_REPLAY_META)?; + let projection_cf = self.cf(CF_ROA_CACHE_PROJECTION)?; let mut batch = WriteBatch::default(); let vcir_key = vcir_key(&vcir.manifest_rsync_uri); @@ -2029,7 +2277,15 @@ impl RocksStore { batch.put_cf(replay_cf, replay_key.as_bytes(), replay_value); timing.rss_after_replay_meta_encode_kb = process_vm_rss_kb(); - timing.total_encoded_bytes = timing.vcir_value_bytes + timing.replay_meta_value_bytes; + let projection_encode_started = std::time::Instant::now(); + write_roa_cache_projection_to_batch(projection_cf, &mut batch, vcir, Some(&mut timing))?; + timing.roa_cache_projection_encode_ms = + projection_encode_started.elapsed().as_millis() as u64; + timing.rss_after_roa_cache_projection_encode_kb = process_vm_rss_kb(); + + timing.total_encoded_bytes = timing.vcir_value_bytes + + timing.replay_meta_value_bytes + + timing.roa_cache_projection_value_bytes; timing.batch_build_ms = batch_build_started.elapsed().as_millis() as u64; let write_batch_started = std::time::Instant::now(); @@ -2075,6 +2331,53 @@ impl RocksStore { Ok(Some(meta)) } + pub fn get_roa_cache_projection( + &self, + manifest_rsync_uri: &str, + ) -> StorageResult> { + let cf = self.cf(CF_ROA_CACHE_PROJECTION)?; + let key = roa_cache_projection_key(manifest_rsync_uri); + let Some(bytes) = self + .db + .get_cf(cf, key.as_bytes()) + .map_err(|e| StorageError::RocksDb(e.to_string()))? + else { + return Ok(None); + }; + let projection = decode_cbor::(&bytes, "roa_cache_projection")?; + projection.validate_internal()?; + Ok(Some(projection)) + } + + pub fn put_transport_prefetch_snapshot( + &self, + snapshot: &crate::parallel::transport_prefetch::TransportPrefetchSnapshot, + ) -> StorageResult<()> { + let cf = self.cf(CF_TRANSPORT_PREFETCH)?; + let value = encode_cbor(snapshot, "transport_prefetch_snapshot")?; + self.db + .put_cf(cf, TRANSPORT_PREFETCH_LAST_SNAPSHOT_KEY.as_bytes(), value) + .map_err(|e| StorageError::RocksDb(e.to_string())) + } + + pub fn get_transport_prefetch_snapshot( + &self, + ) -> StorageResult> { + let cf = self.cf(CF_TRANSPORT_PREFETCH)?; + let Some(bytes) = self + .db + .get_cf(cf, TRANSPORT_PREFETCH_LAST_SNAPSHOT_KEY.as_bytes()) + .map_err(|e| StorageError::RocksDb(e.to_string()))? + else { + return Ok(None); + }; + decode_cbor::( + &bytes, + "transport_prefetch_snapshot", + ) + .map(Some) + } + pub fn list_vcirs(&self) -> StorageResult> { let cf = self.cf(CF_VCIR)?; let mode = IteratorMode::Start; @@ -2130,11 +2433,14 @@ impl RocksStore { pub fn delete_vcir(&self, manifest_rsync_uri: &str) -> StorageResult<()> { let vcir_cf = self.cf(CF_VCIR)?; let replay_cf = self.cf(CF_MANIFEST_REPLAY_META)?; + let projection_cf = self.cf(CF_ROA_CACHE_PROJECTION)?; let mut batch = WriteBatch::default(); let key = vcir_key(manifest_rsync_uri); batch.delete_cf(vcir_cf, key.as_bytes()); let replay_key = manifest_replay_meta_key(manifest_rsync_uri); batch.delete_cf(replay_cf, replay_key.as_bytes()); + let projection_key = roa_cache_projection_key(manifest_rsync_uri); + batch.delete_cf(projection_cf, projection_key.as_bytes()); self.write_batch(batch) } diff --git a/src/storage/config.rs b/src/storage/config.rs index 80eafae..152d23e 100644 --- a/src/storage/config.rs +++ b/src/storage/config.rs @@ -5,9 +5,11 @@ pub const CF_RAW_BY_HASH: &str = "raw_by_hash"; pub const CF_RAW_BLOB: &str = "raw_blob"; pub const CF_VCIR: &str = "vcir"; pub const CF_MANIFEST_REPLAY_META: &str = "manifest_replay_meta"; +pub const CF_ROA_CACHE_PROJECTION: &str = "roa_cache_projection"; pub const CF_RRDP_SOURCE: &str = "rrdp_source"; pub const CF_RRDP_SOURCE_MEMBER: &str = "rrdp_source_member"; pub const CF_RRDP_URI_OWNER: &str = "rrdp_uri_owner"; +pub const CF_TRANSPORT_PREFETCH: &str = "transport_prefetch"; pub const ALL_COLUMN_FAMILY_NAMES: &[&str] = &[ CF_REPOSITORY_VIEW, @@ -15,9 +17,11 @@ pub const ALL_COLUMN_FAMILY_NAMES: &[&str] = &[ CF_RAW_BLOB, CF_VCIR, CF_MANIFEST_REPLAY_META, + CF_ROA_CACHE_PROJECTION, CF_RRDP_SOURCE, CF_RRDP_SOURCE_MEMBER, CF_RRDP_URI_OWNER, + CF_TRANSPORT_PREFETCH, ]; pub(super) const REPOSITORY_VIEW_KEY_PREFIX: &str = "repo_view:"; @@ -25,9 +29,11 @@ pub(super) const RAW_BY_HASH_KEY_PREFIX: &str = "rawbyhash:"; pub(super) const RAW_BLOB_KEY_PREFIX: &str = "rawblob:"; pub(super) const VCIR_KEY_PREFIX: &str = "vcir:"; pub(super) const MANIFEST_REPLAY_META_KEY_PREFIX: &str = "manifest_replay_meta:"; +pub(super) const ROA_CACHE_PROJECTION_KEY_PREFIX: &str = "roa_cache_projection:"; pub(super) const RRDP_SOURCE_KEY_PREFIX: &str = "rrdp_source:"; pub(super) const RRDP_SOURCE_MEMBER_KEY_PREFIX: &str = "rrdp_source_member:"; pub(super) const RRDP_URI_OWNER_KEY_PREFIX: &str = "rrdp_uri_owner:"; +pub(super) const TRANSPORT_PREFETCH_LAST_SNAPSHOT_KEY: &str = "transport_prefetch:last_snapshot"; const WORK_DB_BLOB_MODE_ENV: &str = "RPKI_WORK_DB_BLOB_MODE"; const WORK_DB_MEMORY_PROFILE_ENV: &str = "RPKI_WORK_DB_MEMORY_PROFILE"; diff --git a/src/storage/keys.rs b/src/storage/keys.rs index f1e5905..b17f1d0 100644 --- a/src/storage/keys.rs +++ b/src/storage/keys.rs @@ -30,6 +30,10 @@ pub(super) fn manifest_replay_meta_key(manifest_rsync_uri: &str) -> String { format!("{MANIFEST_REPLAY_META_KEY_PREFIX}{manifest_rsync_uri}") } +pub(super) fn roa_cache_projection_key(manifest_rsync_uri: &str) -> String { + format!("{ROA_CACHE_PROJECTION_KEY_PREFIX}{manifest_rsync_uri}") +} + pub(super) fn rrdp_source_key(notify_uri: &str) -> String { format!("{RRDP_SOURCE_KEY_PREFIX}{notify_uri}") } diff --git a/src/storage/tests.rs b/src/storage/tests.rs index d5f1eb1..140945b 100644 --- a/src/storage/tests.rs +++ b/src/storage/tests.rs @@ -298,6 +298,61 @@ fn vcir_ccr_manifest_projection_validate_rejects_invalid_fields() { )); } +#[test] +fn roa_cache_projection_from_vcir_keeps_only_roa_vrp_outputs() { + let vcir = sample_vcir("rsync://example.test/repo/current.mft"); + let projection = RoaCacheProjection::from_vcir(&vcir) + .expect("projection build") + .expect("projection exists"); + + assert_eq!(projection.manifest_rsync_uri, vcir.manifest_rsync_uri); + assert_eq!(projection.instance_effective_until, pack_time(12)); + assert_eq!(projection.crl_sha256_by_uri.len(), 1); + assert_eq!(projection.entries.len(), 1); + assert_eq!( + projection.entries[0].source_object_uri, + "rsync://example.test/repo/object.roa" + ); + assert_eq!(projection.entries[0].outputs.len(), 1); + assert!(matches!( + projection.entries[0].outputs[0].payload, + VcirLocalOutputPayload::Vrp { .. } + )); +} + +#[test] +fn roa_cache_projection_groups_multiple_outputs_by_roa_uri() { + let mut vcir = sample_vcir("rsync://example.test/repo/current.mft"); + let mut second = vcir.local_outputs[0].clone(); + second.rule_hash = sha256_32(b"vrp-rule-2"); + if let VcirLocalOutputPayload::Vrp { max_length, .. } = &mut second.payload { + *max_length = 25; + } + vcir.local_outputs.push(second); + vcir.summary.local_vrp_count = 2; + + let projection = RoaCacheProjection::from_vcir(&vcir) + .expect("projection build") + .expect("projection exists"); + + assert_eq!(projection.entries.len(), 1); + assert_eq!(projection.entries[0].outputs.len(), 2); +} + +#[test] +fn roa_cache_projection_rejects_duplicate_uri_with_different_hash() { + let mut vcir = sample_vcir("rsync://example.test/repo/current.mft"); + let mut duplicate = vcir.local_outputs[0].clone(); + duplicate.source_object_hash = sha256_32(b"different-roa"); + duplicate.rule_hash = sha256_32(b"vrp-rule-2"); + vcir.local_outputs.push(duplicate); + vcir.summary.local_vrp_count = 2; + + let err = RoaCacheProjection::from_vcir(&vcir) + .expect_err("same ROA URI with different hash must fail"); + assert!(err.to_string().contains("source object hash mismatch")); +} + fn sample_rrdp_source_record(notify_uri: &str) -> RrdpSourceRecord { RrdpSourceRecord { notify_uri: notify_uri.to_string(), @@ -777,6 +832,16 @@ fn vcir_roundtrip_and_validation_failures_are_reported() { updated_at_validation_time: vcir.last_successful_validation_time.clone(), } ); + let projection = store + .get_roa_cache_projection(&vcir.manifest_rsync_uri) + .expect("get roa cache projection") + .expect("roa cache projection exists"); + assert_eq!(projection.manifest_rsync_uri, vcir.manifest_rsync_uri); + assert_eq!(projection.entries.len(), 1); + assert_eq!( + projection.entries[0].source_object_uri, + "rsync://example.test/repo/object.roa" + ); let mut invalid = sample_vcir("rsync://example.test/repo/invalid.mft"); invalid.summary.local_vrp_count = 9; @@ -807,6 +872,65 @@ fn vcir_roundtrip_and_validation_failures_are_reported() { .expect("get deleted manifest replay meta") .is_none() ); + assert!( + store + .get_roa_cache_projection(&vcir.manifest_rsync_uri) + .expect("get deleted roa cache projection") + .is_none() + ); +} + +#[test] +fn transport_prefetch_snapshot_roundtrips() { + use crate::parallel::transport_prefetch::{ + TransportPrefetchDedupKey, TransportPrefetchMode, TransportPrefetchRepoIdentity, + TransportPrefetchRequest, TransportPrefetchRequester, TransportPrefetchSnapshot, + }; + use crate::policy::SyncPreference; + + let td = tempfile::tempdir().expect("tempdir"); + let store = RocksStore::open(td.path()).expect("open rocksdb"); + assert!( + store + .get_transport_prefetch_snapshot() + .expect("get empty prefetch snapshot") + .is_none() + ); + + let snapshot = TransportPrefetchSnapshot::new( + SyncPreference::RrdpThenRsync, + vec![TransportPrefetchRequest { + dedup_key: TransportPrefetchDedupKey::RrdpNotify { + notification_uri: "https://example.test/notification.xml".to_string(), + }, + rsync_scope_uri: "rsync://example.test/repo/".to_string(), + rsync_failure_scope_uri: Some("rsync://example.test/".to_string()), + repo_identity: TransportPrefetchRepoIdentity { + notification_uri: Some("https://example.test/notification.xml".to_string()), + rsync_base_uri: "rsync://example.test/repo/".to_string(), + }, + mode: TransportPrefetchMode::Rrdp, + tal_id: "apnic".to_string(), + rir_id: "apnic".to_string(), + priority: 0, + requesters: vec![TransportPrefetchRequester { + tal_id: "apnic".to_string(), + rir_id: "apnic".to_string(), + parent_node_id: None, + ca_instance_handle_id: "apnic:rsync://example.test/repo/root.mft".to_string(), + publication_point_rsync_uri: "rsync://example.test/repo/".to_string(), + manifest_rsync_uri: "rsync://example.test/repo/root.mft".to_string(), + }], + }], + ); + store + .put_transport_prefetch_snapshot(&snapshot) + .expect("put prefetch snapshot"); + let got = store + .get_transport_prefetch_snapshot() + .expect("get prefetch snapshot") + .expect("snapshot exists"); + assert_eq!(got, snapshot); } #[test] @@ -925,9 +1049,18 @@ fn replace_vcir_and_manifest_replay_meta_replaces_current_entry() { .expect("store previous vcir"); assert!(previous_timing.vcir_value_bytes > 0); assert!(previous_timing.replay_meta_value_bytes > 0); + assert!(previous_timing.roa_cache_projection_value_bytes > 0); assert_eq!( previous_timing.total_encoded_bytes, - previous_timing.vcir_value_bytes + previous_timing.replay_meta_value_bytes + previous_timing.vcir_value_bytes + + previous_timing.replay_meta_value_bytes + + previous_timing.roa_cache_projection_value_bytes + ); + assert!( + store + .get_roa_cache_projection(&previous.manifest_rsync_uri) + .expect("get previous projection") + .is_some() ); let mut current = sample_vcir("rsync://example.test/repo/current.mft"); @@ -951,6 +1084,7 @@ fn replace_vcir_and_manifest_replay_meta_replaces_current_entry() { .expect("replace vcir and replay meta"); assert!(current_timing.vcir_value_bytes > 0); assert!(current_timing.replay_meta_value_bytes > 0); + assert_eq!(current_timing.roa_cache_projection_value_bytes, 0); assert_eq!( current_timing.total_encoded_bytes, current_timing.vcir_value_bytes + current_timing.replay_meta_value_bytes @@ -973,6 +1107,12 @@ fn replace_vcir_and_manifest_replay_meta_replaces_current_entry() { replay_meta.manifest_sha256, current.ccr_manifest_projection.manifest_sha256 ); + assert!( + store + .get_roa_cache_projection(¤t.manifest_rsync_uri) + .expect("get current projection") + .is_none() + ); } #[test] diff --git a/src/validation/objects.rs b/src/validation/objects.rs index 4647524..befb951 100644 --- a/src/validation/objects.rs +++ b/src/validation/objects.rs @@ -15,9 +15,9 @@ use crate::parallel::object_worker::{ use crate::policy::{Policy, SignedObjectFailurePolicy}; use crate::report::{RfcRef, Warning}; use crate::storage::{ - PackFile, PackTime, ValidatedCaInstanceResult, VcirArtifactKind, VcirArtifactRole, - VcirArtifactValidationStatus, VcirLocalOutput, VcirLocalOutputPayload, VcirOutputType, - VcirSourceObjectType, + PackFile, PackTime, RoaCacheProjection, ValidatedCaInstanceResult, VcirArtifactKind, + VcirArtifactRole, VcirArtifactValidationStatus, VcirLocalOutput, VcirLocalOutputPayload, + VcirOutputType, VcirSourceObjectType, }; use crate::validation::cert_path::{CertPathError, validate_signed_object_ee_cert_path_fast}; use crate::validation::manifest::PublicationPointData; @@ -264,6 +264,65 @@ pub struct CachedRoaValidationResult { } impl RoaValidationCacheView { + pub fn from_projection( + projection: &RoaCacheProjection, + validation_time: time::OffsetDateTime, + ) -> Self { + let mut entries_by_uri: HashMap = + HashMap::with_capacity(projection.entries.len()); + let issuer_ca_sha256_hex = projection.issuer_ca_sha256_hex.clone(); + let crl_sha256_by_uri = projection + .crl_sha256_by_uri + .iter() + .map(|crl| (crl.uri.clone(), crl.sha256.clone())) + .collect::>(); + let blocked = projection + .instance_effective_until + .parse() + .map(|effective_until| effective_until <= validation_time) + .unwrap_or(true); + + if blocked { + return Self { + entries_by_uri, + issuer_ca_sha256_hex, + crl_sha256_by_uri, + blocked, + }; + } + + for entry in &projection.entries { + let outputs = entry + .outputs + .iter() + .map(|output| VcirLocalOutput { + output_type: VcirOutputType::Vrp, + item_effective_until: output.item_effective_until.clone(), + source_object_uri: entry.source_object_uri.clone(), + source_object_type: VcirSourceObjectType::Roa, + source_object_hash: entry.source_object_hash, + source_ee_cert_hash: output.source_ee_cert_hash, + payload: output.payload.clone(), + rule_hash: output.rule_hash, + }) + .collect::>(); + entries_by_uri.insert( + entry.source_object_uri.clone(), + CachedRoaValidationResult { + source_object_hash: entry.source_object_hash, + outputs, + }, + ); + } + + Self { + entries_by_uri, + issuer_ca_sha256_hex, + crl_sha256_by_uri, + blocked, + } + } + pub fn from_vcir( vcir: &ValidatedCaInstanceResult, validation_time: time::OffsetDateTime, @@ -3067,8 +3126,8 @@ mod tests { }; use crate::policy::Policy; use crate::storage::{ - PackTime, ValidatedManifestMeta, VcirAuditSummary, VcirCcrManifestProjection, - VcirInstanceGate, VcirRelatedArtifact, VcirSummary, + PackTime, RoaCacheProjection, ValidatedManifestMeta, VcirAuditSummary, + VcirCcrManifestProjection, VcirInstanceGate, VcirRelatedArtifact, VcirSummary, }; use crate::validation::publication_point::PublicationPointSnapshot; use std::collections::HashMap; @@ -3214,6 +3273,120 @@ mod tests { assert_eq!(ok.local_outputs.len(), 1); } + #[test] + fn roa_validation_cache_view_from_projection_matches_vcir_view() { + let validation_time = fixed_time("2026-06-05T00:00:00Z"); + let issuer_der = b"issuer-ca"; + let crl_hash = [0x22; 32]; + let roa_hash = [0x11; 32]; + let vcir = sample_roa_cache_vcir( + issuer_der, + crl_hash, + roa_hash, + fixed_time("2026-06-07T00:00:00Z"), + fixed_time("2026-06-08T00:00:00Z"), + ); + let projection = RoaCacheProjection::from_vcir(&vcir) + .expect("build projection") + .expect("projection exists"); + let view = RoaValidationCacheView::from_projection(&projection, validation_time); + let files = vec![ + PackFile::from_bytes_with_sha256( + "rsync://example.test/repo/a.roa", + vec![0x01], + roa_hash, + ), + PackFile::from_bytes_with_sha256( + "rsync://example.test/repo/current.crl", + vec![0x02], + crl_hash, + ), + ]; + + assert!(view.matches_current_context(issuer_der, &files)); + let hit = view.lookup(&files[0], validation_time); + let RoaCacheLookupResult::Hit(ok) = hit else { + panic!("expected projection cache hit, got {hit:?}"); + }; + assert!(ok.reused_from_cache); + assert_eq!(ok.vrps.len(), 1); + assert_eq!(ok.vrps[0].asn, 64500); + assert_eq!(ok.local_outputs.len(), 1); + assert_eq!( + ok.local_outputs[0].source_object_uri, + "rsync://example.test/repo/a.roa" + ); + } + + #[test] + fn roa_validation_cache_view_from_projection_blocks_on_gates() { + let validation_time = fixed_time("2026-06-05T00:00:00Z"); + let issuer_der = b"issuer-ca"; + let crl_hash = [0x22; 32]; + let roa_hash = [0x11; 32]; + let vcir = sample_roa_cache_vcir( + issuer_der, + crl_hash, + roa_hash, + fixed_time("2026-06-07T00:00:00Z"), + fixed_time("2026-06-08T00:00:00Z"), + ); + let mut projection = RoaCacheProjection::from_vcir(&vcir) + .expect("build projection") + .expect("projection exists"); + let files = vec![ + PackFile::from_bytes_with_sha256( + "rsync://example.test/repo/a.roa", + vec![0x01], + roa_hash, + ), + PackFile::from_bytes_with_sha256( + "rsync://example.test/repo/current.crl", + vec![0x02], + crl_hash, + ), + ]; + + projection.issuer_ca_sha256_hex = Some("00".repeat(32)); + let issuer_changed = RoaValidationCacheView::from_projection(&projection, validation_time); + assert!(!issuer_changed.matches_current_context(issuer_der, &files)); + + let mut projection = RoaCacheProjection::from_vcir(&vcir) + .expect("build projection") + .expect("projection exists"); + projection.crl_sha256_by_uri[0].sha256 = "11".repeat(32); + let crl_changed = RoaValidationCacheView::from_projection(&projection, validation_time); + assert!(!crl_changed.matches_current_context(issuer_der, &files)); + + let mut projection = RoaCacheProjection::from_vcir(&vcir) + .expect("build projection") + .expect("projection exists"); + projection.entries[0].source_object_hash = [0xff; 32]; + let roa_changed = RoaValidationCacheView::from_projection(&projection, validation_time); + assert!(matches!( + roa_changed.lookup(&files[0], validation_time), + RoaCacheLookupResult::Blocked + )); + + let expired_vcir = sample_roa_cache_vcir( + issuer_der, + crl_hash, + roa_hash, + fixed_time("2026-06-07T00:00:00Z"), + fixed_time("2026-06-04T00:00:00Z"), + ); + let expired_projection = RoaCacheProjection::from_vcir(&expired_vcir) + .expect("build projection") + .expect("projection exists"); + let expired_view = + RoaValidationCacheView::from_projection(&expired_projection, validation_time); + assert!(!expired_view.matches_current_context(issuer_der, &files)); + assert!(matches!( + expired_view.lookup(&files[0], validation_time), + RoaCacheLookupResult::Blocked + )); + } + #[test] fn roa_validation_cache_view_blocks_when_context_changes() { let validation_time = fixed_time("2026-06-05T00:00:00Z"); diff --git a/src/validation/run_tree_from_tal.rs b/src/validation/run_tree_from_tal.rs index 8c9e749..2d49cee 100644 --- a/src/validation/run_tree_from_tal.rs +++ b/src/validation/run_tree_from_tal.rs @@ -13,6 +13,7 @@ use crate::parallel::repo_worker::{ }; use crate::parallel::run_coordinator::GlobalRunCoordinator; use crate::parallel::types::{TalInputSpec, TalSource}; +use crate::policy::SyncPreference; use crate::replay::archive::ReplayArchiveIndex; use crate::replay::delta_archive::ReplayDeltaArchiveIndex; use crate::replay::delta_fetch_http::PayloadDeltaReplayHttpFetcher; @@ -171,6 +172,7 @@ fn build_phase1_repo_sync_runtime( timing: Option, download_log: Option, tal_inputs: Vec, + record_transport_prefetch_requests: bool, ) -> Result<(Arc, CurrentRepoIndexHandle), RunTreeFromTalError> where H: Fetcher + Clone + 'static, @@ -184,7 +186,7 @@ where current_repo_index.clone(), Arc::new(http_fetcher.clone()), Arc::clone(&rsync_fetcher_arc), - timing, + timing.clone(), download_log, ); let pool = RepoTransportWorkerPool::new(RepoWorkerPoolConfig::from(¶llel_config), executor) @@ -196,16 +198,142 @@ where let failure_resolver: Arc Option + Send + Sync> = Arc::new(move |base: &str| failure_rsync_fetcher_arc.failure_dedup_key(base)); let _ = policy; // policy reserved for later runtime-level decisions - let runtime = Arc::new(Phase1RepoSyncRuntime::new_with_failure_scope( - coordinator, - pool, - resolver, - failure_resolver, - policy.sync_preference, - )); + let runtime = Arc::new( + Phase1RepoSyncRuntime::new_with_failure_scope_and_prefetch_recording( + coordinator, + pool, + resolver, + failure_resolver, + policy.sync_preference, + record_transport_prefetch_requests, + ), + ); Ok((runtime, current_repo_index)) } +fn record_transport_prefetch_count(timing: Option<&TimingHandle>, key: &'static str, value: u64) { + if value == 0 { + return; + } + if let Some(timing) = timing { + timing.record_count(key, value); + } +} + +fn apply_transport_request_prefetch( + store: &crate::storage::RocksStore, + runtime: &Arc, + sync_preference: SyncPreference, + validation_time: time::OffsetDateTime, + config: &TreeRunConfig, + timing: Option<&TimingHandle>, +) -> Result<(), RunTreeFromTalError> { + if !config.enable_transport_request_prefetch { + return Ok(()); + } + + let snapshot = match store.get_transport_prefetch_snapshot() { + Ok(Some(snapshot)) => snapshot, + Ok(None) => { + crate::progress_log::emit( + "phase1_repo_prefetch_missing_snapshot", + serde_json::json!({ "status": "missing" }), + ); + return Ok(()); + } + Err(err) => { + record_transport_prefetch_count(timing, "transport_prefetch_load_errors", 1); + crate::progress_log::emit( + "phase1_repo_prefetch_load_error", + serde_json::json!({ "error": err.to_string() }), + ); + return Ok(()); + } + }; + + if !snapshot.is_compatible_with(sync_preference) { + record_transport_prefetch_count( + timing, + "transport_prefetch_skipped_incompatible_snapshots", + 1, + ); + crate::progress_log::emit( + "phase1_repo_prefetch_skipped", + serde_json::json!({ + "reason": "incompatible_snapshot", + "schema_version": snapshot.schema_version, + "request_count": snapshot.requests.len(), + }), + ); + return Ok(()); + } + + let stats = runtime + .prefetch_transport_requests(&snapshot, validation_time) + .map_err(TreeRunError::Runner)?; + record_transport_prefetch_count( + timing.clone(), + "transport_prefetch_loaded_requests", + stats.loaded_requests, + ); + record_transport_prefetch_count( + timing.clone(), + "transport_prefetch_enqueued_tasks", + stats.enqueued_tasks, + ); + record_transport_prefetch_count( + timing, + "transport_prefetch_waiting_requests", + stats.waiting_requests, + ); + record_transport_prefetch_count( + timing, + "transport_prefetch_reused_results", + stats.reused_results, + ); + record_transport_prefetch_count( + timing, + "transport_prefetch_skipped_incompatible_requests", + stats.skipped_incompatible, + ); + crate::progress_log::emit( + "phase1_repo_prefetch_applied", + serde_json::json!({ + "loaded_requests": stats.loaded_requests, + "enqueued_tasks": stats.enqueued_tasks, + "waiting_requests": stats.waiting_requests, + "reused_results": stats.reused_results, + "skipped_incompatible": stats.skipped_incompatible, + }), + ); + Ok(()) +} + +fn persist_transport_request_prefetch_snapshot( + store: &crate::storage::RocksStore, + runtime: &Arc, + config: &TreeRunConfig, + timing: Option<&TimingHandle>, +) -> Result<(), RunTreeFromTalError> { + if !config.enable_transport_request_prefetch { + return Ok(()); + } + let snapshot = runtime.transport_prefetch_snapshot(); + let recorded = snapshot.requests.len() as u64; + store + .put_transport_prefetch_snapshot(&snapshot) + .map_err(|err| TreeRunError::Runner(err.to_string()))?; + record_transport_prefetch_count(timing, "transport_prefetch_recorded_requests", recorded); + crate::progress_log::emit( + "phase1_repo_prefetch_persisted", + serde_json::json!({ + "recorded_requests": recorded, + "schema_version": snapshot.schema_version, + }), + ); + Ok(()) +} + fn root_discovery_from_tal_input( tal_input: &TalInputSpec, http_fetcher: &dyn Fetcher, @@ -588,6 +716,15 @@ where timing.clone(), Some(download_log.clone()), tal_inputs, + config.enable_transport_request_prefetch, + )?; + apply_transport_request_prefetch( + store.as_ref(), + &runtime, + policy.sync_preference, + validation_time, + config, + timing.as_ref(), )?; let current_repo_index_for_output = current_repo_index.clone(); let runner = make_live_runner( @@ -596,10 +733,10 @@ where http_fetcher, rsync_fetcher, validation_time, - timing, + timing.clone(), Some(download_log.clone()), Some(current_repo_index), - Some(runtime), + Some(Arc::clone(&runtime)), phase2_config, (phase2_enabled && config.build_ccr_accumulator) .then(|| CcrAccumulator::new(vec![discovery.trust_anchor.clone()])), @@ -622,6 +759,7 @@ where } else { run_tree_serial_audit(root, &runner, config)? }; + persist_transport_request_prefetch_snapshot(store.as_ref(), &runtime, config, timing.as_ref())?; let downloads = download_log.snapshot_events(); let download_stats = DownloadLogHandle::stats_from_events(&downloads); Ok(RunTreeFromTalAuditOutput { @@ -696,6 +834,15 @@ where timing.clone(), Some(download_log.clone()), successful_tal_inputs.clone(), + config.enable_transport_request_prefetch, + )?; + apply_transport_request_prefetch( + store.as_ref(), + &runtime, + policy.sync_preference, + validation_time, + config, + timing.as_ref(), )?; let current_repo_index_for_output = current_repo_index.clone(); let runner = make_live_runner( @@ -704,10 +851,10 @@ where http_fetcher, rsync_fetcher, validation_time, - timing, + timing.clone(), Some(download_log.clone()), Some(current_repo_index), - Some(runtime), + Some(Arc::clone(&runtime)), phase2_config, (phase2_enabled && config.build_ccr_accumulator).then(|| { CcrAccumulator::new( @@ -730,6 +877,7 @@ where } else { run_tree_serial_audit_multi_root(root_handles, &runner, config)? }; + persist_transport_request_prefetch_snapshot(store.as_ref(), &runtime, config, timing.as_ref())?; let downloads = download_log.snapshot_events(); let download_stats = DownloadLogHandle::stats_from_events(&downloads); Ok(RunTreeFromTalAuditOutput { @@ -2329,6 +2477,7 @@ mod replay_api_tests { persist_vcir: true, build_ccr_accumulator: true, enable_roa_validation_cache: false, + enable_transport_request_prefetch: false, }, ) .unwrap_err(); @@ -2365,6 +2514,7 @@ mod replay_api_tests { persist_vcir: true, build_ccr_accumulator: true, enable_roa_validation_cache: false, + enable_transport_request_prefetch: false, }, ) .expect("run replay root-only audit"); @@ -2411,6 +2561,7 @@ mod replay_api_tests { persist_vcir: true, build_ccr_accumulator: true, enable_roa_validation_cache: false, + enable_transport_request_prefetch: false, }, ) .expect("run replay root-only audit"); @@ -2458,6 +2609,7 @@ mod replay_api_tests { persist_vcir: true, build_ccr_accumulator: true, enable_roa_validation_cache: false, + enable_transport_request_prefetch: false, }, &timing, ) @@ -2515,6 +2667,7 @@ mod replay_api_tests { persist_vcir: true, build_ccr_accumulator: true, enable_roa_validation_cache: false, + enable_transport_request_prefetch: false, }, ) .unwrap_err(); @@ -2548,6 +2701,7 @@ mod replay_api_tests { persist_vcir: true, build_ccr_accumulator: true, enable_roa_validation_cache: false, + enable_transport_request_prefetch: false, }, ) .unwrap_err(); @@ -2601,6 +2755,7 @@ mod replay_api_tests { persist_vcir: true, build_ccr_accumulator: true, enable_roa_validation_cache: false, + enable_transport_request_prefetch: false, }, ) .expect("run delta replay root-only audit"); @@ -2663,6 +2818,7 @@ mod replay_api_tests { persist_vcir: true, build_ccr_accumulator: true, enable_roa_validation_cache: false, + enable_transport_request_prefetch: false, }, &timing, ) diff --git a/src/validation/tree.rs b/src/validation/tree.rs index ab4bf33..6d1b543 100644 --- a/src/validation/tree.rs +++ b/src/validation/tree.rs @@ -22,6 +22,8 @@ pub struct TreeRunConfig { pub build_ccr_accumulator: bool, /// Reuse accepted ROA validation outputs from previous VCIR when explicitly enabled. pub enable_roa_validation_cache: bool, + /// Prefetch transport requests from the previous run before normal tree traversal. + pub enable_transport_request_prefetch: bool, } impl Default for TreeRunConfig { @@ -33,6 +35,7 @@ impl Default for TreeRunConfig { persist_vcir: true, build_ccr_accumulator: true, enable_roa_validation_cache: false, + enable_transport_request_prefetch: false, } } } diff --git a/src/validation/tree_parallel.rs b/src/validation/tree_parallel.rs index c5fd34c..5507a83 100644 --- a/src/validation/tree_parallel.rs +++ b/src/validation/tree_parallel.rs @@ -8,6 +8,7 @@ use crate::parallel::repo_runtime::{RepoSyncRequestStatus, RepoSyncRuntimeOutcom use crate::parallel::types::RepoIdentity; use crate::policy::SignedObjectFailurePolicy; use crate::report::Warning; +use crate::storage::VcirReplaceTimingBreakdown; use crate::validation::manifest::PublicationPointData; use crate::validation::objects::{ ObjectsOutput, OwnedRoaTask, ParallelObjectsPrepare, ParallelObjectsStage, @@ -19,7 +20,9 @@ use crate::validation::tree::{ TreeRunAuditOutput, TreeRunConfig, TreeRunError, TreeRunOutput, run_tree_serial_audit_multi_root, }; -use crate::validation::tree_runner::{FreshPublicationPointStage, Rpkiv1PublicationPointRunner}; +use crate::validation::tree_runner::{ + FreshPublicationPointFinalizeOutput, FreshPublicationPointStage, Rpkiv1PublicationPointRunner, +}; #[derive(Clone, Debug)] struct QueuedCaInstance { @@ -33,6 +36,7 @@ struct QueuedCaInstance { struct ReadyCaInstance { node: QueuedCaInstance, repo_outcome: RepoSyncRuntimeOutcome, + ready_enqueued_at: Instant, } struct InflightPublicationPoint { @@ -53,6 +57,7 @@ struct InflightPublicationPoint { worker_ms_max: u64, queue_wait_ms_total: u64, queue_wait_ms_max: u64, + finalize_enqueued_at: Option, results: Vec, } @@ -131,6 +136,12 @@ struct ReadyStageMetrics { snapshot_manifest_file_count: usize, child_discovery_ms: u64, child_enqueue_ms: u64, + ready_queue_wait_ms: u64, + ready_queue_len_after_pop: usize, + roa_presence_scan_ms: u64, + roa_cache_view_ms: u64, + direct_finalize_ms: u64, + fallback_full_run_ms: u64, prepare_ms: u64, build_roa_tasks_ms: u64, total_ms: u64, @@ -184,6 +195,16 @@ struct ReadyStageBatchMetrics { child_discovery_ms_max: u64, child_enqueue_ms_total: u64, child_enqueue_ms_max: u64, + ready_queue_wait_ms_total: u64, + ready_queue_wait_ms_max: u64, + roa_presence_scan_ms_total: u64, + roa_presence_scan_ms_max: u64, + roa_cache_view_ms_total: u64, + roa_cache_view_ms_max: u64, + direct_finalize_ms_total: u64, + direct_finalize_ms_max: u64, + fallback_full_run_ms_total: u64, + fallback_full_run_ms_max: u64, prepare_ms_total: u64, prepare_ms_max: u64, build_roa_tasks_ms_total: u64, @@ -269,6 +290,22 @@ impl ReadyStageBatchMetrics { self.child_discovery_ms_max = self.child_discovery_ms_max.max(metrics.child_discovery_ms); self.child_enqueue_ms_total += metrics.child_enqueue_ms; self.child_enqueue_ms_max = self.child_enqueue_ms_max.max(metrics.child_enqueue_ms); + self.ready_queue_wait_ms_total += metrics.ready_queue_wait_ms; + self.ready_queue_wait_ms_max = self + .ready_queue_wait_ms_max + .max(metrics.ready_queue_wait_ms); + self.roa_presence_scan_ms_total += metrics.roa_presence_scan_ms; + self.roa_presence_scan_ms_max = self + .roa_presence_scan_ms_max + .max(metrics.roa_presence_scan_ms); + self.roa_cache_view_ms_total += metrics.roa_cache_view_ms; + self.roa_cache_view_ms_max = self.roa_cache_view_ms_max.max(metrics.roa_cache_view_ms); + self.direct_finalize_ms_total += metrics.direct_finalize_ms; + self.direct_finalize_ms_max = self.direct_finalize_ms_max.max(metrics.direct_finalize_ms); + self.fallback_full_run_ms_total += metrics.fallback_full_run_ms; + self.fallback_full_run_ms_max = self + .fallback_full_run_ms_max + .max(metrics.fallback_full_run_ms); self.prepare_ms_total += metrics.prepare_ms; self.prepare_ms_max = self.prepare_ms_max.max(metrics.prepare_ms); self.build_roa_tasks_ms_total += metrics.build_roa_tasks_ms; @@ -311,6 +348,21 @@ struct FinalizePublicationPointMetrics { finalize_ms: u64, finalize_queue_wait_ms: Option, finalize_worker_ms: u64, + snapshot_pack_ms: u64, + persist_vcir_ms: u64, + persist_build_vcir_ms: u64, + persist_replace_vcir_ms: u64, + persist_replace_breakdown: VcirReplaceTimingBreakdown, + ccr_projection_build_ms: u64, + ccr_append_ms: u64, + audit_build_ms: u64, + locked_files: usize, + child_count: usize, + warning_count: usize, + vrp_count: usize, + vap_count: usize, + router_key_count: usize, + audit_object_count: usize, } #[derive(Default)] @@ -323,6 +375,20 @@ struct FinalizeResultsDrainMetrics { finalize_queue_wait_ms_max: u64, finalize_worker_ms_total: u64, finalize_worker_ms_max: u64, + snapshot_pack_ms_total: u64, + snapshot_pack_ms_max: u64, + persist_vcir_ms_total: u64, + persist_vcir_ms_max: u64, + persist_build_vcir_ms_total: u64, + persist_build_vcir_ms_max: u64, + persist_replace_vcir_ms_total: u64, + persist_replace_vcir_ms_max: u64, + ccr_projection_build_ms_total: u64, + ccr_projection_build_ms_max: u64, + ccr_append_ms_total: u64, + ccr_append_ms_max: u64, + audit_build_ms_total: u64, + audit_build_ms_max: u64, duration_ms: u64, } @@ -517,14 +583,17 @@ pub fn run_tree_parallel_phase2_audit_multi_root( let Some(ready) = ready_queue.pop_front() else { break; }; + let ready_queue_len_after_pop = ready_queue.len(); let metrics = stage_ready_publication_point( runner, &mut next_id, &mut ca_queue, &mut pending_roa_dispatch, &mut inflight_publication_points, + &mut pending_finalization, &mut finished, ready, + ready_queue_len_after_pop, config.compact_audit, ); ready_batch_metrics.record(metrics); @@ -543,42 +612,18 @@ pub fn run_tree_parallel_phase2_audit_multi_root( ready_time_budget_exhausted = ready_time_budget_exhausted || (!ready_queue.is_empty() && ready_batch_metrics.total_ms >= ready_batch_wall_time_budget_ms); - crate::progress_log::emit( - "phase2_ready_queue_batch", - serde_json::json!({ - "ready_count": ready_batch_metrics.ready_count, - "fallback_count": ready_batch_metrics.fallback_count, - "complete_count": ready_batch_metrics.complete_count, - "staged_count": ready_batch_metrics.staged_count, - "zero_task_count": ready_batch_metrics.zero_task_count, - "error_count": ready_batch_metrics.error_count, - "discovered_children": ready_batch_metrics.discovered_children, - "locked_files": ready_batch_metrics.locked_files, - "roa_tasks": ready_batch_metrics.roa_tasks, - "aspa_objects": ready_batch_metrics.aspa_objects, - "stage_fresh_ms_total": ready_batch_metrics.stage_fresh_ms_total, - "stage_fresh_ms_max": ready_batch_metrics.stage_fresh_ms_max, - "stage_fresh_ms_max_manifest_rsync_uri": ready_batch_metrics.stage_fresh_ms_max_manifest_rsync_uri, - "stage_fresh_ms_max_publication_point_rsync_uri": ready_batch_metrics.stage_fresh_ms_max_publication_point_rsync_uri, - "child_enqueue_ms_total": ready_batch_metrics.child_enqueue_ms_total, - "child_enqueue_ms_max": ready_batch_metrics.child_enqueue_ms_max, - "prepare_ms_total": ready_batch_metrics.prepare_ms_total, - "prepare_ms_max": ready_batch_metrics.prepare_ms_max, - "build_roa_tasks_ms_total": ready_batch_metrics.build_roa_tasks_ms_total, - "build_roa_tasks_ms_max": ready_batch_metrics.build_roa_tasks_ms_max, - "batch_duration_ms": ready_batch_metrics.total_ms, - "ready_batch_size": ready_batch_size, - "ready_batch_wall_time_budget_ms": ready_batch_wall_time_budget_ms, - "ready_queue_len_after_batch": ready_queue.len(), - "ready_queue_budget_exhausted": !ready_queue.is_empty(), - "ready_count_budget_exhausted": ready_count_budget_exhausted, - "ready_time_budget_exhausted": ready_time_budget_exhausted, - "ca_queue_len_after_batch": ca_queue.len(), - "pending_roa_dispatch_len_after_batch": pending_roa_dispatch.len(), - "inflight_publication_points_after_batch": inflight_publication_points.len(), - "pending_finalization_len_after_batch": pending_finalization.len(), - "finalize_inflight_after_batch": finalize_inflight, - }), + emit_ready_queue_batch_progress( + &ready_batch_metrics, + ready_batch_size, + ready_batch_wall_time_budget_ms, + ready_queue.len(), + ready_count_budget_exhausted, + ready_time_budget_exhausted, + ca_queue.len(), + pending_roa_dispatch.len(), + inflight_publication_points.len(), + pending_finalization.len(), + finalize_inflight, ); crate::progress_log::emit( "phase2_ready_queue_stage_fresh_breakdown", @@ -699,6 +744,75 @@ pub fn run_tree_parallel_phase2_audit_multi_root( }); } +fn emit_ready_queue_batch_progress( + metrics: &ReadyStageBatchMetrics, + ready_batch_size: usize, + ready_batch_wall_time_budget_ms: u64, + ready_queue_len_after_batch: usize, + ready_count_budget_exhausted: bool, + ready_time_budget_exhausted: bool, + ca_queue_len_after_batch: usize, + pending_roa_dispatch_len_after_batch: usize, + inflight_publication_points_after_batch: usize, + pending_finalization_len_after_batch: usize, + finalize_inflight_after_batch: usize, +) { + crate::progress_log::emit( + "phase2_ready_queue_batch", + serde_json::json!({ + "ready_count": metrics.ready_count, + "fallback_count": metrics.fallback_count, + "complete_count": metrics.complete_count, + "staged_count": metrics.staged_count, + "zero_task_count": metrics.zero_task_count, + "error_count": metrics.error_count, + "discovered_children": metrics.discovered_children, + "locked_files": metrics.locked_files, + "roa_tasks": metrics.roa_tasks, + "aspa_objects": metrics.aspa_objects, + "stage_fresh_ms_total": metrics.stage_fresh_ms_total, + "stage_fresh_ms_max": metrics.stage_fresh_ms_max, + "stage_fresh_ms_max_manifest_rsync_uri": metrics.stage_fresh_ms_max_manifest_rsync_uri, + "stage_fresh_ms_max_publication_point_rsync_uri": metrics.stage_fresh_ms_max_publication_point_rsync_uri, + "prepare_ms_total": metrics.prepare_ms_total, + "prepare_ms_max": metrics.prepare_ms_max, + "build_roa_tasks_ms_total": metrics.build_roa_tasks_ms_total, + "build_roa_tasks_ms_max": metrics.build_roa_tasks_ms_max, + "batch_duration_ms": metrics.total_ms, + "ready_batch_size": ready_batch_size, + "ready_batch_wall_time_budget_ms": ready_batch_wall_time_budget_ms, + "ready_queue_len_after_batch": ready_queue_len_after_batch, + "ready_queue_budget_exhausted": ready_queue_len_after_batch > 0, + "ready_count_budget_exhausted": ready_count_budget_exhausted, + "ready_time_budget_exhausted": ready_time_budget_exhausted, + "ca_queue_len_after_batch": ca_queue_len_after_batch, + "pending_roa_dispatch_len_after_batch": pending_roa_dispatch_len_after_batch, + "inflight_publication_points_after_batch": inflight_publication_points_after_batch, + "pending_finalization_len_after_batch": pending_finalization_len_after_batch, + "finalize_inflight_after_batch": finalize_inflight_after_batch, + }), + ); + crate::progress_log::emit( + "phase2_ready_queue_control_breakdown", + serde_json::json!({ + "ready_count": metrics.ready_count, + "ready_queue_wait_ms_total": metrics.ready_queue_wait_ms_total, + "ready_queue_wait_ms_max": metrics.ready_queue_wait_ms_max, + "child_enqueue_ms_total": metrics.child_enqueue_ms_total, + "child_enqueue_ms_max": metrics.child_enqueue_ms_max, + "roa_presence_scan_ms_total": metrics.roa_presence_scan_ms_total, + "roa_presence_scan_ms_max": metrics.roa_presence_scan_ms_max, + "roa_cache_view_ms_total": metrics.roa_cache_view_ms_total, + "roa_cache_view_ms_max": metrics.roa_cache_view_ms_max, + "direct_finalize_ms_total": metrics.direct_finalize_ms_total, + "direct_finalize_ms_max": metrics.direct_finalize_ms_max, + "fallback_full_run_ms_total": metrics.fallback_full_run_ms_total, + "fallback_full_run_ms_max": metrics.fallback_full_run_ms_max, + "batch_duration_ms": metrics.total_ms, + }), + ); +} + fn can_start_more(instances_started: usize, config: &TreeRunConfig) -> bool { config .max_instances @@ -737,6 +851,7 @@ fn start_queued_ca_instances( ready_queue.push_back(ReadyCaInstance { node, repo_outcome: outcome, + ready_enqueued_at: Instant::now(), }); } Ok(RepoSyncRequestStatus::Pending { identity, .. }) => { @@ -761,15 +876,22 @@ fn stage_ready_publication_point( ca_queue: &mut VecDeque, pending_roa_dispatch: &mut VecDeque, inflight_publication_points: &mut HashMap, + pending_finalization: &mut VecDeque, finished: &mut Vec, ready: ReadyCaInstance, + ready_queue_len_after_pop: usize, compact_audit: bool, ) -> ReadyStageMetrics { let publication_point_started = Instant::now(); + let ready_queue_wait_ms = publication_point_started + .saturating_duration_since(ready.ready_enqueued_at) + .as_millis() as u64; let mut metrics = ReadyStageMetrics { ready_count: 1, manifest_rsync_uri: Some(ready.node.handle.manifest_rsync_uri.clone()), publication_point_rsync_uri: Some(ready.node.handle.publication_point_rsync_uri.clone()), + ready_queue_wait_ms, + ready_queue_len_after_pop, ..ReadyStageMetrics::default() }; let mut warnings = ready.repo_outcome.warnings.clone(); @@ -802,7 +924,9 @@ fn stage_ready_publication_point( ); } metrics.fallback_count = 1; + let fallback_started = Instant::now(); let fallback = runner.run_publication_point(&ready.node.handle); + metrics.fallback_full_run_ms = elapsed_ms(fallback_started); if let Ok(result) = fallback.as_ref() { metrics.discovered_children = result.discovered_children.len(); let child_enqueue_started = Instant::now(); @@ -820,6 +944,17 @@ fn stage_ready_publication_point( result: compact_phase2_finished_result_result(fallback, compact_audit), }); metrics.total_ms = elapsed_ms(publication_point_started); + emit_ready_publication_point_control_slow( + metrics.manifest_rsync_uri.as_deref().unwrap_or_default(), + metrics + .publication_point_rsync_uri + .as_deref() + .unwrap_or_default(), + &repo_outcome, + &metrics, + "fallback", + true, + ); return metrics; } }; @@ -890,11 +1025,13 @@ fn stage_ready_publication_point( metrics.child_enqueue_ms = elapsed_ms(child_enqueue_started); let prepare_started = Instant::now(); + let roa_presence_scan_started = Instant::now(); let has_roa = fresh_stage .fresh_point .files() .iter() .any(|file| file.rsync_uri.ends_with(".roa")); + metrics.roa_presence_scan_ms = elapsed_ms(roa_presence_scan_started); if runner.enable_roa_validation_cache { if let Some(timing) = runner.timing.as_ref() { if has_roa { @@ -905,8 +1042,11 @@ fn stage_ready_publication_point( } } let roa_cache_view = if has_roa { - runner - .roa_validation_cache_view_for_fresh_point(&fresh_stage.fresh_point.manifest_rsync_uri) + let roa_cache_view_started = Instant::now(); + let view = runner + .roa_validation_cache_view_for_fresh_point(&fresh_stage.fresh_point.manifest_rsync_uri); + metrics.roa_cache_view_ms = elapsed_ms(roa_cache_view_started); + view } else { None }; @@ -941,16 +1081,20 @@ fn stage_ready_publication_point( &objects.router_keys, ), ); - finalize_ready_objects( + let direct_finalize_started = Instant::now(); + let finalize_metrics = finalize_ready_objects( runner, ready.node, fresh_stage, warnings, objects, - repo_outcome, + repo_outcome.clone(), finished, compact_audit, ); + metrics.direct_finalize_ms = finalize_metrics + .finalize_ms + .max(elapsed_ms(direct_finalize_started)); } ParallelObjectsPrepare::Staged(objects_stage) => { metrics.prepare_ms = elapsed_ms(prepare_started); @@ -964,41 +1108,12 @@ fn stage_ready_publication_point( metrics.roa_tasks = task_count; if task_count == 0 { metrics.zero_task_count = 1; - match reduce_parallel_roa_stage(objects_stage, Vec::new(), runner.timing.as_ref()) { - Ok(mut objects) => { - objects - .router_keys - .extend(fresh_stage.discovered_router_keys.clone()); - objects.local_outputs_cache.extend( - crate::validation::tree_runner::build_router_key_local_outputs( - &ready.node.handle, - &objects.router_keys, - ), - ); - finalize_ready_objects( - runner, - ready.node, - fresh_stage, - warnings, - objects, - repo_outcome, - finished, - compact_audit, - ); - } - Err(err) => finished.push(FinishedPublicationPoint { - node: FinishedPublicationPointNode::from_queued(ready.node), - result: FinishedPublicationPointResult::Err(err), - }), - } - } else { - inflight_publication_points.insert( - ready.node.id, - InflightPublicationPoint { + pending_finalization.push_back(FinalizeTask { + state: InflightPublicationPoint { node: ready.node, fresh_stage, objects_stage, - repo_outcome, + repo_outcome: repo_outcome.clone(), warnings, started_at: publication_point_started, objects_started_at: Instant::now(), @@ -1012,6 +1127,32 @@ fn stage_ready_publication_point( worker_ms_max: 0, queue_wait_ms_total: 0, queue_wait_ms_max: 0, + finalize_enqueued_at: Some(Instant::now()), + results: Vec::new(), + }, + }); + } else { + inflight_publication_points.insert( + ready.node.id, + InflightPublicationPoint { + node: ready.node, + fresh_stage, + objects_stage, + repo_outcome: repo_outcome.clone(), + warnings, + started_at: publication_point_started, + objects_started_at: Instant::now(), + task_count, + tasks_submitted: 0, + first_task_submitted_at: None, + last_task_submitted_at: None, + first_result_at: None, + last_result_at: None, + worker_ms_total: 0, + worker_ms_max: 0, + queue_wait_ms_total: 0, + queue_wait_ms_max: 0, + finalize_enqueued_at: None, results: Vec::with_capacity(task_count), }, ); @@ -1019,9 +1160,99 @@ fn stage_ready_publication_point( } } metrics.total_ms = elapsed_ms(publication_point_started); + emit_ready_publication_point_control_slow( + metrics.manifest_rsync_uri.as_deref().unwrap_or_default(), + metrics + .publication_point_rsync_uri + .as_deref() + .unwrap_or_default(), + &repo_outcome, + &metrics, + if metrics.complete_count > 0 { + "complete" + } else if metrics.zero_task_count > 0 { + "zero_task" + } else { + "staged" + }, + false, + ); metrics } +fn emit_ready_publication_point_control_slow( + manifest_rsync_uri: &str, + publication_point_rsync_uri: &str, + repo_outcome: &RepoSyncRuntimeOutcome, + metrics: &ReadyStageMetrics, + status: &str, + force_error_path: bool, +) { + let threshold_ms = crate::progress_log::pp_control_slow_threshold_ms(); + if !force_error_path && metrics.total_ms < threshold_ms { + return; + } + crate::progress_log::emit( + "phase2_ready_publication_point_control_slow", + serde_json::json!({ + "manifest_rsync_uri": manifest_rsync_uri, + "publication_point_rsync_uri": publication_point_rsync_uri, + "status": status, + "repo_sync_source": repo_outcome.repo_sync_source.as_deref(), + "repo_sync_phase": repo_outcome.repo_sync_phase.as_deref(), + "repo_sync_duration_ms": repo_outcome.repo_sync_duration_ms, + "repo_sync_ok": repo_outcome.repo_sync_ok, + "repo_sync_err": repo_outcome.repo_sync_err.as_deref(), + "ready_queue_wait_ms": metrics.ready_queue_wait_ms, + "ready_queue_len_after_pop": metrics.ready_queue_len_after_pop, + "stage_fresh_ms": metrics.stage_fresh_ms, + "child_discovery_ms": metrics.child_discovery_ms, + "child_enqueue_ms": metrics.child_enqueue_ms, + "discovered_children": metrics.discovered_children, + "roa_presence_scan_ms": metrics.roa_presence_scan_ms, + "roa_cache_view_ms": metrics.roa_cache_view_ms, + "prepare_ms": metrics.prepare_ms, + "build_roa_tasks_ms": metrics.build_roa_tasks_ms, + "direct_finalize_ms": metrics.direct_finalize_ms, + "fallback_full_run_ms": metrics.fallback_full_run_ms, + "locked_files": metrics.locked_files, + "roa_tasks": metrics.roa_tasks, + "aspa_objects": metrics.aspa_objects, + "complete_count": metrics.complete_count, + "staged_count": metrics.staged_count, + "zero_task_count": metrics.zero_task_count, + "fallback_count": metrics.fallback_count, + "total_ms": metrics.total_ms, + "slow_threshold_ms": threshold_ms, + }), + ); + crate::progress_log::emit( + "phase2_ready_publication_point_control_snapshot_breakdown", + serde_json::json!({ + "manifest_rsync_uri": manifest_rsync_uri, + "publication_point_rsync_uri": publication_point_rsync_uri, + "status": status, + "snapshot_prepare_ms": metrics.snapshot_prepare_ms, + "snapshot_current_index_lock_ms": metrics.snapshot_current_index_lock_ms, + "snapshot_manifest_load_ms": metrics.snapshot_manifest_load_ms, + "snapshot_manifest_index_lookup_ms": metrics.snapshot_manifest_index_lookup_ms, + "snapshot_manifest_blob_load_ms": metrics.snapshot_manifest_blob_load_ms, + "snapshot_manifest_decode_ms": metrics.snapshot_manifest_decode_ms, + "snapshot_replay_guard_ms": metrics.snapshot_replay_guard_ms, + "replay_meta_hit_count": metrics.replay_meta_hit_count, + "replay_meta_miss_count": metrics.replay_meta_miss_count, + "snapshot_manifest_entries_ms": metrics.snapshot_manifest_entries_ms, + "snapshot_pack_files_ms": metrics.snapshot_pack_files_ms, + "snapshot_pack_files_index_lookup_ms": metrics.snapshot_pack_files_index_lookup_ms, + "snapshot_pack_files_blob_load_ms": metrics.snapshot_pack_files_blob_load_ms, + "snapshot_ee_path_validate_ms": metrics.snapshot_ee_path_validate_ms, + "snapshot_manifest_file_count": metrics.snapshot_manifest_file_count, + "total_ms": metrics.total_ms, + "slow_threshold_ms": threshold_ms, + }), + ); +} + fn enqueue_discovered_children( runner: &Rpkiv1PublicationPointRunner<'_>, next_id: &mut u64, @@ -1064,25 +1295,129 @@ fn finalize_ready_objects( repo_outcome: RepoSyncRuntimeOutcome, finished: &mut Vec, compact_audit: bool, -) { - let result = runner - .finalize_fresh_publication_point_from_reducer( - &node.handle, - &fresh_stage.fresh_point, - warnings, - objects, - fresh_stage.child_audits, - fresh_stage.discovered_children, - repo_outcome.repo_sync_source.as_deref(), - repo_outcome.repo_sync_phase.as_deref(), - repo_outcome.repo_sync_duration_ms, - repo_outcome.repo_sync_err.as_deref(), - ) - .map(|out| out.result); +) -> FinalizePublicationPointMetrics { + let locked_files = fresh_stage.fresh_point.files().len(); + let finalize_started = Instant::now(); + let finalized = runner.finalize_fresh_publication_point_from_reducer( + &node.handle, + &fresh_stage.fresh_point, + warnings, + objects, + fresh_stage.child_audits, + fresh_stage.discovered_children, + repo_outcome.repo_sync_source.as_deref(), + repo_outcome.repo_sync_phase.as_deref(), + repo_outcome.repo_sync_duration_ms, + repo_outcome.repo_sync_err.as_deref(), + ); + let finalize_ms = elapsed_ms(finalize_started); + let (result, metrics) = match finalized { + Ok(output) => { + let metrics = finalize_metrics_from_output( + &output, + 0, + finalize_ms, + None, + finalize_ms, + locked_files, + ); + emit_finalize_breakdown( + "phase2_direct_finalize_breakdown", + &node.handle.manifest_rsync_uri, + &node.handle.publication_point_rsync_uri, + &metrics, + ); + ( + compact_phase2_finished_result(output.result, compact_audit), + metrics, + ) + } + Err(err) => { + let metrics = FinalizePublicationPointMetrics { + finalize_ms, + finalize_worker_ms: finalize_ms, + locked_files, + ..FinalizePublicationPointMetrics::default() + }; + emit_finalize_breakdown( + "phase2_direct_finalize_breakdown", + &node.handle.manifest_rsync_uri, + &node.handle.publication_point_rsync_uri, + &metrics, + ); + (FinishedPublicationPointResult::Err(err), metrics) + } + }; finished.push(FinishedPublicationPoint { node: FinishedPublicationPointNode::from_queued(node), - result: compact_phase2_finished_result_result(result, compact_audit), + result, }); + metrics +} + +fn finalize_metrics_from_output( + output: &FreshPublicationPointFinalizeOutput, + reduce_ms: u64, + finalize_ms: u64, + finalize_queue_wait_ms: Option, + finalize_worker_ms: u64, + locked_files: usize, +) -> FinalizePublicationPointMetrics { + FinalizePublicationPointMetrics { + reduce_ms, + finalize_ms, + finalize_queue_wait_ms, + finalize_worker_ms, + snapshot_pack_ms: output.snapshot_pack_ms, + persist_vcir_ms: output.persist_vcir_ms, + persist_build_vcir_ms: output.persist_vcir_timing.build_vcir_ms, + persist_replace_vcir_ms: output.persist_vcir_timing.replace_vcir_ms, + persist_replace_breakdown: output.persist_vcir_timing.replace_vcir.clone(), + ccr_projection_build_ms: output.ccr_projection_build_ms, + ccr_append_ms: output.ccr_append_ms, + audit_build_ms: output.audit_build_ms, + locked_files, + child_count: output.result.discovered_children.len(), + warning_count: output.result.warnings.len(), + vrp_count: output.result.objects.vrps.len(), + vap_count: output.result.objects.aspas.len(), + router_key_count: output.result.objects.router_keys.len(), + audit_object_count: output.result.audit.objects.len(), + } +} + +fn emit_finalize_breakdown( + event_name: &str, + manifest_rsync_uri: &str, + publication_point_rsync_uri: &str, + metrics: &FinalizePublicationPointMetrics, +) { + crate::progress_log::emit( + event_name, + serde_json::json!({ + "manifest_rsync_uri": manifest_rsync_uri, + "publication_point_rsync_uri": publication_point_rsync_uri, + "reduce_ms": metrics.reduce_ms, + "finalize_ms": metrics.finalize_ms, + "finalize_queue_wait_ms": metrics.finalize_queue_wait_ms, + "finalize_worker_ms": metrics.finalize_worker_ms, + "snapshot_pack_ms": metrics.snapshot_pack_ms, + "persist_vcir_ms": metrics.persist_vcir_ms, + "persist_build_vcir_ms": metrics.persist_build_vcir_ms, + "persist_replace_vcir_ms": metrics.persist_replace_vcir_ms, + "persist_replace_breakdown": &metrics.persist_replace_breakdown, + "ccr_projection_build_ms": metrics.ccr_projection_build_ms, + "ccr_append_ms": metrics.ccr_append_ms, + "audit_build_ms": metrics.audit_build_ms, + "locked_files": metrics.locked_files, + "child_count": metrics.child_count, + "warning_count": metrics.warning_count, + "vrp_count": metrics.vrp_count, + "vap_count": metrics.vap_count, + "router_key_count": metrics.router_key_count, + "audit_object_count": metrics.audit_object_count, + }), + ); } fn flush_pending_roa_dispatch( @@ -1195,9 +1530,10 @@ fn drain_object_results( false }; if should_finalize { - let state = inflight_publication_points + let mut state = inflight_publication_points .remove(&pp_id) .expect("inflight publication point must exist"); + state.finalize_enqueued_at = Some(Instant::now()); metrics.publication_points_completed += 1; pending_finalization.push_back(FinalizeTask { state }); } @@ -1321,6 +1657,33 @@ fn drain_finalize_results( metrics.finalize_worker_ms_max = metrics .finalize_worker_ms_max .max(result.metrics.finalize_worker_ms); + metrics.snapshot_pack_ms_total += result.metrics.snapshot_pack_ms; + metrics.snapshot_pack_ms_max = metrics + .snapshot_pack_ms_max + .max(result.metrics.snapshot_pack_ms); + metrics.persist_vcir_ms_total += result.metrics.persist_vcir_ms; + metrics.persist_vcir_ms_max = metrics + .persist_vcir_ms_max + .max(result.metrics.persist_vcir_ms); + metrics.persist_build_vcir_ms_total += result.metrics.persist_build_vcir_ms; + metrics.persist_build_vcir_ms_max = metrics + .persist_build_vcir_ms_max + .max(result.metrics.persist_build_vcir_ms); + metrics.persist_replace_vcir_ms_total += result.metrics.persist_replace_vcir_ms; + metrics.persist_replace_vcir_ms_max = metrics + .persist_replace_vcir_ms_max + .max(result.metrics.persist_replace_vcir_ms); + metrics.ccr_projection_build_ms_total += result.metrics.ccr_projection_build_ms; + metrics.ccr_projection_build_ms_max = metrics + .ccr_projection_build_ms_max + .max(result.metrics.ccr_projection_build_ms); + metrics.ccr_append_ms_total += result.metrics.ccr_append_ms; + metrics.ccr_append_ms_max = + metrics.ccr_append_ms_max.max(result.metrics.ccr_append_ms); + metrics.audit_build_ms_total += result.metrics.audit_build_ms; + metrics.audit_build_ms_max = metrics + .audit_build_ms_max + .max(result.metrics.audit_build_ms); *finalize_inflight = finalize_inflight.saturating_sub(1); finished.push(result.finished); } @@ -1363,6 +1726,20 @@ fn drain_finalize_results_with_progress( "finalize_queue_wait_ms_max": drain_metrics.finalize_queue_wait_ms_max, "finalize_worker_ms_total": drain_metrics.finalize_worker_ms_total, "finalize_worker_ms_max": drain_metrics.finalize_worker_ms_max, + "snapshot_pack_ms_total": drain_metrics.snapshot_pack_ms_total, + "snapshot_pack_ms_max": drain_metrics.snapshot_pack_ms_max, + "persist_vcir_ms_total": drain_metrics.persist_vcir_ms_total, + "persist_vcir_ms_max": drain_metrics.persist_vcir_ms_max, + "persist_build_vcir_ms_total": drain_metrics.persist_build_vcir_ms_total, + "persist_build_vcir_ms_max": drain_metrics.persist_build_vcir_ms_max, + "persist_replace_vcir_ms_total": drain_metrics.persist_replace_vcir_ms_total, + "persist_replace_vcir_ms_max": drain_metrics.persist_replace_vcir_ms_max, + "ccr_projection_build_ms_total": drain_metrics.ccr_projection_build_ms_total, + "ccr_projection_build_ms_max": drain_metrics.ccr_projection_build_ms_max, + "ccr_append_ms_total": drain_metrics.ccr_append_ms_total, + "ccr_append_ms_max": drain_metrics.ccr_append_ms_max, + "audit_build_ms_total": drain_metrics.audit_build_ms_total, + "audit_build_ms_max": drain_metrics.audit_build_ms_max, "duration_ms": drain_metrics.duration_ms, "pending_finalization_len": pending_finalization_len, "finalize_inflight": *finalize_inflight, @@ -1415,19 +1792,21 @@ fn finalize_publication_point_state( worker_ms_max, queue_wait_ms_total, queue_wait_ms_max, + finalize_enqueued_at, results, } = state; - let finalize_queue_wait_ms = last_result_at.map(|last_result| { + let finalize_queue_wait_ms = finalize_enqueued_at.or(last_result_at).map(|ready_at| { Instant::now() - .saturating_duration_since(last_result) + .saturating_duration_since(ready_at) .as_millis() as u64 }); let objects_processing_ms = objects_started_at.elapsed().as_millis() as u64; let reduce_started = Instant::now(); + let locked_files = objects_stage.locked_file_count(); let reduce_result = reduce_parallel_roa_stage(objects_stage, results, runner.timing.as_ref()); let reduce_ms = elapsed_ms(reduce_started); - let (result, finalize_ms) = match reduce_result { + let (result, mut metrics) = match reduce_result { Ok(mut objects) => { let finalize_started = Instant::now(); objects @@ -1439,28 +1818,64 @@ fn finalize_publication_point_state( &objects.router_keys, ), ); - let result = runner - .finalize_fresh_publication_point_from_reducer( - &node.handle, - &fresh_stage.fresh_point, - warnings, - objects, - fresh_stage.child_audits, - fresh_stage.discovered_children, - repo_outcome.repo_sync_source.as_deref(), - repo_outcome.repo_sync_phase.as_deref(), - repo_outcome.repo_sync_duration_ms, - repo_outcome.repo_sync_err.as_deref(), - ) - .map(|out| out.result); - ( - compact_phase2_finished_result_result(result, compact_audit), - elapsed_ms(finalize_started), - ) + let finalized = runner.finalize_fresh_publication_point_from_reducer( + &node.handle, + &fresh_stage.fresh_point, + warnings, + objects, + fresh_stage.child_audits, + fresh_stage.discovered_children, + repo_outcome.repo_sync_source.as_deref(), + repo_outcome.repo_sync_phase.as_deref(), + repo_outcome.repo_sync_duration_ms, + repo_outcome.repo_sync_err.as_deref(), + ); + let finalize_ms = elapsed_ms(finalize_started); + match finalized { + Ok(output) => { + let metrics = finalize_metrics_from_output( + &output, + reduce_ms, + finalize_ms, + finalize_queue_wait_ms, + 0, + locked_files, + ); + ( + compact_phase2_finished_result(output.result, compact_audit), + metrics, + ) + } + Err(err) => ( + FinishedPublicationPointResult::Err(err), + FinalizePublicationPointMetrics { + reduce_ms, + finalize_ms, + finalize_queue_wait_ms, + locked_files, + ..FinalizePublicationPointMetrics::default() + }, + ), + } } - Err(err) => (FinishedPublicationPointResult::Err(err), 0), + Err(err) => ( + FinishedPublicationPointResult::Err(err), + FinalizePublicationPointMetrics { + reduce_ms, + finalize_queue_wait_ms, + locked_files, + ..FinalizePublicationPointMetrics::default() + }, + ), }; let finalize_worker_ms = elapsed_ms(finalize_worker_started); + metrics.finalize_worker_ms = finalize_worker_ms; + emit_finalize_breakdown( + "phase2_finalize_worker_breakdown", + node.handle.manifest_rsync_uri.as_str(), + node.handle.publication_point_rsync_uri.as_str(), + &metrics, + ); crate::progress_log::emit( "phase2_publication_point_reduced", serde_json::json!({ @@ -1490,8 +1905,22 @@ fn finalize_publication_point_state( "queue_wait_ms_max": queue_wait_ms_max, "queue_wait_ms_avg": if task_count > 0 { queue_wait_ms_total / task_count as u64 } else { 0 }, "reduce_ms": reduce_ms, - "finalize_ms": finalize_ms, + "finalize_ms": metrics.finalize_ms, "finalize_worker_ms": finalize_worker_ms, + "snapshot_pack_ms": metrics.snapshot_pack_ms, + "persist_vcir_ms": metrics.persist_vcir_ms, + "persist_build_vcir_ms": metrics.persist_build_vcir_ms, + "persist_replace_vcir_ms": metrics.persist_replace_vcir_ms, + "ccr_projection_build_ms": metrics.ccr_projection_build_ms, + "ccr_append_ms": metrics.ccr_append_ms, + "audit_build_ms": metrics.audit_build_ms, + "locked_files": metrics.locked_files, + "child_count": metrics.child_count, + "warning_count": metrics.warning_count, + "vrp_count": metrics.vrp_count, + "vap_count": metrics.vap_count, + "router_key_count": metrics.router_key_count, + "audit_object_count": metrics.audit_object_count, "total_duration_ms": started_at.elapsed().as_millis() as u64, }), ); @@ -1500,12 +1929,7 @@ fn finalize_publication_point_state( node: FinishedPublicationPointNode::from_queued(node), result, }, - metrics: FinalizePublicationPointMetrics { - reduce_ms, - finalize_ms, - finalize_queue_wait_ms, - finalize_worker_ms, - }, + metrics, } } @@ -1536,6 +1960,7 @@ fn drain_repo_events( ready_queue.push_back(ReadyCaInstance { node, repo_outcome: outcome.clone(), + ready_enqueued_at: Instant::now(), }); } } @@ -1658,14 +2083,19 @@ pub fn run_tree_parallel_phase2_audit( mod tests { use super::{ FinishedPublicationPointResult, compact_phase2_finished_result, - compact_phase2_finished_result_result, + compact_phase2_finished_result_result, finalize_metrics_from_output, + }; + use crate::audit::{ + AuditObjectKind, AuditObjectResult, ObjectAuditEntry, PublicationPointAudit, }; - use crate::audit::PublicationPointAudit; use crate::storage::PackTime; use crate::validation::manifest::PublicationPointSource; use crate::validation::objects::{ObjectsOutput, ObjectsStats}; use crate::validation::publication_point::PublicationPointSnapshot; use crate::validation::tree::PublicationPointRunResult; + use crate::validation::tree_runner::{ + BuildVcirTimingBreakdown, FreshPublicationPointFinalizeOutput, PersistVcirTimingBreakdown, + }; fn sample_snapshot() -> PublicationPointSnapshot { PublicationPointSnapshot { @@ -1758,4 +2188,97 @@ mod tests { FinishedPublicationPointResult::Ok { .. } => panic!("error should be preserved"), } } + + #[test] + fn finalize_metrics_from_output_captures_breakdown_and_counts() { + let mut result = sample_result(); + result.objects.vrps.push(crate::validation::objects::Vrp { + asn: 64496, + prefix: crate::data_model::roa::IpPrefix { + afi: crate::data_model::roa::RoaAfi::Ipv4, + addr: [192, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + prefix_len: 24, + }, + max_length: 24, + }); + result + .objects + .aspas + .push(crate::validation::objects::AspaAttestation { + customer_as_id: 64497, + provider_as_ids: vec![64498], + }); + result.audit.objects.push(ObjectAuditEntry { + rsync_uri: "rsync://example.test/repo/a.roa".to_string(), + sha256_hex: "11".repeat(32), + kind: AuditObjectKind::Roa, + result: AuditObjectResult::Ok, + detail: None, + }); + result + .discovered_children + .push(crate::validation::tree::DiscoveredChildCaInstance { + handle: crate::validation::tree::CaInstanceHandle { + tal_id: "test".to_string(), + ca_certificate_der: vec![1], + ca_certificate_rsync_uri: Some( + "rsync://example.test/repo/child.cer".to_string(), + ), + effective_ip_resources: None, + effective_as_resources: None, + manifest_rsync_uri: "rsync://example.test/repo/child.mft".to_string(), + publication_point_rsync_uri: "rsync://example.test/repo/".to_string(), + rsync_base_uri: "rsync://example.test/repo/".to_string(), + rrdp_notification_uri: None, + parent_manifest_rsync_uri: None, + depth: 1, + }, + discovered_from: crate::audit::DiscoveredFrom { + parent_manifest_rsync_uri: "rsync://example.test/repo/example.mft".to_string(), + child_ca_certificate_rsync_uri: "rsync://example.test/repo/child.cer" + .to_string(), + child_ca_certificate_sha256_hex: "55".repeat(32), + }, + }); + let output = FreshPublicationPointFinalizeOutput { + result, + snapshot_pack_ms: 1, + persist_vcir_ms: 2, + persist_vcir_timing: PersistVcirTimingBreakdown { + build_vcir_ms: 3, + replace_vcir_ms: 4, + build_vcir: BuildVcirTimingBreakdown { + related_artifacts_ms: 5, + ..BuildVcirTimingBreakdown::default() + }, + ..PersistVcirTimingBreakdown::default() + }, + ccr_projection_build_ms: 6, + ccr_append_ms: 7, + audit_build_ms: 8, + }; + + let metrics = finalize_metrics_from_output(&output, 9, 10, Some(11), 12, 13); + + assert_eq!(metrics.snapshot_pack_ms, 1); + assert_eq!(metrics.persist_vcir_ms, 2); + assert_eq!(metrics.persist_build_vcir_ms, 3); + assert_eq!(metrics.persist_replace_vcir_ms, 4); + assert_eq!( + metrics.persist_replace_breakdown, + crate::storage::VcirReplaceTimingBreakdown::default() + ); + assert_eq!(metrics.ccr_projection_build_ms, 6); + assert_eq!(metrics.ccr_append_ms, 7); + assert_eq!(metrics.audit_build_ms, 8); + assert_eq!(metrics.reduce_ms, 9); + assert_eq!(metrics.finalize_ms, 10); + assert_eq!(metrics.finalize_queue_wait_ms, Some(11)); + assert_eq!(metrics.finalize_worker_ms, 12); + assert_eq!(metrics.locked_files, 13); + assert_eq!(metrics.child_count, 1); + assert_eq!(metrics.vrp_count, 1); + assert_eq!(metrics.vap_count, 1); + assert_eq!(metrics.audit_object_count, 1); + } } diff --git a/src/validation/tree_runner.rs b/src/validation/tree_runner.rs index 0c36782..a5ca9bc 100644 --- a/src/validation/tree_runner.rs +++ b/src/validation/tree_runner.rs @@ -113,6 +113,8 @@ pub(crate) struct FreshPublicationPointFinalizeOutput { pub(crate) snapshot_pack_ms: u64, pub(crate) persist_vcir_ms: u64, pub(crate) persist_vcir_timing: PersistVcirTimingBreakdown, + pub(crate) ccr_projection_build_ms: u64, + pub(crate) ccr_append_ms: u64, pub(crate) audit_build_ms: u64, } @@ -163,20 +165,25 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { return None; } let load_started = std::time::Instant::now(); - let loaded_vcir = self.store.get_vcir(manifest_rsync_uri); + let loaded_projection = self.store.get_roa_cache_projection(manifest_rsync_uri); if let Some(timing) = self.timing.as_ref() { timing.record_phase_nanos( - "roa_validation_cache_vcir_load_total", + "roa_validation_cache_projection_load_total", load_started.elapsed().as_nanos().min(u128::from(u64::MAX)) as u64, ); } - match loaded_vcir { - Ok(Some(vcir)) => { + match loaded_projection { + Ok(Some(projection)) => { + if let Some(timing) = self.timing.as_ref() { + timing + .record_count("roa_validation_cache_projection_hit_publication_points", 1); + } let view_started = std::time::Instant::now(); - let view = RoaValidationCacheView::from_vcir(&vcir, self.validation_time); + let view = + RoaValidationCacheView::from_projection(&projection, self.validation_time); if let Some(timing) = self.timing.as_ref() { timing.record_phase_nanos( - "roa_validation_cache_view_build_total", + "roa_validation_cache_projection_build_total", view_started.elapsed().as_nanos().min(u128::from(u64::MAX)) as u64, ); } @@ -184,16 +191,19 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { } Ok(None) => { if let Some(timing) = self.timing.as_ref() { - timing.record_count("roa_validation_cache_vcir_missing_publication_points", 1); + timing.record_count( + "roa_validation_cache_projection_missing_publication_points", + 1, + ); } None } Err(err) => { if let Some(timing) = self.timing.as_ref() { - timing.record_count("roa_validation_cache_vcir_load_errors", 1); + timing.record_count("roa_validation_cache_projection_load_errors", 1); } crate::progress_log::emit( - "roa_validation_cache_vcir_load_error", + "roa_validation_cache_projection_load_error", serde_json::json!({ "manifest_rsync_uri": manifest_rsync_uri, "error": err.to_string(), @@ -381,12 +391,18 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { // publication point result is retained for the rest of the run. let _released_local_outputs = std::mem::take(&mut objects.local_outputs_cache); + let mut ccr_projection_build_ms = 0; + let mut ccr_append_ms = 0; if self.ccr_accumulator.is_some() { + let ccr_projection_build_started = std::time::Instant::now(); let child_entries = build_vcir_child_entries(&discovered_children, self.validation_time)?; let ccr_manifest_projection = build_vcir_ccr_manifest_projection_from_fresh(ca, &pack, &child_entries)?; + ccr_projection_build_ms = ccr_projection_build_started.elapsed().as_millis() as u64; + let ccr_append_started = std::time::Instant::now(); self.append_ccr_manifest_projection(&ccr_manifest_projection)?; + ccr_append_ms = ccr_append_started.elapsed().as_millis() as u64; } let audit_build_started = std::time::Instant::now(); @@ -416,6 +432,8 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { snapshot_pack_ms, persist_vcir_ms, persist_vcir_timing, + ccr_projection_build_ms, + ccr_append_ms, audit_build_ms, }) } @@ -805,6 +823,8 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> { snapshot_pack_ms, persist_vcir_ms, persist_vcir_timing, + ccr_projection_build_ms, + ccr_append_ms, audit_build_ms, } = finalized; let total_duration_ms = publication_point_started.elapsed().as_millis() as u64; @@ -841,6 +861,8 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> { "persist_related_artifacts_ms": persist_vcir_timing.build_vcir.related_artifacts_ms, "persist_vcir_struct_ms": persist_vcir_timing.build_vcir.struct_build_ms, "persist_replace_breakdown": &persist_vcir_timing.replace_vcir, + "ccr_projection_build_ms": ccr_projection_build_ms, + "ccr_append_ms": ccr_append_ms, "audit_build_ms": audit_build_ms, "warning_count": result.warnings.len(), "vrp_count": result.objects.vrps.len(), @@ -884,6 +906,8 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> { "persist_related_artifacts_ms": persist_vcir_timing.build_vcir.related_artifacts_ms, "persist_vcir_struct_ms": persist_vcir_timing.build_vcir.struct_build_ms, "persist_replace_breakdown": &persist_vcir_timing.replace_vcir, + "ccr_projection_build_ms": ccr_projection_build_ms, + "ccr_append_ms": ccr_append_ms, "audit_build_ms": audit_build_ms, }), ); diff --git a/src/validation/tree_runner/tests.rs b/src/validation/tree_runner/tests.rs index 8d9c32d..f015373 100644 --- a/src/validation/tree_runner/tests.rs +++ b/src/validation/tree_runner/tests.rs @@ -3034,6 +3034,85 @@ fn parse_snapshot_time_value_reports_invalid_timestamp() { assert!(err.contains("invalid RFC3339 time 'not-a-time'"), "{err}"); } +#[test] +fn runner_roa_validation_cache_uses_projection_not_full_vcir_fallback() { + let now = time::OffsetDateTime::now_utc(); + let child_cert_hash = sha256_hex(b"child-cert"); + let mut vcir = sample_vcir_for_projection(now, &child_cert_hash); + vcir.local_outputs + .retain(|output| output.source_object_type != VcirSourceObjectType::Roa); + vcir.summary.local_vrp_count = 0; + vcir.summary.local_aspa_count = 1; + vcir.summary.local_router_key_count = 1; + + let store_dir = tempfile::tempdir().expect("store dir"); + let store = RocksStore::open(store_dir.path()).expect("open rocksdb"); + store.put_vcir(&vcir).expect("put vcir without projection"); + assert!( + store + .get_vcir(&vcir.manifest_rsync_uri) + .expect("get vcir") + .is_some() + ); + assert!( + store + .get_roa_cache_projection(&vcir.manifest_rsync_uri) + .expect("get projection") + .is_none() + ); + + let timing = crate::analysis::timing::TimingHandle::new(crate::analysis::timing::TimingMeta { + recorded_at_utc_rfc3339: "2026-06-07T00:00:00Z".to_string(), + validation_time_utc_rfc3339: "2026-06-07T00:00:00Z".to_string(), + tal_url: None, + db_path: None, + }); + let policy = Policy::default(); + let runner = Rpkiv1PublicationPointRunner { + store: &store, + policy: &policy, + http_fetcher: &NeverHttpFetcher, + rsync_fetcher: &FailingRsyncFetcher, + validation_time: now, + timing: Some(timing.clone()), + download_log: None, + replay_archive_index: None, + replay_delta_index: None, + rrdp_dedup: false, + rrdp_repo_cache: Mutex::new(HashMap::new()), + rsync_dedup: false, + rsync_repo_cache: Mutex::new(HashMap::new()), + current_repo_index: None, + repo_sync_runtime: None, + parallel_phase2_config: None, + parallel_roa_worker_pool: None, + ccr_accumulator: None, + persist_vcir: true, + enable_roa_validation_cache: true, + }; + + assert!( + runner + .roa_validation_cache_view_for_fresh_point(&vcir.manifest_rsync_uri) + .is_none() + ); + let dir = tempfile::tempdir().expect("timing dir"); + let path = dir.path().join("timing.json"); + timing.write_json(&path, 10).expect("write timing"); + let report: serde_json::Value = + serde_json::from_slice(&std::fs::read(path).expect("read timing")).expect("parse timing"); + assert_eq!( + report["counts"]["roa_validation_cache_projection_missing_publication_points"], + 1 + ); + assert!( + report["phases"]["roa_validation_cache_projection_load_total"]["count"] + .as_u64() + .unwrap_or_default() + >= 1 + ); +} + #[test] fn build_objects_output_from_vcir_tracks_expired_and_invalid_cached_outputs() { let now = time::OffsetDateTime::now_utc(); diff --git a/tests/test_apnic_stats_live_stage2.rs b/tests/test_apnic_stats_live_stage2.rs index c7a3fab..f11affd 100644 --- a/tests/test_apnic_stats_live_stage2.rs +++ b/tests/test_apnic_stats_live_stage2.rs @@ -219,6 +219,7 @@ fn apnic_tree_full_stats_serial() { persist_vcir: true, build_ccr_accumulator: true, enable_roa_validation_cache: false, + enable_transport_request_prefetch: false, }, ) .expect("run tree"); diff --git a/tests/test_apnic_tree_live_m15.rs b/tests/test_apnic_tree_live_m15.rs index 609b5d2..8205cb9 100644 --- a/tests/test_apnic_tree_live_m15.rs +++ b/tests/test_apnic_tree_live_m15.rs @@ -40,6 +40,7 @@ fn apnic_tree_depth1_processes_more_than_root() { persist_vcir: true, build_ccr_accumulator: true, enable_roa_validation_cache: false, + enable_transport_request_prefetch: false, }, ) .expect("run tree from tal"); @@ -82,6 +83,7 @@ fn apnic_tree_root_only_processes_root_with_long_timeouts() { persist_vcir: true, build_ccr_accumulator: true, enable_roa_validation_cache: false, + enable_transport_request_prefetch: false, }, ) .expect("run APNIC root-only"); diff --git a/tests/test_deterministic_semantics_m4.rs b/tests/test_deterministic_semantics_m4.rs index 6019778..b814315 100644 --- a/tests/test_deterministic_semantics_m4.rs +++ b/tests/test_deterministic_semantics_m4.rs @@ -112,6 +112,7 @@ fn crl_mismatch_drops_publication_point_and_cites_rfc_sections() { persist_vcir: true, build_ccr_accumulator: true, enable_roa_validation_cache: false, + enable_transport_request_prefetch: false, }, ) .expect("run tree audit"); diff --git a/tests/test_run_tree_from_tal_offline_m17.rs b/tests/test_run_tree_from_tal_offline_m17.rs index 77ff7be..f82cd0a 100644 --- a/tests/test_run_tree_from_tal_offline_m17.rs +++ b/tests/test_run_tree_from_tal_offline_m17.rs @@ -118,6 +118,7 @@ fn run_tree_from_tal_url_entry_executes_and_records_failure_when_repo_empty() { persist_vcir: true, build_ccr_accumulator: true, enable_roa_validation_cache: false, + enable_transport_request_prefetch: false, }, ) .expect("run tree"); @@ -166,6 +167,7 @@ fn run_tree_from_tal_and_ta_der_entry_executes_and_records_failure_when_repo_emp persist_vcir: true, build_ccr_accumulator: true, enable_roa_validation_cache: false, + enable_transport_request_prefetch: false, }, ) .expect("run tree"); @@ -222,6 +224,7 @@ fn run_tree_from_tal_url_audit_entry_collects_no_publication_points_when_repo_em persist_vcir: true, build_ccr_accumulator: true, enable_roa_validation_cache: false, + enable_transport_request_prefetch: false, }, ) .expect("run tree audit"); @@ -266,6 +269,7 @@ fn run_tree_from_tal_and_ta_der_audit_entry_collects_no_publication_points_when_ persist_vcir: true, build_ccr_accumulator: true, enable_roa_validation_cache: false, + enable_transport_request_prefetch: false, }, ) .expect("run tree audit"); @@ -318,6 +322,7 @@ fn run_tree_from_tal_url_audit_with_timing_records_phases_when_repo_empty() { persist_vcir: true, build_ccr_accumulator: true, enable_roa_validation_cache: false, + enable_transport_request_prefetch: false, }, &timing, ) @@ -369,6 +374,7 @@ fn run_tree_from_tal_and_ta_der_audit_with_timing_records_phases_when_repo_empty persist_vcir: true, build_ccr_accumulator: true, enable_roa_validation_cache: false, + enable_transport_request_prefetch: false, }, &timing, ) diff --git a/tests/test_tree_traversal_m14.rs b/tests/test_tree_traversal_m14.rs index 4fd324d..a19f32a 100644 --- a/tests/test_tree_traversal_m14.rs +++ b/tests/test_tree_traversal_m14.rs @@ -296,6 +296,7 @@ fn tree_respects_max_depth_and_max_instances() { persist_vcir: true, build_ccr_accumulator: true, enable_roa_validation_cache: false, + enable_transport_request_prefetch: false, }, ) .expect("run tree depth-limited"); @@ -312,6 +313,7 @@ fn tree_respects_max_depth_and_max_instances() { persist_vcir: true, build_ccr_accumulator: true, enable_roa_validation_cache: false, + enable_transport_request_prefetch: false, }, ) .expect("run tree instance-limited");