diff --git a/src/validation/tree_runner.rs b/src/validation/tree_runner.rs index a3c4338..f145d97 100644 --- a/src/validation/tree_runner.rs +++ b/src/validation/tree_runner.rs @@ -25,11 +25,11 @@ use crate::replay::archive::ReplayArchiveIndex; use crate::replay::delta_archive::ReplayDeltaArchiveIndex; use crate::report::{RfcRef, Warning}; use crate::storage::{ - PackFile, PackTime, PublicationPointCacheProjection, RawByHashEntry, RocksStore, - ValidatedCaInstanceResult, VcirArtifactKind, VcirArtifactRole, VcirArtifactValidationStatus, - VcirAuditSummary, VcirCcrManifestProjection, VcirChildEntry, VcirInstanceGate, VcirLocalOutput, - VcirLocalOutputPayload, VcirOutputType, VcirRelatedArtifact, VcirReplaceTimingBreakdown, - VcirSourceObjectType, VcirSummary, + PackFile, PackTime, PublicationPointCacheChild, PublicationPointCacheProjection, + RawByHashEntry, RocksStore, ValidatedCaInstanceResult, VcirArtifactKind, VcirArtifactRole, + VcirArtifactValidationStatus, VcirAuditSummary, VcirCcrManifestProjection, VcirChildEntry, + VcirInstanceGate, VcirLocalOutput, VcirLocalOutputPayload, VcirOutputType, VcirRelatedArtifact, + VcirReplaceTimingBreakdown, VcirSourceObjectType, VcirSummary, }; use crate::sync::repo::{ sync_publication_point, sync_publication_point_replay, sync_publication_point_replay_delta, @@ -64,6 +64,9 @@ use x509_parser::x509::SubjectPublicKeyInfo; use vcir_der::encode_access_description_der_for_vcir_ccr_projection; +const PUBLICATION_POINT_CACHE_CHILD_RESTORE_PARALLEL_MIN_CHILDREN: usize = 256; +const PUBLICATION_POINT_CACHE_CHILD_RESTORE_MAX_WORKERS: usize = 16; + fn sha256_hex_to_32(hex_value: &str) -> [u8; 32] { let mut out = [0u8; 32]; hex::decode_to_slice(hex_value, &mut out).expect("internal sha256 hex should decode"); @@ -538,12 +541,15 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { build_objects_started, ); let restore_children_started = std::time::Instant::now(); + let child_restore_workers = self.publication_point_cache_child_restore_worker_count(); let (discovered_children, child_audits) = restore_children_from_publication_point_cache( self.store, ca, &projection, self.validation_time, &mut warnings, + child_restore_workers, + self.timing.as_ref(), ); let restore_children_ms = self.record_publication_point_cache_phase_ms( "publication_point_cache_restore_children_total", @@ -611,6 +617,7 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { "to_vcir_ms": to_vcir_ms, "build_objects_ms": build_objects_ms, "restore_children_ms": restore_children_ms, + "restore_children_workers": child_restore_workers, "ccr_append_ms": ccr_append_ms, "audit_build_ms": audit_build_ms, "total_ms": total_ms, @@ -642,6 +649,14 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { nanos / 1_000_000 } + fn publication_point_cache_child_restore_worker_count(&self) -> usize { + self.parallel_phase2_config + .as_ref() + .map(|config| config.object_workers) + .unwrap_or(1) + .clamp(1, PUBLICATION_POINT_CACHE_CHILD_RESTORE_MAX_WORKERS) + } + pub(crate) fn ccr_accumulator_snapshot(&self) -> Option { self.ccr_accumulator .as_ref() @@ -3282,119 +3297,277 @@ fn restore_children_from_publication_point_cache( projection: &PublicationPointCacheProjection, validation_time: time::OffsetDateTime, warnings: &mut Vec, + worker_count: usize, + timing: Option<&TimingHandle>, ) -> (Vec, Vec) { - let mut children = Vec::new(); - let mut audits = Vec::new(); - for child in &projection.children { + let worker_count = worker_count + .clamp(1, PUBLICATION_POINT_CACHE_CHILD_RESTORE_MAX_WORKERS) + .min(projection.children.len().max(1)); + let outcomes = if worker_count > 1 + && projection.children.len() >= PUBLICATION_POINT_CACHE_CHILD_RESTORE_PARALLEL_MIN_CHILDREN + { + if let Some(timing) = timing { + timing.record_count( + "publication_point_cache_restore_children_parallel_publication_points", + 1, + ); + timing.record_count( + "publication_point_cache_restore_children_parallel_children", + projection.children.len() as u64, + ); + timing.record_count( + "publication_point_cache_restore_children_workers_total", + worker_count as u64, + ); + } + restore_publication_point_cache_children_parallel( + store, + ca, + &projection.children, + validation_time, + worker_count, + ) + } else { + if let Some(timing) = timing { + timing.record_count( + "publication_point_cache_restore_children_batch_publication_points", + 1, + ); + timing.record_count( + "publication_point_cache_restore_children_batch_children", + projection.children.len() as u64, + ); + } + restore_publication_point_cache_children_chunk( + store, + ca, + &projection.children, + validation_time, + ) + }; + collect_publication_point_cache_child_restore_outcomes(outcomes, warnings) +} + +fn restore_publication_point_cache_children_parallel( + store: &RocksStore, + ca: &CaInstanceHandle, + children: &[PublicationPointCacheChild], + validation_time: time::OffsetDateTime, + worker_count: usize, +) -> Vec { + let chunk_size = children.len().div_ceil(worker_count).max(1); + let mut chunk_results = Vec::new(); + std::thread::scope(|scope| { + let mut handles = Vec::new(); + for chunk in children.chunks(chunk_size) { + handles.push(scope.spawn(move || { + restore_publication_point_cache_children_chunk(store, ca, chunk, validation_time) + })); + } + for handle in handles { + chunk_results.extend( + handle + .join() + .expect("publication-point cache child restore worker panicked"), + ); + } + }); + chunk_results +} + +fn restore_publication_point_cache_children_chunk( + store: &RocksStore, + ca: &CaInstanceHandle, + children: &[PublicationPointCacheChild], + validation_time: time::OffsetDateTime, +) -> Vec { + let mut outcomes = vec![None; children.len()]; + let mut valid_positions = Vec::new(); + let mut hashes = Vec::new(); + for (position, child) in children.iter().enumerate() { let effective_not_before = match parse_snapshot_time_value(&child.child_effective_not_before) { Ok(value) => value, Err(e) => { - warnings.push( - Warning::new(format!( - "publication-point cache child has invalid effective notBefore: {e}" - )) - .with_context(&child.child_cert_rsync_uri), - ); + outcomes[position] = Some(PublicationPointCacheChildRestoreOutcome { + child: None, + audit: None, + warning: Some( + Warning::new(format!( + "publication-point cache child has invalid effective notBefore: {e}" + )) + .with_context(&child.child_cert_rsync_uri), + ), + }); continue; } }; let effective_until = match parse_snapshot_time_value(&child.child_effective_until) { Ok(value) => value, Err(e) => { - warnings.push( - Warning::new(format!( - "publication-point cache child has invalid effective until: {e}" - )) - .with_context(&child.child_cert_rsync_uri), - ); + outcomes[position] = Some(PublicationPointCacheChildRestoreOutcome { + child: None, + audit: None, + warning: Some( + Warning::new(format!( + "publication-point cache child has invalid effective until: {e}" + )) + .with_context(&child.child_cert_rsync_uri), + ), + }); continue; } }; if validation_time < effective_not_before || validation_time > effective_until { - audits.push(ObjectAuditEntry { - rsync_uri: child.child_cert_rsync_uri.clone(), - sha256_hex: child.child_cert_hash.clone(), - kind: AuditObjectKind::Certificate, - result: AuditObjectResult::Skipped, - detail: Some("skipped: publication-point cache child expired".to_string()), + outcomes[position] = Some(PublicationPointCacheChildRestoreOutcome { + child: None, + warning: None, + audit: Some(publication_point_cache_child_audit( + child, + AuditObjectResult::Skipped, + Some("skipped: publication-point cache child expired".to_string()), + )), }); continue; } - match store.get_blob_bytes(&child.child_cert_hash) { - Ok(Some(bytes)) => { - children.push(DiscoveredChildCaInstance { - handle: CaInstanceHandle { - depth: 0, - tal_id: ca.tal_id.clone(), - parent_manifest_rsync_uri: Some(ca.manifest_rsync_uri.clone()), - ca_certificate_der: bytes, - ca_certificate_rsync_uri: Some(child.child_cert_rsync_uri.clone()), - effective_ip_resources: child.child_effective_ip_resources.clone(), - effective_as_resources: child.child_effective_as_resources.clone(), - rsync_base_uri: child.child_rsync_base_uri.clone(), - manifest_rsync_uri: child.child_manifest_rsync_uri.clone(), - publication_point_rsync_uri: child - .child_publication_point_rsync_uri - .clone(), - rrdp_notification_uri: child.child_rrdp_notification_uri.clone(), + valid_positions.push(position); + hashes.push(child.child_cert_hash.clone()); + } + + match store.get_blob_bytes_batch(&hashes) { + Ok(bytes_by_position) => { + for (position, maybe_bytes) in valid_positions.into_iter().zip(bytes_by_position) { + let child = &children[position]; + outcomes[position] = Some(match maybe_bytes { + Some(bytes) => PublicationPointCacheChildRestoreOutcome { + child: Some(publication_point_cache_discovered_child(ca, child, bytes)), + warning: None, + audit: Some(publication_point_cache_child_audit( + child, + AuditObjectResult::Ok, + Some( + "restored child CA instance from publication-point cache" + .to_string(), + ), + )), }, - discovered_from: crate::audit::DiscoveredFrom { - parent_manifest_rsync_uri: ca.manifest_rsync_uri.clone(), - child_ca_certificate_rsync_uri: child.child_cert_rsync_uri.clone(), - child_ca_certificate_sha256_hex: child.child_cert_hash.clone(), + None => PublicationPointCacheChildRestoreOutcome { + child: None, + warning: Some( + Warning::new( + "child certificate bytes missing for publication-point cache restoration", + ) + .with_context(&child.child_cert_rsync_uri), + ), + audit: Some(publication_point_cache_child_audit( + child, + AuditObjectResult::Error, + Some( + "child certificate bytes missing for publication-point cache restoration" + .to_string(), + ), + )), }, }); - audits.push(ObjectAuditEntry { - rsync_uri: child.child_cert_rsync_uri.clone(), - sha256_hex: child.child_cert_hash.clone(), - kind: AuditObjectKind::Certificate, - result: AuditObjectResult::Ok, - detail: Some( - "restored child CA instance from publication-point cache".to_string(), - ), - }); } - Ok(None) => { - warnings.push( - Warning::new( - "child certificate bytes missing for publication-point cache restoration", - ) - .with_context(&child.child_cert_rsync_uri), - ); - audits.push(ObjectAuditEntry { - rsync_uri: child.child_cert_rsync_uri.clone(), - sha256_hex: child.child_cert_hash.clone(), - kind: AuditObjectKind::Certificate, - result: AuditObjectResult::Error, - detail: Some( - "child certificate bytes missing for publication-point cache restoration" - .to_string(), + } + Err(e) => { + for position in valid_positions { + let child = &children[position]; + outcomes[position] = Some(PublicationPointCacheChildRestoreOutcome { + child: None, + warning: Some( + Warning::new(format!( + "child certificate bytes load failed for publication-point cache restoration: {e}" + )) + .with_context(&child.child_cert_rsync_uri), ), - }); - } - Err(e) => { - warnings.push( - Warning::new(format!( - "child certificate bytes load failed for publication-point cache restoration: {e}" - )) - .with_context(&child.child_cert_rsync_uri), - ); - audits.push(ObjectAuditEntry { - rsync_uri: child.child_cert_rsync_uri.clone(), - sha256_hex: child.child_cert_hash.clone(), - kind: AuditObjectKind::Certificate, - result: AuditObjectResult::Error, - detail: Some(format!( - "child certificate bytes load failed for publication-point cache restoration: {e}" + audit: Some(publication_point_cache_child_audit( + child, + AuditObjectResult::Error, + Some(format!( + "child certificate bytes load failed for publication-point cache restoration: {e}" + )), )), }); } } } + + outcomes + .into_iter() + .flatten() + .collect::>() +} + +fn collect_publication_point_cache_child_restore_outcomes( + outcomes: Vec, + warnings: &mut Vec, +) -> (Vec, Vec) { + let mut children = Vec::new(); + let mut audits = Vec::new(); + for outcome in outcomes { + if let Some(warning) = outcome.warning { + warnings.push(warning); + } + if let Some(audit) = outcome.audit { + audits.push(audit); + } + if let Some(child) = outcome.child { + children.push(child); + } + } (children, audits) } +#[derive(Clone)] +struct PublicationPointCacheChildRestoreOutcome { + child: Option, + audit: Option, + warning: Option, +} + +fn publication_point_cache_discovered_child( + ca: &CaInstanceHandle, + child: &PublicationPointCacheChild, + bytes: Vec, +) -> DiscoveredChildCaInstance { + DiscoveredChildCaInstance { + handle: CaInstanceHandle { + depth: 0, + tal_id: ca.tal_id.clone(), + parent_manifest_rsync_uri: Some(ca.manifest_rsync_uri.clone()), + ca_certificate_der: bytes, + ca_certificate_rsync_uri: Some(child.child_cert_rsync_uri.clone()), + effective_ip_resources: child.child_effective_ip_resources.clone(), + effective_as_resources: child.child_effective_as_resources.clone(), + rsync_base_uri: child.child_rsync_base_uri.clone(), + manifest_rsync_uri: child.child_manifest_rsync_uri.clone(), + publication_point_rsync_uri: child.child_publication_point_rsync_uri.clone(), + rrdp_notification_uri: child.child_rrdp_notification_uri.clone(), + }, + discovered_from: crate::audit::DiscoveredFrom { + parent_manifest_rsync_uri: ca.manifest_rsync_uri.clone(), + child_ca_certificate_rsync_uri: child.child_cert_rsync_uri.clone(), + child_ca_certificate_sha256_hex: child.child_cert_hash.clone(), + }, + } +} + +fn publication_point_cache_child_audit( + child: &PublicationPointCacheChild, + result: AuditObjectResult, + detail: Option, +) -> ObjectAuditEntry { + ObjectAuditEntry { + rsync_uri: child.child_cert_rsync_uri.clone(), + sha256_hex: child.child_cert_hash.clone(), + kind: AuditObjectKind::Certificate, + result, + detail, + } +} + fn persist_vcir_for_fresh_result_with_timing( store: &RocksStore, policy: &Policy, diff --git a/src/validation/tree_runner/tests.rs b/src/validation/tree_runner/tests.rs index 88688cf..27a5609 100644 --- a/src/validation/tree_runner/tests.rs +++ b/src/validation/tree_runner/tests.rs @@ -2037,6 +2037,96 @@ fn runner_publication_point_cache_reuses_projection_outputs_children_and_ccr() { assert!(phase_keys.contains("publication_point_cache_build_objects_total")); } +#[test] +fn publication_point_cache_restore_children_parallel_keeps_order_and_audit() { + let store_dir = tempfile::tempdir().expect("store dir"); + let store = RocksStore::open(store_dir.path()).expect("open rocksdb"); + let policy = Policy::default(); + let validation_time = time::OffsetDateTime::UNIX_EPOCH + time::Duration::minutes(1); + let ca = publication_point_cache_fixture_ca(); + seed_publication_point_cache_projection(&store, &policy, &ca, validation_time); + let mut projection = store + .get_publication_point_cache_projection(&ca.manifest_rsync_uri) + .expect("load projection") + .expect("projection"); + let template = projection.children[0].clone(); + let mut blobs = Vec::new(); + let mut children = Vec::new(); + for index in 0..300 { + let bytes = format!("child-cert-{index}").into_bytes(); + let child_hash = sha256_hex(&bytes); + let mut child = template.clone(); + child.child_cert_hash = child_hash.clone(); + child.child_cert_rsync_uri = format!("rsync://example.test/repo/issuer/child-{index}.cer"); + child.child_manifest_rsync_uri = + format!("rsync://example.test/repo/child-{index}/child.mft"); + child.child_publication_point_rsync_uri = + format!("rsync://example.test/repo/child-{index}/"); + child.child_rsync_base_uri = child.child_publication_point_rsync_uri.clone(); + blobs.push((child_hash, bytes)); + children.push(child); + } + projection.children = children; + store.put_blob_bytes_batch(&blobs).expect("put child blobs"); + let timing = crate::analysis::timing::TimingHandle::new(crate::analysis::timing::TimingMeta { + recorded_at_utc_rfc3339: "2026-01-01T00:00:00Z".to_string(), + validation_time_utc_rfc3339: "2026-01-01T00:00:00Z".to_string(), + tal_url: None, + db_path: None, + }); + let mut warnings = Vec::new(); + + let (restored_children, audits) = restore_children_from_publication_point_cache( + &store, + &ca, + &projection, + validation_time, + &mut warnings, + 4, + Some(&timing), + ); + + assert!(warnings.is_empty()); + assert_eq!(restored_children.len(), 300); + assert_eq!(audits.len(), 300); + assert_eq!( + restored_children[0].handle.ca_certificate_der, + b"child-cert-0" + ); + assert_eq!( + restored_children[299].handle.ca_certificate_der, + b"child-cert-299" + ); + assert_eq!( + restored_children[0].handle.manifest_rsync_uri, + "rsync://example.test/repo/child-0/child.mft" + ); + assert!( + audits + .iter() + .all(|audit| audit.result == AuditObjectResult::Ok) + ); + let counts = timing.counts_snapshot(); + assert_eq!( + counts + .get("publication_point_cache_restore_children_parallel_publication_points") + .copied(), + Some(1) + ); + assert_eq!( + counts + .get("publication_point_cache_restore_children_parallel_children") + .copied(), + Some(300) + ); + assert_eq!( + counts + .get("publication_point_cache_restore_children_workers_total") + .copied(), + Some(4) + ); +} + #[test] fn runner_publication_point_cache_blocks_parent_policy_and_output_time_mismatch() { let store_dir = tempfile::tempdir().expect("store dir");