From f843eedda976ff31cc95af5ab8c1a6fb7034b10f Mon Sep 17 00:00:00 2001 From: yuyr Date: Wed, 6 May 2026 12:05:02 +0800 Subject: [PATCH] =?UTF-8?q?20260504=20=E4=BC=98=E5=8C=96phase2=20finalize?= =?UTF-8?q?=E8=B0=83=E5=BA=A6=E9=95=BF=E5=B0=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cli.rs | 212 ++++ src/parallel/config.rs | 15 + src/validation/tree_parallel.rs | 1019 ++++++++++++----- ...ects_process_publication_point_snapshot.rs | 3 + tests/test_objects_processing_coverage_m18.rs | 4 + 5 files changed, 946 insertions(+), 307 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index d19b6cc..e1c20a1 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -156,6 +156,16 @@ Options: Phase 2 per-worker object queue capacity (default: 256) --parallel-phase2-ready-batch-size Phase 2 ready publication points processed per scheduler turn (default: 256) + --parallel-phase2-ready-batch-wall-time-budget-ms + Phase 2 ready staging wall-time budget per scheduler turn (default: 100) + --parallel-phase2-result-drain-batch-size + Phase 2 object results drained per scheduler turn (default: 2048) + --parallel-phase2-finalize-batch-size + Legacy Phase 2 scheduler finalize budget; dedicated finalize worker ignores it (default: 256) + --parallel-phase2-finalize-batch-wall-time-budget-ms + Legacy Phase 2 scheduler finalize time budget; dedicated finalize worker ignores it (default: 100) + --parallel-phase2-finalize-queue-capacity + Phase 2 dedicated finalize worker queue capacity (default: 32768) --rsync-local-dir Use LocalDirRsyncFetcher rooted at this directory (offline tests) --disable-rrdp Disable RRDP and synchronize only via rsync @@ -293,6 +303,55 @@ pub fn parse_args(argv: &[String]) -> Result { .parse::() .map_err(|_| format!("invalid --parallel-phase2-ready-batch-size: {v}"))?; } + "--parallel-phase2-ready-batch-wall-time-budget-ms" => { + i += 1; + let v = argv + .get(i) + .ok_or("--parallel-phase2-ready-batch-wall-time-budget-ms requires a value")?; + parallel_phase2_cfg.ready_batch_wall_time_budget_ms = + v.parse::().map_err(|_| { + format!("invalid --parallel-phase2-ready-batch-wall-time-budget-ms: {v}") + })?; + } + "--parallel-phase2-result-drain-batch-size" => { + i += 1; + let v = argv + .get(i) + .ok_or("--parallel-phase2-result-drain-batch-size requires a value")?; + parallel_phase2_cfg.object_result_drain_batch_size = + v.parse::().map_err(|_| { + format!("invalid --parallel-phase2-result-drain-batch-size: {v}") + })?; + } + "--parallel-phase2-finalize-batch-size" => { + i += 1; + let v = argv + .get(i) + .ok_or("--parallel-phase2-finalize-batch-size requires a value")?; + parallel_phase2_cfg.publication_point_finalize_batch_size = v + .parse::() + .map_err(|_| format!("invalid --parallel-phase2-finalize-batch-size: {v}"))?; + } + "--parallel-phase2-finalize-batch-wall-time-budget-ms" => { + i += 1; + let v = argv.get(i).ok_or( + "--parallel-phase2-finalize-batch-wall-time-budget-ms requires a value", + )?; + parallel_phase2_cfg.publication_point_finalize_wall_time_budget_ms = + v.parse::().map_err(|_| { + format!("invalid --parallel-phase2-finalize-batch-wall-time-budget-ms: {v}") + })?; + } + "--parallel-phase2-finalize-queue-capacity" => { + i += 1; + let v = argv + .get(i) + .ok_or("--parallel-phase2-finalize-queue-capacity requires a value")?; + parallel_phase2_cfg.publication_point_finalize_queue_capacity = + v.parse::().map_err(|_| { + format!("invalid --parallel-phase2-finalize-queue-capacity: {v}") + })?; + } "--db" => { i += 1; let v = argv.get(i).ok_or("--db requires a value")?; @@ -513,6 +572,36 @@ pub fn parse_args(argv: &[String]) -> Result { usage() )); } + if parallel_phase2_cfg.ready_batch_wall_time_budget_ms == 0 { + return Err(format!( + "--parallel-phase2-ready-batch-wall-time-budget-ms must be > 0\n\n{}", + usage() + )); + } + if parallel_phase2_cfg.object_result_drain_batch_size == 0 { + return Err(format!( + "--parallel-phase2-result-drain-batch-size must be > 0\n\n{}", + usage() + )); + } + if parallel_phase2_cfg.publication_point_finalize_batch_size == 0 { + return Err(format!( + "--parallel-phase2-finalize-batch-size must be > 0\n\n{}", + usage() + )); + } + if parallel_phase2_cfg.publication_point_finalize_wall_time_budget_ms == 0 { + return Err(format!( + "--parallel-phase2-finalize-batch-wall-time-budget-ms must be > 0\n\n{}", + usage() + )); + } + if parallel_phase2_cfg.publication_point_finalize_queue_capacity == 0 { + return Err(format!( + "--parallel-phase2-finalize-queue-capacity must be > 0\n\n{}", + usage() + )); + } if !tal_urls.is_empty() && !ta_paths.is_empty() { return Err(format!( "--ta-path cannot be used with --tal-url mode\n\n{}", @@ -2191,11 +2280,44 @@ mod tests { "17".to_string(), "--parallel-phase2-ready-batch-size".to_string(), "31".to_string(), + "--parallel-phase2-ready-batch-wall-time-budget-ms".to_string(), + "43".to_string(), + "--parallel-phase2-result-drain-batch-size".to_string(), + "37".to_string(), + "--parallel-phase2-finalize-batch-size".to_string(), + "41".to_string(), + "--parallel-phase2-finalize-batch-wall-time-budget-ms".to_string(), + "47".to_string(), + "--parallel-phase2-finalize-queue-capacity".to_string(), + "8192".to_string(), ]; let args = parse_args(&argv).expect("parse args"); assert_eq!(args.parallel_phase2_config.object_workers, 3); assert_eq!(args.parallel_phase2_config.worker_queue_capacity, 17); assert_eq!(args.parallel_phase2_config.ready_batch_size, 31); + assert_eq!( + args.parallel_phase2_config.ready_batch_wall_time_budget_ms, + 43 + ); + assert_eq!( + args.parallel_phase2_config.object_result_drain_batch_size, + 37 + ); + assert_eq!( + args.parallel_phase2_config + .publication_point_finalize_batch_size, + 41 + ); + assert_eq!( + args.parallel_phase2_config + .publication_point_finalize_wall_time_budget_ms, + 47 + ); + assert_eq!( + args.parallel_phase2_config + .publication_point_finalize_queue_capacity, + 8192 + ); assert_eq!(args.parallel_phase1_config, ParallelPhase1Config::default()); } @@ -2214,6 +2336,96 @@ mod tests { assert!(err.contains("--parallel-phase2-ready-batch-size"), "{err}"); } + #[test] + fn parse_rejects_zero_phase2_result_drain_batch_size() { + let argv = vec![ + "rpki".to_string(), + "--db".to_string(), + "db".to_string(), + "--tal-url".to_string(), + "https://example.test/root.tal".to_string(), + "--parallel-phase2-result-drain-batch-size".to_string(), + "0".to_string(), + ]; + let err = parse_args(&argv).expect_err("zero result drain batch must fail"); + assert!( + err.contains("--parallel-phase2-result-drain-batch-size"), + "{err}" + ); + } + + #[test] + fn parse_rejects_zero_phase2_ready_batch_wall_time_budget_ms() { + let argv = vec![ + "rpki".to_string(), + "--db".to_string(), + "db".to_string(), + "--tal-url".to_string(), + "https://example.test/root.tal".to_string(), + "--parallel-phase2-ready-batch-wall-time-budget-ms".to_string(), + "0".to_string(), + ]; + let err = parse_args(&argv).expect_err("zero ready time budget must fail"); + assert!( + err.contains("--parallel-phase2-ready-batch-wall-time-budget-ms"), + "{err}" + ); + } + + #[test] + fn parse_rejects_zero_phase2_finalize_batch_size() { + let argv = vec![ + "rpki".to_string(), + "--db".to_string(), + "db".to_string(), + "--tal-url".to_string(), + "https://example.test/root.tal".to_string(), + "--parallel-phase2-finalize-batch-size".to_string(), + "0".to_string(), + ]; + let err = parse_args(&argv).expect_err("zero finalize batch must fail"); + assert!( + err.contains("--parallel-phase2-finalize-batch-size"), + "{err}" + ); + } + + #[test] + fn parse_rejects_zero_phase2_finalize_batch_wall_time_budget_ms() { + let argv = vec![ + "rpki".to_string(), + "--db".to_string(), + "db".to_string(), + "--tal-url".to_string(), + "https://example.test/root.tal".to_string(), + "--parallel-phase2-finalize-batch-wall-time-budget-ms".to_string(), + "0".to_string(), + ]; + let err = parse_args(&argv).expect_err("zero finalize time budget must fail"); + assert!( + err.contains("--parallel-phase2-finalize-batch-wall-time-budget-ms"), + "{err}" + ); + } + + #[test] + fn parse_rejects_zero_phase2_finalize_queue_capacity() { + let argv = vec![ + "rpki".to_string(), + "--db".to_string(), + "db".to_string(), + "--tal-url".to_string(), + "https://example.test/root.tal".to_string(), + "--parallel-phase2-finalize-queue-capacity".to_string(), + "0".to_string(), + ]; + let err = parse_args(&argv).expect_err("zero finalize queue capacity must fail"); + assert!( + err.contains("--parallel-phase2-finalize-queue-capacity"), + "{err}" + ); + } + #[test] fn parse_rejects_removed_parallel_enable_flags() { let argv = vec![ diff --git a/src/parallel/config.rs b/src/parallel/config.rs index cda3684..df8127c 100644 --- a/src/parallel/config.rs +++ b/src/parallel/config.rs @@ -10,6 +10,11 @@ pub struct ParallelPhase2Config { pub object_workers: usize, pub worker_queue_capacity: usize, pub ready_batch_size: usize, + pub ready_batch_wall_time_budget_ms: u64, + pub object_result_drain_batch_size: usize, + pub publication_point_finalize_batch_size: usize, + pub publication_point_finalize_wall_time_budget_ms: u64, + pub publication_point_finalize_queue_capacity: usize, } impl Default for ParallelPhase2Config { @@ -18,6 +23,11 @@ impl Default for ParallelPhase2Config { object_workers: 8, worker_queue_capacity: 256, ready_batch_size: 256, + ready_batch_wall_time_budget_ms: 100, + object_result_drain_batch_size: 2048, + publication_point_finalize_batch_size: 256, + publication_point_finalize_wall_time_budget_ms: 100, + publication_point_finalize_queue_capacity: 32768, } } } @@ -50,5 +60,10 @@ mod tests { assert!(cfg.object_workers > 0); assert!(cfg.worker_queue_capacity > 0); assert!(cfg.ready_batch_size > 0); + assert!(cfg.ready_batch_wall_time_budget_ms > 0); + assert!(cfg.object_result_drain_batch_size > 0); + assert!(cfg.publication_point_finalize_batch_size > 0); + assert!(cfg.publication_point_finalize_wall_time_budget_ms > 0); + assert!(cfg.publication_point_finalize_queue_capacity > 0); } } diff --git a/src/validation/tree_parallel.rs b/src/validation/tree_parallel.rs index 58089ea..0079f79 100644 --- a/src/validation/tree_parallel.rs +++ b/src/validation/tree_parallel.rs @@ -1,4 +1,5 @@ use std::collections::{HashMap, HashSet, VecDeque}; +use std::sync::mpsc::{self, Receiver, SyncSender, TryRecvError, TrySendError}; use std::time::{Duration, Instant}; use crate::audit::{DiscoveredFrom, PublicationPointAudit}; @@ -58,6 +59,15 @@ struct FinishedPublicationPoint { result: Result, } +struct FinalizeTask { + state: InflightPublicationPoint, +} + +struct FinalizeWorkerResult { + finished: FinishedPublicationPoint, + metrics: FinalizePublicationPointMetrics, +} + #[derive(Default)] struct ReadyStageMetrics { manifest_rsync_uri: Option, @@ -248,15 +258,40 @@ struct RoaDispatchMetrics { #[derive(Default)] struct ObjectDrainMetrics { results_drained: usize, - publication_points_finalized: usize, - reduce_ms_total: u64, - reduce_ms_max: u64, - finalize_ms_total: u64, - finalize_ms_max: u64, + publication_points_completed: usize, worker_ms_total: u64, worker_ms_max: u64, queue_wait_ms_total: u64, queue_wait_ms_max: u64, + result_budget_exhausted: bool, + duration_ms: u64, +} + +#[derive(Default)] +struct FinalizeSubmitMetrics { + submitted: usize, + queue_full: bool, + duration_ms: u64, +} + +#[derive(Default)] +struct FinalizePublicationPointMetrics { + reduce_ms: u64, + finalize_ms: u64, + finalize_queue_wait_ms: Option, + finalize_worker_ms: u64, +} + +#[derive(Default)] +struct FinalizeResultsDrainMetrics { + results_drained: usize, + reduce_ms_total: u64, + reduce_ms_max: u64, + finalize_ms_total: u64, + finalize_ms_max: u64, + finalize_queue_wait_ms_max: u64, + finalize_worker_ms_total: u64, + finalize_worker_ms_max: u64, duration_ms: u64, } @@ -329,239 +364,304 @@ pub fn run_tree_parallel_phase2_audit_multi_root( HashMap::new(); let mut ready_queue: VecDeque = VecDeque::new(); let mut inflight_publication_points: HashMap = HashMap::new(); + let mut pending_finalization: VecDeque = VecDeque::new(); let mut pending_roa_dispatch: VecDeque = VecDeque::new(); let mut finished: Vec = Vec::new(); let mut instances_started = 0usize; - let ready_batch_size = runner - .parallel_phase2_config - .as_ref() + let phase2_config = runner.parallel_phase2_config.as_ref(); + let ready_batch_size = phase2_config .map(|cfg| cfg.ready_batch_size) .unwrap_or(256) .max(1); + let ready_batch_wall_time_budget_ms = phase2_config + .map(|cfg| cfg.ready_batch_wall_time_budget_ms) + .unwrap_or(100) + .max(1); + let ready_batch_wall_time_budget = Duration::from_millis(ready_batch_wall_time_budget_ms); + let object_result_drain_batch_size = phase2_config + .map(|cfg| cfg.object_result_drain_batch_size) + .unwrap_or(2048) + .max(1); + let publication_point_finalize_queue_capacity = phase2_config + .map(|cfg| cfg.publication_point_finalize_queue_capacity) + .unwrap_or(32768) + .max(1); - loop { - while can_start_more(instances_started, config) { - let Some(node) = ca_queue.pop_front() else { - break; - }; - if !visited_manifest_uris.insert(node.handle.manifest_rsync_uri.clone()) { - continue; - } - if let Some(max_depth) = config.max_depth { - if node.handle.depth > max_depth { - continue; - } - } - instances_started += 1; - match repo_runtime.request_publication_point_repo(&node.handle, 0) { - Ok(RepoSyncRequestStatus::Ready { mut outcome, .. }) => { - // Ready here means this CA is reusing repo work that has already completed - // (often due to child prefetch). Do not add the transport duration again. - outcome.repo_sync_duration_ms = 0; - ready_queue.push_back(ReadyCaInstance { - node, - repo_outcome: outcome, - }); - } - Ok(RepoSyncRequestStatus::Pending { identity, .. }) => { - ca_waiting_repo_by_identity - .entry(identity) - .or_default() - .push(node); - } - Err(err) => { - finished.push(FinishedPublicationPoint { - node, - result: Err(err), - }); - } - } - } + let (finalize_task_tx, finalize_task_rx) = + mpsc::sync_channel::(publication_point_finalize_queue_capacity); + let (finalize_result_tx, finalize_result_rx) = mpsc::channel::(); + let mut finalize_inflight = 0usize; - let ready_batch_started = Instant::now(); - let mut ready_batch_metrics = ReadyStageBatchMetrics::default(); - while ready_batch_metrics.ready_count < ready_batch_size { - let Some(ready) = ready_queue.pop_front() else { - break; - }; - let metrics = stage_ready_publication_point( + return std::thread::scope(|scope| { + let finalize_worker = scope.spawn(move || { + run_finalize_worker( runner, - &mut next_id, - &mut ca_queue, - &mut pending_roa_dispatch, - &mut inflight_publication_points, - &mut finished, - ready, + finalize_task_rx, + finalize_result_tx, config.compact_audit, - ); - ready_batch_metrics.record(metrics); - } - if ready_batch_metrics.ready_count > 0 { - ready_batch_metrics.total_ms = elapsed_ms(ready_batch_started); - crate::progress_log::emit( - "phase2_ready_queue_batch", - serde_json::json!({ - "ready_count": ready_batch_metrics.ready_count, - "fallback_count": ready_batch_metrics.fallback_count, - "complete_count": ready_batch_metrics.complete_count, - "staged_count": ready_batch_metrics.staged_count, - "zero_task_count": ready_batch_metrics.zero_task_count, - "error_count": ready_batch_metrics.error_count, - "discovered_children": ready_batch_metrics.discovered_children, - "locked_files": ready_batch_metrics.locked_files, - "roa_tasks": ready_batch_metrics.roa_tasks, - "aspa_objects": ready_batch_metrics.aspa_objects, - "stage_fresh_ms_total": ready_batch_metrics.stage_fresh_ms_total, - "stage_fresh_ms_max": ready_batch_metrics.stage_fresh_ms_max, - "stage_fresh_ms_max_manifest_rsync_uri": ready_batch_metrics.stage_fresh_ms_max_manifest_rsync_uri, - "stage_fresh_ms_max_publication_point_rsync_uri": ready_batch_metrics.stage_fresh_ms_max_publication_point_rsync_uri, - "child_enqueue_ms_total": ready_batch_metrics.child_enqueue_ms_total, - "child_enqueue_ms_max": ready_batch_metrics.child_enqueue_ms_max, - "prepare_ms_total": ready_batch_metrics.prepare_ms_total, - "prepare_ms_max": ready_batch_metrics.prepare_ms_max, - "build_roa_tasks_ms_total": ready_batch_metrics.build_roa_tasks_ms_total, - "build_roa_tasks_ms_max": ready_batch_metrics.build_roa_tasks_ms_max, - "batch_duration_ms": ready_batch_metrics.total_ms, - "ready_batch_size": ready_batch_size, - "ready_queue_len_after_batch": ready_queue.len(), - "ready_queue_budget_exhausted": !ready_queue.is_empty(), - "ca_queue_len_after_batch": ca_queue.len(), - "pending_roa_dispatch_len_after_batch": pending_roa_dispatch.len(), - "inflight_publication_points_after_batch": inflight_publication_points.len(), - }), - ); - crate::progress_log::emit( - "phase2_ready_queue_stage_fresh_breakdown", - serde_json::json!({ - "ready_count": ready_batch_metrics.ready_count, - "stage_fresh_ms_total": ready_batch_metrics.stage_fresh_ms_total, - "stage_fresh_ms_max": ready_batch_metrics.stage_fresh_ms_max, - "stage_fresh_ms_max_manifest_rsync_uri": ready_batch_metrics.stage_fresh_ms_max_manifest_rsync_uri, - "stage_fresh_ms_max_publication_point_rsync_uri": ready_batch_metrics.stage_fresh_ms_max_publication_point_rsync_uri, - "snapshot_prepare_ms_total": ready_batch_metrics.snapshot_prepare_ms_total, - "snapshot_prepare_ms_max": ready_batch_metrics.snapshot_prepare_ms_max, - "snapshot_current_index_lock_ms_total": ready_batch_metrics.snapshot_current_index_lock_ms_total, - "snapshot_current_index_lock_ms_max": ready_batch_metrics.snapshot_current_index_lock_ms_max, - "snapshot_manifest_load_ms_total": ready_batch_metrics.snapshot_manifest_load_ms_total, - "snapshot_manifest_load_ms_max": ready_batch_metrics.snapshot_manifest_load_ms_max, - "snapshot_manifest_index_lookup_ms_total": ready_batch_metrics.snapshot_manifest_index_lookup_ms_total, - "snapshot_manifest_index_lookup_ms_max": ready_batch_metrics.snapshot_manifest_index_lookup_ms_max, - "snapshot_manifest_blob_load_ms_total": ready_batch_metrics.snapshot_manifest_blob_load_ms_total, - "snapshot_manifest_blob_load_ms_max": ready_batch_metrics.snapshot_manifest_blob_load_ms_max, - "snapshot_manifest_decode_ms_total": ready_batch_metrics.snapshot_manifest_decode_ms_total, - "snapshot_manifest_decode_ms_max": ready_batch_metrics.snapshot_manifest_decode_ms_max, - "snapshot_replay_guard_ms_total": ready_batch_metrics.snapshot_replay_guard_ms_total, - "snapshot_replay_guard_ms_max": ready_batch_metrics.snapshot_replay_guard_ms_max, - "replay_meta_hit_count": ready_batch_metrics.replay_meta_hit_count, - "replay_meta_miss_count": ready_batch_metrics.replay_meta_miss_count, - "snapshot_manifest_entries_ms_total": ready_batch_metrics.snapshot_manifest_entries_ms_total, - "snapshot_manifest_entries_ms_max": ready_batch_metrics.snapshot_manifest_entries_ms_max, - "snapshot_pack_files_ms_total": ready_batch_metrics.snapshot_pack_files_ms_total, - "snapshot_pack_files_ms_max": ready_batch_metrics.snapshot_pack_files_ms_max, - "snapshot_pack_files_index_lookup_ms_total": ready_batch_metrics.snapshot_pack_files_index_lookup_ms_total, - "snapshot_pack_files_index_lookup_ms_max": ready_batch_metrics.snapshot_pack_files_index_lookup_ms_max, - "snapshot_pack_files_blob_load_ms_total": ready_batch_metrics.snapshot_pack_files_blob_load_ms_total, - "snapshot_pack_files_blob_load_ms_max": ready_batch_metrics.snapshot_pack_files_blob_load_ms_max, - "snapshot_ee_path_validate_ms_total": ready_batch_metrics.snapshot_ee_path_validate_ms_total, - "snapshot_ee_path_validate_ms_max": ready_batch_metrics.snapshot_ee_path_validate_ms_max, - "snapshot_manifest_file_count_total": ready_batch_metrics.snapshot_manifest_file_count_total, - "snapshot_manifest_file_count_max": ready_batch_metrics.snapshot_manifest_file_count_max, - "child_discovery_ms_total": ready_batch_metrics.child_discovery_ms_total, - "child_discovery_ms_max": ready_batch_metrics.child_discovery_ms_max, - "batch_duration_ms": ready_batch_metrics.total_ms, - }), - ); - } + ) + }); - let dispatch_metrics = flush_pending_roa_dispatch( - runner, - &mut pending_roa_dispatch, - &mut inflight_publication_points, - )?; - if dispatch_metrics.attempted > 0 || dispatch_metrics.queue_full { - crate::progress_log::emit( - "phase2_roa_dispatch_batch", - serde_json::json!({ - "attempted": dispatch_metrics.attempted, - "submitted": dispatch_metrics.submitted, - "queue_full": dispatch_metrics.queue_full, - "pending_remaining": dispatch_metrics.pending_remaining, - "duration_ms": dispatch_metrics.duration_ms, - "inflight_publication_points": inflight_publication_points.len(), - }), - ); - } - let drain_metrics = drain_object_results( - runner, - &mut inflight_publication_points, + let run_result: Result<(), TreeRunError> = (|| { + loop { + drain_finalize_results_with_progress( + &finalize_result_rx, + &mut finished, + &mut finalize_inflight, + pending_finalization.len(), + pending_roa_dispatch.len(), + inflight_publication_points.len(), + )?; + flush_pending_roa_dispatch_with_progress( + runner, + &mut pending_roa_dispatch, + &mut inflight_publication_points, + &pending_finalization, + )?; + drain_object_results_with_progress( + runner, + &mut inflight_publication_points, + &mut pending_finalization, + pending_roa_dispatch.len(), + object_result_drain_batch_size, + )?; + submit_pending_finalization_with_progress( + &finalize_task_tx, + &mut pending_finalization, + &mut finalize_inflight, + publication_point_finalize_queue_capacity, + pending_roa_dispatch.len(), + inflight_publication_points.len(), + )?; + + start_queued_ca_instances( + repo_runtime.as_ref(), + &mut ca_queue, + &mut ready_queue, + &mut ca_waiting_repo_by_identity, + &mut finished, + &mut visited_manifest_uris, + &mut instances_started, + config, + ); + + let repo_poll_timeout = event_poll_timeout( + &ca_queue, + &ready_queue, + &pending_roa_dispatch, + &inflight_publication_points, + &pending_finalization, + finalize_inflight, + instances_started, + config, + ); + let repo_metrics = drain_repo_events( + repo_runtime.as_ref(), + &mut ca_waiting_repo_by_identity, + &mut ready_queue, + repo_poll_timeout, + )?; + if repo_metrics.event_count > 0 { + crate::progress_log::emit( + "phase2_repo_events_drain", + serde_json::json!({ + "event_count": repo_metrics.event_count, + "completions": repo_metrics.completions, + "ready_enqueued": repo_metrics.ready_enqueued, + "duration_ms": repo_metrics.duration_ms, + "ready_queue_len": ready_queue.len(), + "ca_waiting_repo_identities": ca_waiting_repo_by_identity.len(), + }), + ); + } + + let ready_batch_started = Instant::now(); + let mut ready_batch_metrics = ReadyStageBatchMetrics::default(); + let mut ready_time_budget_exhausted = false; + while ready_batch_metrics.ready_count < ready_batch_size { + let Some(ready) = ready_queue.pop_front() else { + break; + }; + let metrics = stage_ready_publication_point( + runner, + &mut next_id, + &mut ca_queue, + &mut pending_roa_dispatch, + &mut inflight_publication_points, + &mut finished, + ready, + config.compact_audit, + ); + ready_batch_metrics.record(metrics); + if ready_batch_metrics.ready_count > 0 + && ready_batch_started.elapsed() >= ready_batch_wall_time_budget + { + ready_time_budget_exhausted = !ready_queue.is_empty(); + break; + } + } + if ready_batch_metrics.ready_count > 0 { + ready_batch_metrics.total_ms = elapsed_ms(ready_batch_started); + let ready_count_budget_exhausted = ready_batch_metrics.ready_count + >= ready_batch_size + && !ready_queue.is_empty(); + ready_time_budget_exhausted = ready_time_budget_exhausted + || (!ready_queue.is_empty() + && ready_batch_metrics.total_ms >= ready_batch_wall_time_budget_ms); + crate::progress_log::emit( + "phase2_ready_queue_batch", + serde_json::json!({ + "ready_count": ready_batch_metrics.ready_count, + "fallback_count": ready_batch_metrics.fallback_count, + "complete_count": ready_batch_metrics.complete_count, + "staged_count": ready_batch_metrics.staged_count, + "zero_task_count": ready_batch_metrics.zero_task_count, + "error_count": ready_batch_metrics.error_count, + "discovered_children": ready_batch_metrics.discovered_children, + "locked_files": ready_batch_metrics.locked_files, + "roa_tasks": ready_batch_metrics.roa_tasks, + "aspa_objects": ready_batch_metrics.aspa_objects, + "stage_fresh_ms_total": ready_batch_metrics.stage_fresh_ms_total, + "stage_fresh_ms_max": ready_batch_metrics.stage_fresh_ms_max, + "stage_fresh_ms_max_manifest_rsync_uri": ready_batch_metrics.stage_fresh_ms_max_manifest_rsync_uri, + "stage_fresh_ms_max_publication_point_rsync_uri": ready_batch_metrics.stage_fresh_ms_max_publication_point_rsync_uri, + "child_enqueue_ms_total": ready_batch_metrics.child_enqueue_ms_total, + "child_enqueue_ms_max": ready_batch_metrics.child_enqueue_ms_max, + "prepare_ms_total": ready_batch_metrics.prepare_ms_total, + "prepare_ms_max": ready_batch_metrics.prepare_ms_max, + "build_roa_tasks_ms_total": ready_batch_metrics.build_roa_tasks_ms_total, + "build_roa_tasks_ms_max": ready_batch_metrics.build_roa_tasks_ms_max, + "batch_duration_ms": ready_batch_metrics.total_ms, + "ready_batch_size": ready_batch_size, + "ready_batch_wall_time_budget_ms": ready_batch_wall_time_budget_ms, + "ready_queue_len_after_batch": ready_queue.len(), + "ready_queue_budget_exhausted": !ready_queue.is_empty(), + "ready_count_budget_exhausted": ready_count_budget_exhausted, + "ready_time_budget_exhausted": ready_time_budget_exhausted, + "ca_queue_len_after_batch": ca_queue.len(), + "pending_roa_dispatch_len_after_batch": pending_roa_dispatch.len(), + "inflight_publication_points_after_batch": inflight_publication_points.len(), + "pending_finalization_len_after_batch": pending_finalization.len(), + "finalize_inflight_after_batch": finalize_inflight, + }), + ); + crate::progress_log::emit( + "phase2_ready_queue_stage_fresh_breakdown", + serde_json::json!({ + "ready_count": ready_batch_metrics.ready_count, + "stage_fresh_ms_total": ready_batch_metrics.stage_fresh_ms_total, + "stage_fresh_ms_max": ready_batch_metrics.stage_fresh_ms_max, + "stage_fresh_ms_max_manifest_rsync_uri": ready_batch_metrics.stage_fresh_ms_max_manifest_rsync_uri, + "stage_fresh_ms_max_publication_point_rsync_uri": ready_batch_metrics.stage_fresh_ms_max_publication_point_rsync_uri, + "snapshot_prepare_ms_total": ready_batch_metrics.snapshot_prepare_ms_total, + "snapshot_prepare_ms_max": ready_batch_metrics.snapshot_prepare_ms_max, + "snapshot_current_index_lock_ms_total": ready_batch_metrics.snapshot_current_index_lock_ms_total, + "snapshot_current_index_lock_ms_max": ready_batch_metrics.snapshot_current_index_lock_ms_max, + "snapshot_manifest_load_ms_total": ready_batch_metrics.snapshot_manifest_load_ms_total, + "snapshot_manifest_load_ms_max": ready_batch_metrics.snapshot_manifest_load_ms_max, + "snapshot_manifest_index_lookup_ms_total": ready_batch_metrics.snapshot_manifest_index_lookup_ms_total, + "snapshot_manifest_index_lookup_ms_max": ready_batch_metrics.snapshot_manifest_index_lookup_ms_max, + "snapshot_manifest_blob_load_ms_total": ready_batch_metrics.snapshot_manifest_blob_load_ms_total, + "snapshot_manifest_blob_load_ms_max": ready_batch_metrics.snapshot_manifest_blob_load_ms_max, + "snapshot_manifest_decode_ms_total": ready_batch_metrics.snapshot_manifest_decode_ms_total, + "snapshot_manifest_decode_ms_max": ready_batch_metrics.snapshot_manifest_decode_ms_max, + "snapshot_replay_guard_ms_total": ready_batch_metrics.snapshot_replay_guard_ms_total, + "snapshot_replay_guard_ms_max": ready_batch_metrics.snapshot_replay_guard_ms_max, + "replay_meta_hit_count": ready_batch_metrics.replay_meta_hit_count, + "replay_meta_miss_count": ready_batch_metrics.replay_meta_miss_count, + "snapshot_manifest_entries_ms_total": ready_batch_metrics.snapshot_manifest_entries_ms_total, + "snapshot_manifest_entries_ms_max": ready_batch_metrics.snapshot_manifest_entries_ms_max, + "snapshot_pack_files_ms_total": ready_batch_metrics.snapshot_pack_files_ms_total, + "snapshot_pack_files_ms_max": ready_batch_metrics.snapshot_pack_files_ms_max, + "snapshot_pack_files_index_lookup_ms_total": ready_batch_metrics.snapshot_pack_files_index_lookup_ms_total, + "snapshot_pack_files_index_lookup_ms_max": ready_batch_metrics.snapshot_pack_files_index_lookup_ms_max, + "snapshot_pack_files_blob_load_ms_total": ready_batch_metrics.snapshot_pack_files_blob_load_ms_total, + "snapshot_pack_files_blob_load_ms_max": ready_batch_metrics.snapshot_pack_files_blob_load_ms_max, + "snapshot_ee_path_validate_ms_total": ready_batch_metrics.snapshot_ee_path_validate_ms_total, + "snapshot_ee_path_validate_ms_max": ready_batch_metrics.snapshot_ee_path_validate_ms_max, + "snapshot_manifest_file_count_total": ready_batch_metrics.snapshot_manifest_file_count_total, + "snapshot_manifest_file_count_max": ready_batch_metrics.snapshot_manifest_file_count_max, + "child_discovery_ms_total": ready_batch_metrics.child_discovery_ms_total, + "child_discovery_ms_max": ready_batch_metrics.child_discovery_ms_max, + "batch_duration_ms": ready_batch_metrics.total_ms, + }), + ); + } + + flush_pending_roa_dispatch_with_progress( + runner, + &mut pending_roa_dispatch, + &mut inflight_publication_points, + &pending_finalization, + )?; + drain_object_results_with_progress( + runner, + &mut inflight_publication_points, + &mut pending_finalization, + pending_roa_dispatch.len(), + object_result_drain_batch_size, + )?; + submit_pending_finalization_with_progress( + &finalize_task_tx, + &mut pending_finalization, + &mut finalize_inflight, + publication_point_finalize_queue_capacity, + pending_roa_dispatch.len(), + inflight_publication_points.len(), + )?; + drain_finalize_results_with_progress( + &finalize_result_rx, + &mut finished, + &mut finalize_inflight, + pending_finalization.len(), + pending_roa_dispatch.len(), + inflight_publication_points.len(), + )?; + + if is_complete( + &ca_queue, + &ready_queue, + &ca_waiting_repo_by_identity, + &pending_roa_dispatch, + &inflight_publication_points, + &pending_finalization, + finalize_inflight, + instances_started, + config, + ) { + break; + } + } + + repo_runtime + .reset_run_state() + .map_err(TreeRunError::Runner)?; + Ok(()) + })(); + + drop(finalize_task_tx); + let worker_result = finalize_worker + .join() + .map_err(|_| TreeRunError::Runner("phase2 finalize worker panicked".to_string()))?; + run_result?; + worker_result?; + drain_finalize_results_with_progress( + &finalize_result_rx, &mut finished, - config.compact_audit, + &mut finalize_inflight, + pending_finalization.len(), + pending_roa_dispatch.len(), + inflight_publication_points.len(), )?; - if drain_metrics.results_drained > 0 { - crate::progress_log::emit( - "phase2_object_results_drain", - serde_json::json!({ - "results_drained": drain_metrics.results_drained, - "publication_points_finalized": drain_metrics.publication_points_finalized, - "reduce_ms_total": drain_metrics.reduce_ms_total, - "reduce_ms_max": drain_metrics.reduce_ms_max, - "finalize_ms_total": drain_metrics.finalize_ms_total, - "finalize_ms_max": drain_metrics.finalize_ms_max, - "worker_ms_total": drain_metrics.worker_ms_total, - "worker_ms_max": drain_metrics.worker_ms_max, - "queue_wait_ms_total": drain_metrics.queue_wait_ms_total, - "queue_wait_ms_max": drain_metrics.queue_wait_ms_max, - "duration_ms": drain_metrics.duration_ms, - "pending_roa_dispatch_len": pending_roa_dispatch.len(), - "inflight_publication_points": inflight_publication_points.len(), - }), - ); + if finalize_inflight != 0 || !pending_finalization.is_empty() { + return Err(TreeRunError::Runner(format!( + "phase2 finalize worker stopped with pending work: queued={} inflight={}", + pending_finalization.len(), + finalize_inflight + ))); } - let repo_poll_timeout = event_poll_timeout( - &ca_queue, - &ready_queue, - &pending_roa_dispatch, - &inflight_publication_points, - instances_started, - config, - ); - let repo_metrics = drain_repo_events( - repo_runtime.as_ref(), - &mut ca_waiting_repo_by_identity, - &mut ready_queue, - repo_poll_timeout, - )?; - if repo_metrics.event_count > 0 { - crate::progress_log::emit( - "phase2_repo_events_drain", - serde_json::json!({ - "event_count": repo_metrics.event_count, - "completions": repo_metrics.completions, - "ready_enqueued": repo_metrics.ready_enqueued, - "duration_ms": repo_metrics.duration_ms, - "ready_queue_len": ready_queue.len(), - "ca_waiting_repo_identities": ca_waiting_repo_by_identity.len(), - }), - ); - } - - if is_complete( - &ca_queue, - &ready_queue, - &ca_waiting_repo_by_identity, - &pending_roa_dispatch, - &inflight_publication_points, - instances_started, - config, - ) { - break; - } - } - - repo_runtime - .reset_run_state() - .map_err(TreeRunError::Runner)?; - Ok(build_tree_output(finished)) + Ok(build_tree_output(finished)) + }); } fn can_start_more(instances_started: usize, config: &TreeRunConfig) -> bool { @@ -571,6 +671,55 @@ fn can_start_more(instances_started: usize, config: &TreeRunConfig) -> bool { .unwrap_or(true) } +fn start_queued_ca_instances( + repo_runtime: &dyn crate::parallel::repo_runtime::RepoSyncRuntime, + ca_queue: &mut VecDeque, + ready_queue: &mut VecDeque, + ca_waiting_repo_by_identity: &mut HashMap>, + finished: &mut Vec, + visited_manifest_uris: &mut HashSet, + instances_started: &mut usize, + config: &TreeRunConfig, +) { + while can_start_more(*instances_started, config) { + let Some(node) = ca_queue.pop_front() else { + break; + }; + if !visited_manifest_uris.insert(node.handle.manifest_rsync_uri.clone()) { + continue; + } + if let Some(max_depth) = config.max_depth { + if node.handle.depth > max_depth { + continue; + } + } + *instances_started += 1; + match repo_runtime.request_publication_point_repo(&node.handle, 0) { + Ok(RepoSyncRequestStatus::Ready { mut outcome, .. }) => { + // Ready here means this CA is reusing repo work that has already completed + // (often due to child prefetch). Do not add the transport duration again. + outcome.repo_sync_duration_ms = 0; + ready_queue.push_back(ReadyCaInstance { + node, + repo_outcome: outcome, + }); + } + Ok(RepoSyncRequestStatus::Pending { identity, .. }) => { + ca_waiting_repo_by_identity + .entry(identity) + .or_default() + .push(node); + } + Err(err) => { + finished.push(FinishedPublicationPoint { + node, + result: Err(err), + }); + } + } + } +} + fn stage_ready_publication_point( runner: &Rpkiv1PublicationPointRunner<'_>, next_id: &mut u64, @@ -920,18 +1069,44 @@ fn flush_pending_roa_dispatch( Ok(metrics) } +fn flush_pending_roa_dispatch_with_progress( + runner: &Rpkiv1PublicationPointRunner<'_>, + pending_roa_dispatch: &mut VecDeque, + inflight_publication_points: &mut HashMap, + pending_finalization: &VecDeque, +) -> Result<(), TreeRunError> { + let dispatch_metrics = + flush_pending_roa_dispatch(runner, pending_roa_dispatch, inflight_publication_points)?; + if dispatch_metrics.attempted > 0 || dispatch_metrics.queue_full { + crate::progress_log::emit( + "phase2_roa_dispatch_batch", + serde_json::json!({ + "attempted": dispatch_metrics.attempted, + "submitted": dispatch_metrics.submitted, + "queue_full": dispatch_metrics.queue_full, + "pending_remaining": dispatch_metrics.pending_remaining, + "duration_ms": dispatch_metrics.duration_ms, + "inflight_publication_points": inflight_publication_points.len(), + "pending_finalization_len": pending_finalization.len(), + }), + ); + } + Ok(()) +} + fn drain_object_results( runner: &Rpkiv1PublicationPointRunner<'_>, inflight_publication_points: &mut HashMap, - finished: &mut Vec, - compact_audit: bool, + pending_finalization: &mut VecDeque, + result_budget: usize, ) -> Result { let started = Instant::now(); let mut metrics = ObjectDrainMetrics::default(); let Some(pool) = runner.parallel_roa_worker_pool.as_ref() else { return Ok(metrics); }; - loop { + let result_budget = result_budget.max(1); + while metrics.results_drained < result_budget { let Some(result) = pool .recv_result_timeout(Duration::from_millis(0)) .map_err(TreeRunError::Runner)? @@ -964,86 +1139,72 @@ fn drain_object_results( let state = inflight_publication_points .remove(&pp_id) .expect("inflight publication point must exist"); - let objects_processing_ms = state.objects_started_at.elapsed().as_millis() as u64; - let reduce_started = Instant::now(); - let reduce_result = reduce_parallel_roa_stage( - state.objects_stage, - state.results, - runner.timing.as_ref(), - ); - let reduce_ms = elapsed_ms(reduce_started); - metrics.reduce_ms_total += reduce_ms; - metrics.reduce_ms_max = metrics.reduce_ms_max.max(reduce_ms); - metrics.publication_points_finalized += 1; - match reduce_result { - Ok(mut objects) => { - let finalize_started = Instant::now(); - objects - .router_keys - .extend(state.fresh_stage.discovered_router_keys.clone()); - objects.local_outputs_cache.extend( - crate::validation::tree_runner::build_router_key_local_outputs( - &state.node.handle, - &objects.router_keys, - ), - ); - let result = runner - .finalize_fresh_publication_point_from_reducer( - &state.node.handle, - &state.fresh_stage.fresh_point, - state.warnings, - objects, - state.fresh_stage.child_audits, - state.fresh_stage.discovered_children, - state.repo_outcome.repo_sync_source.as_deref(), - state.repo_outcome.repo_sync_phase.as_deref(), - state.repo_outcome.repo_sync_duration_ms, - state.repo_outcome.repo_sync_err.as_deref(), - ) - .map(|out| out.result); - let finalize_ms = elapsed_ms(finalize_started); - metrics.finalize_ms_total += finalize_ms; - metrics.finalize_ms_max = metrics.finalize_ms_max.max(finalize_ms); - crate::progress_log::emit( - "phase2_publication_point_reduced", - serde_json::json!({ - "manifest_rsync_uri": state.node.handle.manifest_rsync_uri.as_str(), - "publication_point_rsync_uri": state.node.handle.publication_point_rsync_uri.as_str(), - "objects_processing_ms": objects_processing_ms, - "task_count": state.task_count, - "tasks_submitted": state.tasks_submitted, - "first_task_submitted_ms": state.first_task_submitted_at.map(|t| t.saturating_duration_since(state.objects_started_at).as_millis() as u64), - "last_task_submitted_ms": state.last_task_submitted_at.map(|t| t.saturating_duration_since(state.objects_started_at).as_millis() as u64), - "task_submit_span_ms": match (state.first_task_submitted_at, state.last_task_submitted_at) { - (Some(first), Some(last)) => Some(last.saturating_duration_since(first).as_millis() as u64), - _ => None, - }, - "first_result_ms": state.first_result_at.map(|t| t.saturating_duration_since(state.objects_started_at).as_millis() as u64), - "last_result_ms": state.last_result_at.map(|t| t.saturating_duration_since(state.objects_started_at).as_millis() as u64), - "result_span_ms": match (state.first_result_at, state.last_result_at) { - (Some(first), Some(last)) => Some(last.saturating_duration_since(first).as_millis() as u64), - _ => None, - }, - "worker_ms_total": state.worker_ms_total, - "worker_ms_max": state.worker_ms_max, - "worker_ms_avg": if state.task_count > 0 { state.worker_ms_total / state.task_count as u64 } else { 0 }, - "queue_wait_ms_total": state.queue_wait_ms_total, - "queue_wait_ms_max": state.queue_wait_ms_max, - "queue_wait_ms_avg": if state.task_count > 0 { state.queue_wait_ms_total / state.task_count as u64 } else { 0 }, - "reduce_ms": reduce_ms, - "finalize_ms": finalize_ms, - "total_duration_ms": state.started_at.elapsed().as_millis() as u64, - }), - ); - finished.push(FinishedPublicationPoint { - node: state.node, - result: compact_phase2_finished_result_result(result, compact_audit), - }); - } - Err(err) => finished.push(FinishedPublicationPoint { - node: state.node, - result: Err(err), - }), + metrics.publication_points_completed += 1; + pending_finalization.push_back(FinalizeTask { state }); + } + } + metrics.result_budget_exhausted = metrics.results_drained == result_budget; + metrics.duration_ms = elapsed_ms(started); + Ok(metrics) +} + +fn drain_object_results_with_progress( + runner: &Rpkiv1PublicationPointRunner<'_>, + inflight_publication_points: &mut HashMap, + pending_finalization: &mut VecDeque, + pending_roa_dispatch_len: usize, + result_budget: usize, +) -> Result<(), TreeRunError> { + let drain_metrics = drain_object_results( + runner, + inflight_publication_points, + pending_finalization, + result_budget, + )?; + if drain_metrics.results_drained > 0 || drain_metrics.result_budget_exhausted { + crate::progress_log::emit( + "phase2_object_results_drain", + serde_json::json!({ + "results_drained": drain_metrics.results_drained, + "publication_points_completed": drain_metrics.publication_points_completed, + "result_budget_exhausted": drain_metrics.result_budget_exhausted, + "result_drain_batch_size": result_budget, + "worker_ms_total": drain_metrics.worker_ms_total, + "worker_ms_max": drain_metrics.worker_ms_max, + "queue_wait_ms_total": drain_metrics.queue_wait_ms_total, + "queue_wait_ms_max": drain_metrics.queue_wait_ms_max, + "duration_ms": drain_metrics.duration_ms, + "pending_roa_dispatch_len": pending_roa_dispatch_len, + "inflight_publication_points": inflight_publication_points.len(), + "pending_finalization_len": pending_finalization.len(), + }), + ); + } + Ok(()) +} + +fn submit_pending_finalization( + finalize_task_tx: &SyncSender, + pending_finalization: &mut VecDeque, + finalize_inflight: &mut usize, +) -> Result { + let started = Instant::now(); + let mut metrics = FinalizeSubmitMetrics::default(); + while let Some(task) = pending_finalization.pop_front() { + match finalize_task_tx.try_send(task) { + Ok(()) => { + metrics.submitted += 1; + *finalize_inflight += 1; + } + Err(TrySendError::Full(task)) => { + pending_finalization.push_front(task); + metrics.queue_full = true; + break; + } + Err(TrySendError::Disconnected(_task)) => { + return Err(TreeRunError::Runner( + "phase2 finalize worker queue disconnected".to_string(), + )); } } } @@ -1051,6 +1212,241 @@ fn drain_object_results( Ok(metrics) } +fn submit_pending_finalization_with_progress( + finalize_task_tx: &SyncSender, + pending_finalization: &mut VecDeque, + finalize_inflight: &mut usize, + finalize_queue_capacity: usize, + pending_roa_dispatch_len: usize, + inflight_publication_points_len: usize, +) -> Result<(), TreeRunError> { + let submit_metrics = + submit_pending_finalization(finalize_task_tx, pending_finalization, finalize_inflight)?; + if submit_metrics.submitted > 0 || submit_metrics.queue_full { + crate::progress_log::emit( + "phase2_finalize_task_submit", + serde_json::json!({ + "submitted": submit_metrics.submitted, + "queue_full": submit_metrics.queue_full, + "duration_ms": submit_metrics.duration_ms, + "finalize_queue_capacity": finalize_queue_capacity, + "pending_finalization_len": pending_finalization.len(), + "finalize_inflight": *finalize_inflight, + "pending_roa_dispatch_len": pending_roa_dispatch_len, + "inflight_publication_points": inflight_publication_points_len, + }), + ); + } + Ok(()) +} + +fn drain_finalize_results( + finalize_result_rx: &Receiver, + finished: &mut Vec, + finalize_inflight: &mut usize, +) -> Result { + let started = Instant::now(); + let mut metrics = FinalizeResultsDrainMetrics::default(); + loop { + match finalize_result_rx.try_recv() { + Ok(result) => { + metrics.results_drained += 1; + metrics.reduce_ms_total += result.metrics.reduce_ms; + metrics.reduce_ms_max = metrics.reduce_ms_max.max(result.metrics.reduce_ms); + metrics.finalize_ms_total += result.metrics.finalize_ms; + metrics.finalize_ms_max = metrics.finalize_ms_max.max(result.metrics.finalize_ms); + metrics.finalize_queue_wait_ms_max = metrics + .finalize_queue_wait_ms_max + .max(result.metrics.finalize_queue_wait_ms.unwrap_or(0)); + metrics.finalize_worker_ms_total += result.metrics.finalize_worker_ms; + metrics.finalize_worker_ms_max = metrics + .finalize_worker_ms_max + .max(result.metrics.finalize_worker_ms); + *finalize_inflight = finalize_inflight.saturating_sub(1); + finished.push(result.finished); + } + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + if *finalize_inflight == 0 { + break; + } + return Err(TreeRunError::Runner( + "phase2 finalize result channel disconnected".to_string(), + )); + } + } + } + metrics.duration_ms = elapsed_ms(started); + Ok(metrics) +} + +fn drain_finalize_results_with_progress( + finalize_result_rx: &Receiver, + finished: &mut Vec, + finalize_inflight: &mut usize, + pending_finalization_len: usize, + pending_roa_dispatch_len: usize, + inflight_publication_points_len: usize, +) -> Result<(), TreeRunError> { + let drain_metrics = drain_finalize_results(finalize_result_rx, finished, finalize_inflight)?; + if drain_metrics.results_drained >= 64 + || (drain_metrics.results_drained > 0 + && (*finalize_inflight == 0 || pending_finalization_len > 0)) + { + crate::progress_log::emit( + "phase2_finalize_results_drain", + serde_json::json!({ + "results_drained": drain_metrics.results_drained, + "reduce_ms_total": drain_metrics.reduce_ms_total, + "reduce_ms_max": drain_metrics.reduce_ms_max, + "finalize_ms_total": drain_metrics.finalize_ms_total, + "finalize_ms_max": drain_metrics.finalize_ms_max, + "finalize_queue_wait_ms_max": drain_metrics.finalize_queue_wait_ms_max, + "finalize_worker_ms_total": drain_metrics.finalize_worker_ms_total, + "finalize_worker_ms_max": drain_metrics.finalize_worker_ms_max, + "duration_ms": drain_metrics.duration_ms, + "pending_finalization_len": pending_finalization_len, + "finalize_inflight": *finalize_inflight, + "pending_roa_dispatch_len": pending_roa_dispatch_len, + "inflight_publication_points": inflight_publication_points_len, + }), + ); + } + Ok(()) +} + +fn run_finalize_worker( + runner: &Rpkiv1PublicationPointRunner<'_>, + finalize_task_rx: Receiver, + finalize_result_tx: mpsc::Sender, + compact_audit: bool, +) -> Result<(), TreeRunError> { + while let Ok(task) = finalize_task_rx.recv() { + let result = finalize_publication_point_state(runner, task.state, compact_audit); + if finalize_result_tx.send(result).is_err() { + return Err(TreeRunError::Runner( + "phase2 finalize result receiver disconnected".to_string(), + )); + } + } + Ok(()) +} + +fn finalize_publication_point_state( + runner: &Rpkiv1PublicationPointRunner<'_>, + state: InflightPublicationPoint, + compact_audit: bool, +) -> FinalizeWorkerResult { + let finalize_worker_started = Instant::now(); + let InflightPublicationPoint { + node, + fresh_stage, + objects_stage, + repo_outcome, + warnings, + started_at, + objects_started_at, + task_count, + tasks_submitted, + first_task_submitted_at, + last_task_submitted_at, + first_result_at, + last_result_at, + worker_ms_total, + worker_ms_max, + queue_wait_ms_total, + queue_wait_ms_max, + results, + } = state; + let finalize_queue_wait_ms = last_result_at.map(|last_result| { + Instant::now() + .saturating_duration_since(last_result) + .as_millis() as u64 + }); + let objects_processing_ms = objects_started_at.elapsed().as_millis() as u64; + let reduce_started = Instant::now(); + let reduce_result = reduce_parallel_roa_stage(objects_stage, results, runner.timing.as_ref()); + let reduce_ms = elapsed_ms(reduce_started); + + let (result, finalize_ms) = match reduce_result { + Ok(mut objects) => { + let finalize_started = Instant::now(); + objects + .router_keys + .extend(fresh_stage.discovered_router_keys.clone()); + objects.local_outputs_cache.extend( + crate::validation::tree_runner::build_router_key_local_outputs( + &node.handle, + &objects.router_keys, + ), + ); + let result = runner + .finalize_fresh_publication_point_from_reducer( + &node.handle, + &fresh_stage.fresh_point, + warnings, + objects, + fresh_stage.child_audits, + fresh_stage.discovered_children, + repo_outcome.repo_sync_source.as_deref(), + repo_outcome.repo_sync_phase.as_deref(), + repo_outcome.repo_sync_duration_ms, + repo_outcome.repo_sync_err.as_deref(), + ) + .map(|out| out.result); + ( + compact_phase2_finished_result_result(result, compact_audit), + elapsed_ms(finalize_started), + ) + } + Err(err) => (Err(err), 0), + }; + let finalize_worker_ms = elapsed_ms(finalize_worker_started); + crate::progress_log::emit( + "phase2_publication_point_reduced", + serde_json::json!({ + "manifest_rsync_uri": node.handle.manifest_rsync_uri.as_str(), + "publication_point_rsync_uri": node.handle.publication_point_rsync_uri.as_str(), + "objects_processing_ms": objects_processing_ms, + "task_count": task_count, + "tasks_submitted": tasks_submitted, + "first_task_submitted_ms": first_task_submitted_at.map(|t| t.saturating_duration_since(objects_started_at).as_millis() as u64), + "last_task_submitted_ms": last_task_submitted_at.map(|t| t.saturating_duration_since(objects_started_at).as_millis() as u64), + "task_submit_span_ms": match (first_task_submitted_at, last_task_submitted_at) { + (Some(first), Some(last)) => Some(last.saturating_duration_since(first).as_millis() as u64), + _ => None, + }, + "first_result_ms": first_result_at.map(|t| t.saturating_duration_since(objects_started_at).as_millis() as u64), + "last_result_ms": last_result_at.map(|t| t.saturating_duration_since(objects_started_at).as_millis() as u64), + "all_results_ready_ms": last_result_at.map(|t| t.saturating_duration_since(objects_started_at).as_millis() as u64), + "finalize_queue_wait_ms": finalize_queue_wait_ms, + "result_span_ms": match (first_result_at, last_result_at) { + (Some(first), Some(last)) => Some(last.saturating_duration_since(first).as_millis() as u64), + _ => None, + }, + "worker_ms_total": worker_ms_total, + "worker_ms_max": worker_ms_max, + "worker_ms_avg": if task_count > 0 { worker_ms_total / task_count as u64 } else { 0 }, + "queue_wait_ms_total": queue_wait_ms_total, + "queue_wait_ms_max": queue_wait_ms_max, + "queue_wait_ms_avg": if task_count > 0 { queue_wait_ms_total / task_count as u64 } else { 0 }, + "reduce_ms": reduce_ms, + "finalize_ms": finalize_ms, + "finalize_worker_ms": finalize_worker_ms, + "total_duration_ms": started_at.elapsed().as_millis() as u64, + }), + ); + FinalizeWorkerResult { + finished: FinishedPublicationPoint { node, result }, + metrics: FinalizePublicationPointMetrics { + reduce_ms, + finalize_ms, + finalize_queue_wait_ms, + finalize_worker_ms, + }, + } +} + fn drain_repo_events( repo_runtime: &dyn crate::parallel::repo_runtime::RepoSyncRuntime, ca_waiting_repo_by_identity: &mut HashMap>, @@ -1092,15 +1488,20 @@ fn event_poll_timeout( ready_queue: &VecDeque, pending_roa_dispatch: &VecDeque, inflight_publication_points: &HashMap, + pending_finalization: &VecDeque, + finalize_inflight: usize, instances_started: usize, config: &TreeRunConfig, ) -> Duration { if !ready_queue.is_empty() || !pending_roa_dispatch.is_empty() || !inflight_publication_points.is_empty() + || !pending_finalization.is_empty() || (!ca_queue.is_empty() && can_start_more(instances_started, config)) { Duration::from_millis(0) + } else if finalize_inflight > 0 { + Duration::from_millis(10) } else { Duration::from_millis(50) } @@ -1112,6 +1513,8 @@ fn is_complete( ca_waiting_repo_by_identity: &HashMap>, pending_roa_dispatch: &VecDeque, inflight_publication_points: &HashMap, + pending_finalization: &VecDeque, + finalize_inflight: usize, instances_started: usize, config: &TreeRunConfig, ) -> bool { @@ -1121,6 +1524,8 @@ fn is_complete( && ca_waiting_repo_by_identity.is_empty() && pending_roa_dispatch.is_empty() && inflight_publication_points.is_empty() + && pending_finalization.is_empty() + && finalize_inflight == 0 } fn build_tree_output(mut finished: Vec) -> TreeRunAuditOutput { diff --git a/tests/test_objects_process_publication_point_snapshot.rs b/tests/test_objects_process_publication_point_snapshot.rs index 8e76087..1d99e84 100644 --- a/tests/test_objects_process_publication_point_snapshot.rs +++ b/tests/test_objects_process_publication_point_snapshot.rs @@ -192,6 +192,7 @@ fn parallel_roa_processing_matches_serial_for_real_cernet_fixture() { &ParallelPhase2Config { object_workers: 2, worker_queue_capacity: 4, + ..ParallelPhase2Config::default() }, ); @@ -238,6 +239,7 @@ fn parallel_roa_processing_reports_issuer_decode_failure_like_serial() { &ParallelPhase2Config { object_workers: 2, worker_queue_capacity: 4, + ..ParallelPhase2Config::default() }, ); @@ -287,6 +289,7 @@ fn parallel_roa_processing_reports_missing_crl_like_serial() { &ParallelPhase2Config { object_workers: 2, worker_queue_capacity: 4, + ..ParallelPhase2Config::default() }, ); diff --git a/tests/test_objects_processing_coverage_m18.rs b/tests/test_objects_processing_coverage_m18.rs index 3367456..55e65ce 100644 --- a/tests/test_objects_processing_coverage_m18.rs +++ b/tests/test_objects_processing_coverage_m18.rs @@ -559,6 +559,7 @@ fn parallel_roa_processing_drop_object_records_roa_and_aspa_errors_like_serial() let phase2_config = ParallelPhase2Config { object_workers: 2, worker_queue_capacity: 1, + ..ParallelPhase2Config::default() }; let pool = ParallelRoaWorkerPool::new(&phase2_config).expect("parallel roa pool"); let parallel = process_publication_point_for_issuer_parallel_roa_with_pool( @@ -637,6 +638,7 @@ fn parallel_roa_processing_falls_back_for_drop_publication_point_policy() { &ParallelPhase2Config { object_workers: 2, worker_queue_capacity: 1, + ..ParallelPhase2Config::default() }, ); @@ -689,6 +691,7 @@ fn parallel_roa_processing_falls_back_for_single_worker_config() { &ParallelPhase2Config { object_workers: 1, worker_queue_capacity: 1, + ..ParallelPhase2Config::default() }, ); @@ -740,6 +743,7 @@ fn parallel_roa_processing_falls_back_when_pool_creation_fails() { &ParallelPhase2Config { object_workers: 2, worker_queue_capacity: 0, + ..ParallelPhase2Config::default() }, );