use std::collections::{HashMap, HashSet, VecDeque}; use std::time::{Duration, Instant}; use crate::audit::{DiscoveredFrom, PublicationPointAudit}; use crate::parallel::object_worker::ObjectWorkerSubmitError; use crate::parallel::repo_runtime::{RepoSyncRequestStatus, RepoSyncRuntimeOutcome}; use crate::parallel::types::RepoIdentity; use crate::policy::SignedObjectFailurePolicy; use crate::report::Warning; use crate::validation::objects::{ prepare_publication_point_for_parallel_roa, reduce_parallel_roa_stage, ObjectsOutput, OwnedRoaTask, ParallelObjectsPrepare, ParallelObjectsStage, }; use crate::validation::tree::{ run_tree_serial_audit_multi_root, CaInstanceHandle, DiscoveredChildCaInstance, PublicationPointRunResult, PublicationPointRunner, TreeRunAuditOutput, TreeRunConfig, TreeRunError, TreeRunOutput, }; use crate::validation::tree_runner::{FreshPublicationPointStage, Rpkiv1PublicationPointRunner}; #[derive(Clone, Debug)] struct QueuedCaInstance { id: u64, handle: CaInstanceHandle, parent_id: Option, discovered_from: Option, } #[derive(Clone, Debug)] struct ReadyCaInstance { node: QueuedCaInstance, repo_outcome: RepoSyncRuntimeOutcome, } struct InflightPublicationPoint { node: QueuedCaInstance, fresh_stage: FreshPublicationPointStage, objects_stage: ParallelObjectsStage, repo_outcome: RepoSyncRuntimeOutcome, warnings: Vec, started_at: Instant, objects_started_at: Instant, task_count: usize, results: Vec, } struct FinishedPublicationPoint { node: QueuedCaInstance, result: Result, } fn compact_phase2_finished_result( mut result: PublicationPointRunResult, ) -> PublicationPointRunResult { // Phase2 only needs warnings, objects, audit, and traversal metadata after finalize. // Dropping the snapshot here avoids retaining manifest/files/raw-byte caches until run end. result.snapshot = None; result } fn compact_phase2_finished_result_result( result: Result, ) -> Result { result.map(compact_phase2_finished_result) } pub fn run_tree_parallel_phase2_audit_multi_root( roots: Vec, runner: &Rpkiv1PublicationPointRunner<'_>, config: &TreeRunConfig, ) -> Result { if runner.policy.signed_object_failure_policy == SignedObjectFailurePolicy::DropPublicationPoint { return run_tree_serial_audit_multi_root(roots, runner, config); } let Some(repo_runtime) = runner.repo_sync_runtime.as_ref() else { return run_tree_serial_audit_multi_root(roots, runner, config); }; if runner.parallel_roa_worker_pool.is_none() { return run_tree_serial_audit_multi_root(roots, runner, config); } let mut next_id: u64 = 0; let mut ca_queue: VecDeque = VecDeque::new(); for root in roots { ca_queue.push_back(QueuedCaInstance { id: next_id, handle: root, parent_id: None, discovered_from: None, }); next_id += 1; } let mut visited_manifest_uris: HashSet = HashSet::new(); let mut ca_waiting_repo_by_identity: HashMap> = HashMap::new(); let mut ready_queue: VecDeque = VecDeque::new(); let mut inflight_publication_points: HashMap = HashMap::new(); let mut pending_roa_dispatch: VecDeque = VecDeque::new(); let mut finished: Vec = Vec::new(); let mut instances_started = 0usize; loop { while can_start_more(instances_started, config) { let Some(node) = ca_queue.pop_front() else { break; }; if !visited_manifest_uris.insert(node.handle.manifest_rsync_uri.clone()) { continue; } if let Some(max_depth) = config.max_depth { if node.handle.depth > max_depth { continue; } } instances_started += 1; match repo_runtime.request_publication_point_repo(&node.handle, 0) { Ok(RepoSyncRequestStatus::Ready { mut outcome, .. }) => { // Ready here means this CA is reusing repo work that has already completed // (often due to child prefetch). Do not add the transport duration again. outcome.repo_sync_duration_ms = 0; ready_queue.push_back(ReadyCaInstance { node, repo_outcome: outcome, }); } Ok(RepoSyncRequestStatus::Pending { identity, .. }) => { ca_waiting_repo_by_identity .entry(identity) .or_default() .push(node); } Err(err) => { finished.push(FinishedPublicationPoint { node, result: Err(err), }); } } } while let Some(ready) = ready_queue.pop_front() { stage_ready_publication_point( runner, &mut next_id, &mut ca_queue, &mut pending_roa_dispatch, &mut inflight_publication_points, &mut finished, ready, ); } flush_pending_roa_dispatch(runner, &mut pending_roa_dispatch)?; drain_object_results(runner, &mut inflight_publication_points, &mut finished)?; let repo_poll_timeout = event_poll_timeout( &ca_queue, &ready_queue, &pending_roa_dispatch, &inflight_publication_points, instances_started, config, ); drain_repo_events( repo_runtime.as_ref(), &mut ca_waiting_repo_by_identity, &mut ready_queue, repo_poll_timeout, )?; if is_complete( &ca_queue, &ready_queue, &ca_waiting_repo_by_identity, &pending_roa_dispatch, &inflight_publication_points, instances_started, config, ) { break; } } repo_runtime .reset_run_state() .map_err(TreeRunError::Runner)?; Ok(build_tree_output(finished)) } fn can_start_more(instances_started: usize, config: &TreeRunConfig) -> bool { config .max_instances .map(|max| instances_started < max) .unwrap_or(true) } fn stage_ready_publication_point( runner: &Rpkiv1PublicationPointRunner<'_>, next_id: &mut u64, ca_queue: &mut VecDeque, pending_roa_dispatch: &mut VecDeque, inflight_publication_points: &mut HashMap, finished: &mut Vec, ready: ReadyCaInstance, ) { let publication_point_started = Instant::now(); let mut warnings = ready.repo_outcome.warnings.clone(); let repo_outcome = ready.repo_outcome.clone(); let stage = runner.stage_fresh_publication_point_after_repo_ready( &ready.node.handle, repo_outcome.repo_sync_ok, repo_outcome.repo_sync_err.as_deref(), ); let fresh_stage = match stage { Ok(stage) => stage, Err(_) => { let fallback = runner.run_publication_point(&ready.node.handle); if let Ok(result) = fallback.as_ref() { enqueue_discovered_children( runner, next_id, ca_queue, &ready.node, result.discovered_children.clone(), ); } finished.push(FinishedPublicationPoint { node: ready.node, result: compact_phase2_finished_result_result(fallback), }); return; } }; warnings.extend(fresh_stage.warnings.clone()); enqueue_discovered_children( runner, next_id, ca_queue, &ready.node, fresh_stage.discovered_children.clone(), ); match prepare_publication_point_for_parallel_roa( ready.node.id, &fresh_stage.fresh_point, &ready.node.handle.ca_certificate_der, ready.node.handle.ca_certificate_rsync_uri.as_deref(), ready.node.handle.effective_ip_resources.as_ref(), ready.node.handle.effective_as_resources.as_ref(), runner.validation_time, ) { ParallelObjectsPrepare::Complete(mut objects) => { objects .router_keys .extend(fresh_stage.discovered_router_keys.clone()); objects.local_outputs_cache.extend( crate::validation::tree_runner::build_router_key_local_outputs( &ready.node.handle, &objects.router_keys, ), ); finalize_ready_objects( runner, ready.node, fresh_stage, warnings, objects, repo_outcome, finished, ); } ParallelObjectsPrepare::Staged(objects_stage) => { let tasks = objects_stage.build_roa_tasks(); let task_count = objects_stage.roa_task_count(); for task in tasks { pending_roa_dispatch.push_back(task); } if task_count == 0 { match reduce_parallel_roa_stage(objects_stage, Vec::new(), runner.timing.as_ref()) { Ok(mut objects) => { objects .router_keys .extend(fresh_stage.discovered_router_keys.clone()); objects.local_outputs_cache.extend( crate::validation::tree_runner::build_router_key_local_outputs( &ready.node.handle, &objects.router_keys, ), ); finalize_ready_objects( runner, ready.node, fresh_stage, warnings, objects, repo_outcome, finished, ); } Err(err) => finished.push(FinishedPublicationPoint { node: ready.node, result: Err(err), }), } } else { inflight_publication_points.insert( ready.node.id, InflightPublicationPoint { node: ready.node, fresh_stage, objects_stage, repo_outcome, warnings, started_at: publication_point_started, objects_started_at: Instant::now(), task_count, results: Vec::with_capacity(task_count), }, ); } } } } fn enqueue_discovered_children( runner: &Rpkiv1PublicationPointRunner<'_>, next_id: &mut u64, ca_queue: &mut VecDeque, parent: &QueuedCaInstance, mut children: Vec, ) { children.sort_by(|a, b| { a.handle .manifest_rsync_uri .cmp(&b.handle.manifest_rsync_uri) .then_with(|| { a.discovered_from .child_ca_certificate_rsync_uri .cmp(&b.discovered_from.child_ca_certificate_rsync_uri) }) }); if let Some(runtime) = runner.repo_sync_runtime.as_ref() { let _ = runtime.prefetch_discovered_children(&children); } for child in children { let mut handle = child.handle.with_depth(parent.handle.depth + 1); handle.parent_manifest_rsync_uri = Some(parent.handle.manifest_rsync_uri.clone()); ca_queue.push_back(QueuedCaInstance { id: *next_id, handle, parent_id: Some(parent.id), discovered_from: Some(child.discovered_from), }); *next_id += 1; } } fn finalize_ready_objects( runner: &Rpkiv1PublicationPointRunner<'_>, node: QueuedCaInstance, fresh_stage: FreshPublicationPointStage, warnings: Vec, objects: ObjectsOutput, repo_outcome: RepoSyncRuntimeOutcome, finished: &mut Vec, ) { let result = runner .finalize_fresh_publication_point_from_reducer( &node.handle, &fresh_stage.fresh_point, warnings, objects, fresh_stage.child_audits, fresh_stage.discovered_children, repo_outcome.repo_sync_source.as_deref(), repo_outcome.repo_sync_phase.as_deref(), repo_outcome.repo_sync_duration_ms, repo_outcome.repo_sync_err.as_deref(), ) .map(|out| out.result); finished.push(FinishedPublicationPoint { node, result: compact_phase2_finished_result_result(result), }); } fn flush_pending_roa_dispatch( runner: &Rpkiv1PublicationPointRunner<'_>, pending_roa_dispatch: &mut VecDeque, ) -> Result<(), TreeRunError> { let Some(pool) = runner.parallel_roa_worker_pool.as_ref() else { return Ok(()); }; while let Some(task) = pending_roa_dispatch.pop_front() { match pool.try_submit_round_robin(task) { Ok(_) => {} Err(ObjectWorkerSubmitError::QueueFull { task, .. }) => { pending_roa_dispatch.push_front(task); break; } Err(ObjectWorkerSubmitError::Disconnected { .. }) => { return Err(TreeRunError::Runner( "parallel ROA worker queue disconnected".to_string(), )); } } } Ok(()) } fn drain_object_results( runner: &Rpkiv1PublicationPointRunner<'_>, inflight_publication_points: &mut HashMap, finished: &mut Vec, ) -> Result<(), TreeRunError> { let Some(pool) = runner.parallel_roa_worker_pool.as_ref() else { return Ok(()); }; loop { let Some(result) = pool .recv_result_timeout(Duration::from_millis(0)) .map_err(TreeRunError::Runner)? else { break; }; let pp_id = result.publication_point_id; let should_finalize = if let Some(state) = inflight_publication_points.get_mut(&pp_id) { state.results.push(result); state.results.len() == state.task_count } else { false }; if should_finalize { let state = inflight_publication_points .remove(&pp_id) .expect("inflight publication point must exist"); let objects_processing_ms = state.objects_started_at.elapsed().as_millis() as u64; match reduce_parallel_roa_stage( state.objects_stage, state.results, runner.timing.as_ref(), ) { Ok(mut objects) => { objects .router_keys .extend(state.fresh_stage.discovered_router_keys.clone()); objects.local_outputs_cache.extend( crate::validation::tree_runner::build_router_key_local_outputs( &state.node.handle, &objects.router_keys, ), ); let result = runner .finalize_fresh_publication_point_from_reducer( &state.node.handle, &state.fresh_stage.fresh_point, state.warnings, objects, state.fresh_stage.child_audits, state.fresh_stage.discovered_children, state.repo_outcome.repo_sync_source.as_deref(), state.repo_outcome.repo_sync_phase.as_deref(), state.repo_outcome.repo_sync_duration_ms, state.repo_outcome.repo_sync_err.as_deref(), ) .map(|out| out.result); crate::progress_log::emit( "phase2_publication_point_reduced", serde_json::json!({ "manifest_rsync_uri": state.node.handle.manifest_rsync_uri, "publication_point_rsync_uri": state.node.handle.publication_point_rsync_uri, "objects_processing_ms": objects_processing_ms, "total_duration_ms": state.started_at.elapsed().as_millis() as u64, }), ); finished.push(FinishedPublicationPoint { node: state.node, result: compact_phase2_finished_result_result(result), }); } Err(err) => finished.push(FinishedPublicationPoint { node: state.node, result: Err(err), }), } } } Ok(()) } fn drain_repo_events( repo_runtime: &dyn crate::parallel::repo_runtime::RepoSyncRuntime, ca_waiting_repo_by_identity: &mut HashMap>, ready_queue: &mut VecDeque, timeout: Duration, ) -> Result<(), TreeRunError> { if let Some(event) = repo_runtime .recv_repo_result_timeout(timeout) .map_err(TreeRunError::Runner)? { for completion in event.completions { let mut outcome = completion.outcome; if completion.identity != event.transport_identity { // Shared RRDP/rsync transports release many publication points, but the transport // wall time should only be counted once in per-PP stage timing aggregation. outcome.repo_sync_duration_ms = 0; } if let Some(waiters) = ca_waiting_repo_by_identity.remove(&completion.identity) { for node in waiters { ready_queue.push_back(ReadyCaInstance { node, repo_outcome: outcome.clone(), }); } } } } Ok(()) } fn event_poll_timeout( ca_queue: &VecDeque, ready_queue: &VecDeque, pending_roa_dispatch: &VecDeque, inflight_publication_points: &HashMap, instances_started: usize, config: &TreeRunConfig, ) -> Duration { if !ready_queue.is_empty() || !pending_roa_dispatch.is_empty() || !inflight_publication_points.is_empty() || (!ca_queue.is_empty() && can_start_more(instances_started, config)) { Duration::from_millis(0) } else { Duration::from_millis(50) } } fn is_complete( ca_queue: &VecDeque, ready_queue: &VecDeque, ca_waiting_repo_by_identity: &HashMap>, pending_roa_dispatch: &VecDeque, inflight_publication_points: &HashMap, instances_started: usize, config: &TreeRunConfig, ) -> bool { let ca_queue_done = ca_queue.is_empty() || !can_start_more(instances_started, config); ca_queue_done && ready_queue.is_empty() && ca_waiting_repo_by_identity.is_empty() && pending_roa_dispatch.is_empty() && inflight_publication_points.is_empty() } fn build_tree_output(mut finished: Vec) -> TreeRunAuditOutput { finished.sort_by_key(|item| item.node.id); let mut instances_processed = 0usize; let mut instances_failed = 0usize; let mut warnings = Vec::new(); let mut vrps = Vec::new(); let mut aspas = Vec::new(); let mut router_keys = Vec::new(); let mut publication_points = Vec::new(); for item in finished { match item.result { Ok(result) => { instances_processed += 1; warnings.extend(result.warnings.clone()); warnings.extend(result.objects.warnings.clone()); vrps.extend(result.objects.vrps.clone()); aspas.extend(result.objects.aspas.clone()); router_keys.extend(result.objects.router_keys.clone()); let mut audit: PublicationPointAudit = result.audit; audit.node_id = Some(item.node.id); audit.parent_node_id = item.node.parent_id; audit.discovered_from = item.node.discovered_from; publication_points.push(audit); } Err(err) => { instances_failed += 1; warnings.push( Warning::new(format!("publication point failed: {err}")) .with_context(&item.node.handle.manifest_rsync_uri), ); } } } TreeRunAuditOutput { tree: TreeRunOutput { instances_processed, instances_failed, warnings, vrps, aspas, router_keys, }, publication_points, } } pub fn run_tree_parallel_phase2_audit( root: CaInstanceHandle, runner: &Rpkiv1PublicationPointRunner<'_>, config: &TreeRunConfig, ) -> Result { run_tree_parallel_phase2_audit_multi_root(vec![root], runner, config) } #[cfg(test)] mod tests { use super::{compact_phase2_finished_result, compact_phase2_finished_result_result}; use crate::audit::PublicationPointAudit; use crate::storage::PackTime; use crate::validation::manifest::PublicationPointSource; use crate::validation::objects::{ObjectsOutput, ObjectsStats}; use crate::validation::publication_point::PublicationPointSnapshot; use crate::validation::tree::PublicationPointRunResult; fn sample_snapshot() -> PublicationPointSnapshot { PublicationPointSnapshot { format_version: PublicationPointSnapshot::FORMAT_VERSION_V1, manifest_rsync_uri: "rsync://example.test/repo/example.mft".to_string(), publication_point_rsync_uri: "rsync://example.test/repo/".to_string(), manifest_number_be: vec![1], this_update: PackTime { rfc3339_utc: "2026-04-21T00:00:00Z".to_string(), }, next_update: PackTime { rfc3339_utc: "2026-04-22T00:00:00Z".to_string(), }, verified_at: PackTime { rfc3339_utc: "2026-04-21T00:00:01Z".to_string(), }, manifest_bytes: vec![1, 2, 3], files: Vec::new(), } } fn sample_result() -> PublicationPointRunResult { PublicationPointRunResult { source: PublicationPointSource::Fresh, snapshot: Some(sample_snapshot()), warnings: Vec::new(), objects: ObjectsOutput { vrps: Vec::new(), aspas: Vec::new(), router_keys: Vec::new(), local_outputs_cache: Vec::new(), warnings: Vec::new(), stats: ObjectsStats::default(), audit: Vec::new(), }, audit: PublicationPointAudit::default(), discovered_children: Vec::new(), } } #[test] fn compact_phase2_finished_result_drops_snapshot() { let result = compact_phase2_finished_result(sample_result()); assert!(result.snapshot.is_none()); assert_eq!(result.source, PublicationPointSource::Fresh); assert!(result.discovered_children.is_empty()); } #[test] fn compact_phase2_finished_result_result_preserves_err() { let err = compact_phase2_finished_result_result(Err("boom".to_string())) .expect_err("error should be preserved"); assert_eq!(err, "boom"); } }