rpki/src/validation/tree_parallel.rs

2421 lines
103 KiB
Rust

use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::mpsc::{self, Receiver, SyncSender, TryRecvError, TrySendError};
use std::time::{Duration, Instant};
use crate::audit::{DiscoveredFrom, PublicationPointAudit};
use crate::cir::CirInputAccumulator;
use crate::parallel::object_worker::ObjectWorkerSubmitError;
use crate::parallel::repo_runtime::{RepoSyncRequestStatus, RepoSyncRuntimeOutcome};
use crate::parallel::types::RepoIdentity;
use crate::policy::SignedObjectFailurePolicy;
use crate::report::Warning;
use crate::storage::VcirReplaceTimingBreakdown;
use crate::validation::manifest::PublicationPointData;
use crate::validation::manifest::PublicationPointSource;
use crate::validation::objects::{
ObjectsOutput, OwnedRoaTask, ParallelObjectsPrepare, ParallelObjectsStage,
RoaValidationCacheInput, prepare_publication_point_for_parallel_roa_with_cache,
reduce_parallel_roa_stage,
};
use crate::validation::tree::{
CaInstanceHandle, DiscoveredChildCaInstance, PublicationPointRunResult, PublicationPointRunner,
TreeRunAuditOutput, TreeRunConfig, TreeRunError, TreeRunOutput,
run_tree_serial_audit_multi_root,
};
use crate::validation::tree_runner::{
FreshPublicationPointFinalizeOutput, FreshPublicationPointStage, Rpkiv1PublicationPointRunner,
};
/// A CA instance waiting in the phase-2 CA queue for its repository sync to be requested.
#[derive(Clone, Debug)]
struct QueuedCaInstance {
/// Run-local identifier; roots take theirs from the run loop's `next_id` counter.
id: u64,
/// Handle of the CA instance (manifest URI, publication point URI, depth, ...).
handle: CaInstanceHandle,
/// `None` for roots. NOTE(review): presumably the `id` of the discovering instance for
/// children — confirm against `enqueue_discovered_children`.
parent_id: Option<u64>,
/// Discovery provenance; `None` for roots.
discovered_from: Option<DiscoveredFrom>,
}
/// A queued CA instance whose repository sync outcome is available, waiting on the ready queue
/// to be staged.
#[derive(Clone, Debug)]
struct ReadyCaInstance {
/// The CA instance that became ready.
node: QueuedCaInstance,
/// Outcome reported by the repo sync runtime for this instance's repository.
repo_outcome: RepoSyncRuntimeOutcome,
/// When the instance was pushed onto the ready queue; staging uses it to compute the
/// ready-queue wait time.
ready_enqueued_at: Instant,
}
/// State kept for a publication point while it is in flight in the phase-2 pipeline.
///
/// The run loop stores these in a `HashMap<u64, InflightPublicationPoint>` and a value is
/// handed to the finalize worker wrapped in a [`FinalizeTask`].
///
/// NOTE(review): the code that fills these fields lies outside this part of the file; the
/// per-field notes below are read off the names and types and should be confirmed there.
struct InflightPublicationPoint {
/// The CA instance this publication point belongs to.
node: QueuedCaInstance,
/// Fresh-stage output for this publication point.
fresh_stage: FreshPublicationPointStage,
/// Parallel objects-stage state for this publication point.
objects_stage: ParallelObjectsStage,
/// Repo sync outcome the publication point was staged with.
repo_outcome: RepoSyncRuntimeOutcome,
/// Warnings collected so far.
warnings: Vec<Warning>,
// Timing anchors for the publication point as a whole and for its objects stage.
started_at: Instant,
objects_started_at: Instant,
// Task bookkeeping — presumably ROA worker tasks expected vs. actually submitted.
task_count: usize,
tasks_submitted: usize,
// First/last submit and result instants; `None` until the corresponding event happens.
first_task_submitted_at: Option<Instant>,
last_task_submitted_at: Option<Instant>,
first_result_at: Option<Instant>,
last_result_at: Option<Instant>,
// Aggregated worker execution and queue-wait times, in milliseconds.
worker_ms_total: u64,
worker_ms_max: u64,
queue_wait_ms_total: u64,
queue_wait_ms_max: u64,
/// Presumably set when the publication point is queued for finalization — confirm.
finalize_enqueued_at: Option<Instant>,
/// Task results collected for this publication point.
results: Vec<crate::validation::objects::RoaTaskResult>,
}
/// A publication point that has finished processing, successfully or not.
struct FinishedPublicationPoint {
/// Identity of the CA instance, reduced to what is kept after processing.
node: FinishedPublicationPointNode,
/// Compacted run result, or the error message if processing failed.
result: FinishedPublicationPointResult,
}
/// The part of a [`QueuedCaInstance`] retained once its publication point has finished: the
/// ids, the discovery provenance and the manifest URI taken from the handle.
#[derive(Clone, Debug)]
struct FinishedPublicationPointNode {
/// Run-local id copied from the queued instance.
id: u64,
/// Parent id copied from the queued instance (`None` for roots).
parent_id: Option<u64>,
/// Discovery provenance copied from the queued instance.
discovered_from: Option<DiscoveredFrom>,
/// `manifest_rsync_uri` of the instance's handle.
manifest_rsync_uri: String,
}
impl FinishedPublicationPointNode {
fn from_queued(node: QueuedCaInstance) -> Self {
Self {
id: node.id,
parent_id: node.parent_id,
discovered_from: node.discovered_from,
manifest_rsync_uri: node.handle.manifest_rsync_uri,
}
}
}
/// Outcome stored for a finished publication point.
#[derive(Debug)]
enum FinishedPublicationPointResult {
/// Processing succeeded; fields are taken from the run result after phase-2 compaction
/// (see `compact_phase2_finished_result`).
Ok {
source: PublicationPointSource,
warnings: Vec<Warning>,
objects: ObjectsOutput,
audit: PublicationPointAudit,
cir_fresh_objects: Vec<crate::audit::ObjectAuditEntry>,
cir_cached_objects: Vec<crate::audit::ObjectAuditEntry>,
},
/// Processing failed; carries the error message.
Err(String),
}
/// Unit of work sent to the finalize worker over the bounded finalize task channel.
struct FinalizeTask {
/// In-flight publication point state to be finalized.
state: InflightPublicationPoint,
}
/// Message sent back from the finalize worker over the finalize result channel.
struct FinalizeWorkerResult {
/// The finished publication point.
finished: FinishedPublicationPoint,
/// Timing and size metrics gathered while finalizing it.
metrics: FinalizePublicationPointMetrics,
}
/// Metrics describing the staging of a single ready publication point.
///
/// One value is returned per call to `stage_ready_publication_point` and folded into a
/// [`ReadyStageBatchMetrics`] via `record`. Fields ending in `_ms` are durations in
/// milliseconds; `*_count` fields are counters (staging sets `ready_count` to 1).
#[derive(Default)]
struct ReadyStageMetrics {
// URIs of the publication point these metrics belong to; used to label the slowest
// fresh stage in the batch.
manifest_rsync_uri: Option<String>,
publication_point_rsync_uri: Option<String>,
// Outcome counters for this publication point.
ready_count: usize,
fallback_count: usize,
complete_count: usize,
staged_count: usize,
zero_task_count: usize,
error_count: usize,
// Size counters.
discovered_children: usize,
locked_files: usize,
roa_tasks: usize,
aspa_objects: usize,
// Fresh-stage duration and its snapshot sub-step breakdown.
stage_fresh_ms: u64,
snapshot_prepare_ms: u64,
snapshot_current_index_lock_ms: u64,
snapshot_manifest_load_ms: u64,
snapshot_manifest_index_lookup_ms: u64,
snapshot_manifest_blob_load_ms: u64,
snapshot_manifest_decode_ms: u64,
snapshot_replay_guard_ms: u64,
replay_meta_hit_count: usize,
replay_meta_miss_count: usize,
snapshot_manifest_entries_ms: u64,
snapshot_pack_files_ms: u64,
snapshot_pack_files_index_lookup_ms: u64,
snapshot_pack_files_blob_load_ms: u64,
snapshot_ee_path_validate_ms: u64,
snapshot_manifest_file_count: usize,
// Control-path durations around child discovery and queueing.
child_discovery_ms: u64,
child_enqueue_ms: u64,
/// Time between being pushed on the ready queue and being staged.
ready_queue_wait_ms: u64,
/// Ready queue length right after this item was popped (not aggregated by `record`).
ready_queue_len_after_pop: usize,
roa_presence_scan_ms: u64,
roa_cache_view_ms: u64,
direct_finalize_ms: u64,
fallback_full_run_ms: u64,
prepare_ms: u64,
build_roa_tasks_ms: u64,
/// Total time spent staging this publication point.
total_ms: u64,
}
/// Aggregate of [`ReadyStageMetrics`] over one ready-queue batch.
///
/// Counters are sums over the batch. For each timed step, `*_total` is the sum and `*_max`
/// the maximum of the per-publication-point values.
#[derive(Default)]
struct ReadyStageBatchMetrics {
// Summed outcome and size counters.
ready_count: usize,
fallback_count: usize,
complete_count: usize,
staged_count: usize,
zero_task_count: usize,
error_count: usize,
discovered_children: usize,
locked_files: usize,
roa_tasks: usize,
aspa_objects: usize,
stage_fresh_ms_total: u64,
stage_fresh_ms_max: u64,
// URIs of the publication point with the largest `stage_fresh_ms` recorded so far
// (the most recent one wins on ties).
stage_fresh_ms_max_manifest_rsync_uri: Option<String>,
stage_fresh_ms_max_publication_point_rsync_uri: Option<String>,
snapshot_prepare_ms_total: u64,
snapshot_prepare_ms_max: u64,
snapshot_current_index_lock_ms_total: u64,
snapshot_current_index_lock_ms_max: u64,
snapshot_manifest_load_ms_total: u64,
snapshot_manifest_load_ms_max: u64,
snapshot_manifest_index_lookup_ms_total: u64,
snapshot_manifest_index_lookup_ms_max: u64,
snapshot_manifest_blob_load_ms_total: u64,
snapshot_manifest_blob_load_ms_max: u64,
snapshot_manifest_decode_ms_total: u64,
snapshot_manifest_decode_ms_max: u64,
snapshot_replay_guard_ms_total: u64,
snapshot_replay_guard_ms_max: u64,
replay_meta_hit_count: usize,
replay_meta_miss_count: usize,
snapshot_manifest_entries_ms_total: u64,
snapshot_manifest_entries_ms_max: u64,
snapshot_pack_files_ms_total: u64,
snapshot_pack_files_ms_max: u64,
snapshot_pack_files_index_lookup_ms_total: u64,
snapshot_pack_files_index_lookup_ms_max: u64,
snapshot_pack_files_blob_load_ms_total: u64,
snapshot_pack_files_blob_load_ms_max: u64,
snapshot_ee_path_validate_ms_total: u64,
snapshot_ee_path_validate_ms_max: u64,
snapshot_manifest_file_count_total: usize,
snapshot_manifest_file_count_max: usize,
child_discovery_ms_total: u64,
child_discovery_ms_max: u64,
child_enqueue_ms_total: u64,
child_enqueue_ms_max: u64,
ready_queue_wait_ms_total: u64,
ready_queue_wait_ms_max: u64,
roa_presence_scan_ms_total: u64,
roa_presence_scan_ms_max: u64,
roa_cache_view_ms_total: u64,
roa_cache_view_ms_max: u64,
direct_finalize_ms_total: u64,
direct_finalize_ms_max: u64,
fallback_full_run_ms_total: u64,
fallback_full_run_ms_max: u64,
prepare_ms_total: u64,
prepare_ms_max: u64,
build_roa_tasks_ms_total: u64,
build_roa_tasks_ms_max: u64,
/// `record` sums the per-item `total_ms` here; the run loop then overwrites it with the
/// batch's wall-clock duration before the metrics are reported.
total_ms: u64,
}
impl ReadyStageBatchMetrics {
    /// Folds the metrics of one staged publication point into this batch aggregate.
    ///
    /// Counters are added, every timed step updates its `*_total` / `*_max` pair, and the
    /// "slowest fresh stage" URIs are replaced whenever the new `stage_fresh_ms` is greater
    /// than or equal to the current maximum (so the latest sample wins on ties, and the very
    /// first sample always populates them).
    ///
    /// `metrics` is consumed, so its URI strings are moved rather than cloned.
    fn record(&mut self, metrics: ReadyStageMetrics) {
        /// Adds `sample` to `total` and raises `max` to `sample` if it is larger.
        fn track(total: &mut u64, max: &mut u64, sample: u64) {
            *total += sample;
            *max = (*max).max(sample);
        }

        self.ready_count += metrics.ready_count;
        self.fallback_count += metrics.fallback_count;
        self.complete_count += metrics.complete_count;
        self.staged_count += metrics.staged_count;
        self.zero_task_count += metrics.zero_task_count;
        self.error_count += metrics.error_count;
        self.discovered_children += metrics.discovered_children;
        self.locked_files += metrics.locked_files;
        self.roa_tasks += metrics.roa_tasks;
        self.aspa_objects += metrics.aspa_objects;
        self.replay_meta_hit_count += metrics.replay_meta_hit_count;
        self.replay_meta_miss_count += metrics.replay_meta_miss_count;

        // Must run before `stage_fresh_ms_max` is updated: the comparison is against the
        // maximum of the samples recorded so far.
        if metrics.stage_fresh_ms >= self.stage_fresh_ms_max {
            self.stage_fresh_ms_max_manifest_rsync_uri = metrics.manifest_rsync_uri;
            self.stage_fresh_ms_max_publication_point_rsync_uri =
                metrics.publication_point_rsync_uri;
        }
        track(
            &mut self.stage_fresh_ms_total,
            &mut self.stage_fresh_ms_max,
            metrics.stage_fresh_ms,
        );

        track(
            &mut self.snapshot_prepare_ms_total,
            &mut self.snapshot_prepare_ms_max,
            metrics.snapshot_prepare_ms,
        );
        track(
            &mut self.snapshot_current_index_lock_ms_total,
            &mut self.snapshot_current_index_lock_ms_max,
            metrics.snapshot_current_index_lock_ms,
        );
        track(
            &mut self.snapshot_manifest_load_ms_total,
            &mut self.snapshot_manifest_load_ms_max,
            metrics.snapshot_manifest_load_ms,
        );
        track(
            &mut self.snapshot_manifest_index_lookup_ms_total,
            &mut self.snapshot_manifest_index_lookup_ms_max,
            metrics.snapshot_manifest_index_lookup_ms,
        );
        track(
            &mut self.snapshot_manifest_blob_load_ms_total,
            &mut self.snapshot_manifest_blob_load_ms_max,
            metrics.snapshot_manifest_blob_load_ms,
        );
        track(
            &mut self.snapshot_manifest_decode_ms_total,
            &mut self.snapshot_manifest_decode_ms_max,
            metrics.snapshot_manifest_decode_ms,
        );
        track(
            &mut self.snapshot_replay_guard_ms_total,
            &mut self.snapshot_replay_guard_ms_max,
            metrics.snapshot_replay_guard_ms,
        );
        track(
            &mut self.snapshot_manifest_entries_ms_total,
            &mut self.snapshot_manifest_entries_ms_max,
            metrics.snapshot_manifest_entries_ms,
        );
        track(
            &mut self.snapshot_pack_files_ms_total,
            &mut self.snapshot_pack_files_ms_max,
            metrics.snapshot_pack_files_ms,
        );
        track(
            &mut self.snapshot_pack_files_index_lookup_ms_total,
            &mut self.snapshot_pack_files_index_lookup_ms_max,
            metrics.snapshot_pack_files_index_lookup_ms,
        );
        track(
            &mut self.snapshot_pack_files_blob_load_ms_total,
            &mut self.snapshot_pack_files_blob_load_ms_max,
            metrics.snapshot_pack_files_blob_load_ms,
        );
        track(
            &mut self.snapshot_ee_path_validate_ms_total,
            &mut self.snapshot_ee_path_validate_ms_max,
            metrics.snapshot_ee_path_validate_ms,
        );

        // The file count is a `usize`, so it is handled outside the `u64` helper.
        self.snapshot_manifest_file_count_total += metrics.snapshot_manifest_file_count;
        self.snapshot_manifest_file_count_max = self
            .snapshot_manifest_file_count_max
            .max(metrics.snapshot_manifest_file_count);

        track(
            &mut self.child_discovery_ms_total,
            &mut self.child_discovery_ms_max,
            metrics.child_discovery_ms,
        );
        track(
            &mut self.child_enqueue_ms_total,
            &mut self.child_enqueue_ms_max,
            metrics.child_enqueue_ms,
        );
        track(
            &mut self.ready_queue_wait_ms_total,
            &mut self.ready_queue_wait_ms_max,
            metrics.ready_queue_wait_ms,
        );
        track(
            &mut self.roa_presence_scan_ms_total,
            &mut self.roa_presence_scan_ms_max,
            metrics.roa_presence_scan_ms,
        );
        track(
            &mut self.roa_cache_view_ms_total,
            &mut self.roa_cache_view_ms_max,
            metrics.roa_cache_view_ms,
        );
        track(
            &mut self.direct_finalize_ms_total,
            &mut self.direct_finalize_ms_max,
            metrics.direct_finalize_ms,
        );
        track(
            &mut self.fallback_full_run_ms_total,
            &mut self.fallback_full_run_ms_max,
            metrics.fallback_full_run_ms,
        );
        track(
            &mut self.prepare_ms_total,
            &mut self.prepare_ms_max,
            metrics.prepare_ms,
        );
        track(
            &mut self.build_roa_tasks_ms_total,
            &mut self.build_roa_tasks_ms_max,
            metrics.build_roa_tasks_ms,
        );

        self.total_ms += metrics.total_ms;
    }
}
/// Counters describing one pass of dispatching pending ROA tasks.
///
/// NOTE(review): the code that fills this struct is outside this part of the file; field
/// meanings below follow the names and should be confirmed there.
#[derive(Default)]
struct RoaDispatchMetrics {
// Tasks the pass tried to submit vs. tasks actually accepted.
attempted: usize,
submitted: usize,
// Presumably set when the worker queue refused a task because it was full.
queue_full: bool,
// Tasks still pending after the pass.
pending_remaining: usize,
// Duration of the pass in milliseconds.
duration_ms: u64,
}
/// Counters describing one pass of draining object-worker results.
///
/// NOTE(review): the code that fills this struct is outside this part of the file; field
/// meanings below follow the names and should be confirmed there.
#[derive(Default)]
struct ObjectDrainMetrics {
results_drained: usize,
publication_points_completed: usize,
// Worker execution and queue-wait times of the drained results, in milliseconds.
worker_ms_total: u64,
worker_ms_max: u64,
queue_wait_ms_total: u64,
queue_wait_ms_max: u64,
// Presumably set when the pass stopped because it hit its result batch limit.
result_budget_exhausted: bool,
// Duration of the pass in milliseconds.
duration_ms: u64,
}
/// Counters describing one pass of submitting pending finalize tasks.
///
/// NOTE(review): the code that fills this struct is outside this part of the file; field
/// meanings below follow the names and should be confirmed there.
#[derive(Default)]
struct FinalizeSubmitMetrics {
submitted: usize,
// Presumably set when the bounded finalize task channel was full.
queue_full: bool,
// Duration of the pass in milliseconds.
duration_ms: u64,
}
/// Metrics for finalizing a single publication point; travels back from the finalize worker
/// inside a [`FinalizeWorkerResult`].
///
/// NOTE(review): the code that fills this struct is outside this part of the file; the
/// grouping comments below follow the field names and should be confirmed there.
#[derive(Default)]
struct FinalizePublicationPointMetrics {
// Step durations in milliseconds.
reduce_ms: u64,
finalize_ms: u64,
// `None` when no queue wait was measured.
finalize_queue_wait_ms: Option<u64>,
finalize_worker_ms: u64,
snapshot_pack_ms: u64,
persist_vcir_ms: u64,
persist_build_vcir_ms: u64,
persist_replace_vcir_ms: u64,
persist_replace_breakdown: VcirReplaceTimingBreakdown,
ccr_projection_build_ms: u64,
ccr_append_ms: u64,
audit_build_ms: u64,
// Size counters for the finalized publication point.
locked_files: usize,
child_count: usize,
warning_count: usize,
vrp_count: usize,
vap_count: usize,
router_key_count: usize,
audit_object_count: usize,
}
/// Aggregate over the finalize results drained in one pass: `*_total` / `*_max` pairs that
/// mirror the per-publication-point fields of [`FinalizePublicationPointMetrics`].
///
/// NOTE(review): the code that fills this struct is outside this part of the file — confirm
/// the aggregation there.
#[derive(Default)]
struct FinalizeResultsDrainMetrics {
results_drained: usize,
reduce_ms_total: u64,
reduce_ms_max: u64,
finalize_ms_total: u64,
finalize_ms_max: u64,
// Only a maximum is kept for the queue wait; there is no matching total.
finalize_queue_wait_ms_max: u64,
finalize_worker_ms_total: u64,
finalize_worker_ms_max: u64,
snapshot_pack_ms_total: u64,
snapshot_pack_ms_max: u64,
persist_vcir_ms_total: u64,
persist_vcir_ms_max: u64,
persist_build_vcir_ms_total: u64,
persist_build_vcir_ms_max: u64,
persist_replace_vcir_ms_total: u64,
persist_replace_vcir_ms_max: u64,
ccr_projection_build_ms_total: u64,
ccr_projection_build_ms_max: u64,
ccr_append_ms_total: u64,
ccr_append_ms_max: u64,
audit_build_ms_total: u64,
audit_build_ms_max: u64,
// Duration of the drain pass in milliseconds.
duration_ms: u64,
}
/// Counters returned by `drain_repo_events` for one control-loop iteration; the run loop
/// reports them in the `phase2_repo_events_drain` and `phase2_control_loop_slow` events.
#[derive(Default)]
struct RepoDrainMetrics {
// Number of repo events seen; the drain event is only emitted when this is non-zero.
event_count: usize,
completions: usize,
ready_enqueued: usize,
// Duration of the drain in milliseconds.
duration_ms: u64,
}
/// Returns the whole milliseconds elapsed since `started`, saturating at `u64::MAX`.
fn elapsed_ms(started: Instant) -> u64 {
    let millis: u128 = started.elapsed().as_millis();
    // A bare `as u64` would silently wrap on overflow; clamp first so the cast is lossless.
    millis.min(u128::from(u64::MAX)) as u64
}
/// Emits a `phase2_control_loop_slow` progress event when one control-loop iteration lasted
/// at least the configured slow threshold; does nothing for faster iterations.
///
/// The event carries the iteration duration, the repo drain and ready-batch metrics, and the
/// queue lengths observed at the end of the iteration.
fn emit_control_loop_slow(
    duration_ms: u64,
    repo_poll_timeout: Duration,
    repo_metrics: &RepoDrainMetrics,
    ready_batch_metrics: &ReadyStageBatchMetrics,
    ca_queue_len: usize,
    ready_queue_len: usize,
    ca_waiting_repo_identities: usize,
    pending_roa_dispatch_len: usize,
    inflight_publication_points_len: usize,
    pending_finalization_len: usize,
    finalize_inflight: usize,
) {
    let slow_threshold_ms = crate::progress_log::control_loop_slow_threshold_ms();
    if duration_ms >= slow_threshold_ms {
        let payload = serde_json::json!({
            "duration_ms": duration_ms,
            "slow_threshold_ms": slow_threshold_ms,
            "repo_poll_timeout_ms": repo_poll_timeout.as_millis() as u64,
            "repo_event_count": repo_metrics.event_count,
            "repo_completions": repo_metrics.completions,
            "repo_ready_enqueued": repo_metrics.ready_enqueued,
            "repo_drain_duration_ms": repo_metrics.duration_ms,
            "ready_count": ready_batch_metrics.ready_count,
            "ready_batch_duration_ms": ready_batch_metrics.total_ms,
            "ready_batch_stage_fresh_ms_total": ready_batch_metrics.stage_fresh_ms_total,
            "ready_batch_stage_fresh_ms_max": ready_batch_metrics.stage_fresh_ms_max,
            "ready_batch_stage_fresh_ms_max_manifest_rsync_uri": ready_batch_metrics.stage_fresh_ms_max_manifest_rsync_uri,
            "ready_batch_child_discovery_ms_total": ready_batch_metrics.child_discovery_ms_total,
            "ready_batch_child_discovery_ms_max": ready_batch_metrics.child_discovery_ms_max,
            "ready_batch_prepare_ms_total": ready_batch_metrics.prepare_ms_total,
            "ready_batch_prepare_ms_max": ready_batch_metrics.prepare_ms_max,
            "ready_batch_direct_finalize_ms_total": ready_batch_metrics.direct_finalize_ms_total,
            "ready_batch_direct_finalize_ms_max": ready_batch_metrics.direct_finalize_ms_max,
            "ca_queue_len": ca_queue_len,
            "ready_queue_len": ready_queue_len,
            "ca_waiting_repo_identities": ca_waiting_repo_identities,
            "pending_roa_dispatch_len": pending_roa_dispatch_len,
            "inflight_publication_points_len": inflight_publication_points_len,
            "pending_finalization_len": pending_finalization_len,
            "finalize_inflight": finalize_inflight,
        });
        crate::progress_log::emit("phase2_control_loop_slow", payload);
    }
}
/// Converts a publication point run result into the compact form kept by phase 2.
///
/// The per-object audit and local outputs cache inside `result.objects` are always cleared.
/// When `compact_audit` is set, the audit's object and warning lists are emptied as well;
/// for a `Fresh` publication point the audit objects become `cir_fresh_objects` (replacing
/// whatever `result.cir_fresh_objects` held), otherwise `result.cir_fresh_objects` is passed
/// through unchanged.
fn compact_phase2_finished_result(
    mut result: PublicationPointRunResult,
    compact_audit: bool,
) -> FinishedPublicationPointResult {
    result.objects.audit.clear();
    result.objects.local_outputs_cache.clear();

    let cir_fresh_objects = if compact_audit && result.source == PublicationPointSource::Fresh {
        // The audit objects are cleared right below in this mode, so move them out instead
        // of cloning the whole list first.
        std::mem::take(&mut result.audit.objects)
    } else {
        result.cir_fresh_objects
    };
    if compact_audit {
        result.audit.objects.clear();
        result.audit.warnings.clear();
    }

    FinishedPublicationPointResult::Ok {
        source: result.source,
        warnings: result.warnings,
        objects: result.objects,
        audit: result.audit,
        cir_fresh_objects,
        cir_cached_objects: result.cir_cached_objects,
    }
}
/// `Result`-aware wrapper around `compact_phase2_finished_result`: a successful run result
/// is compacted, an error message is carried over as `FinishedPublicationPointResult::Err`.
fn compact_phase2_finished_result_result(
    result: Result<PublicationPointRunResult, String>,
    compact_audit: bool,
) -> FinishedPublicationPointResult {
    result.map_or_else(FinishedPublicationPointResult::Err, |run_result| {
        compact_phase2_finished_result(run_result, compact_audit)
    })
}
/// Runs the multi-root tree validation with the parallel phase-2 pipeline and returns the
/// audit output built from all finished publication points.
///
/// The call falls back to `run_tree_serial_audit_multi_root` when the signed-object failure
/// policy is `DropPublicationPoint`, when the runner has no repo sync runtime, or when it has
/// no parallel ROA worker pool.
///
/// Otherwise a single control loop on the calling thread moves CA instances through the
/// pipeline (CA queue -> repo sync -> ready queue -> staging -> ROA dispatch / object results
/// -> finalization) while one scoped worker thread runs `run_finalize_worker`. Batch sizes
/// and the finalize queue capacity come from `runner.parallel_phase2_config`, with built-in
/// defaults when it is absent; every value is clamped to at least 1.
///
/// # Errors
///
/// Returns the first error produced by a pipeline step or by the finalize worker, a
/// `TreeRunError::Runner` if the finalize worker panicked or `reset_run_state` failed, and a
/// `TreeRunError::Runner` if finalize work is still pending after the worker has stopped.
pub fn run_tree_parallel_phase2_audit_multi_root(
roots: Vec<CaInstanceHandle>,
runner: &Rpkiv1PublicationPointRunner<'_>,
config: &TreeRunConfig,
) -> Result<TreeRunAuditOutput, TreeRunError> {
// Serial fallbacks: unsupported policy, no repo sync runtime, or no ROA worker pool.
if runner.policy.signed_object_failure_policy == SignedObjectFailurePolicy::DropPublicationPoint
{
return run_tree_serial_audit_multi_root(roots, runner, config);
}
let Some(repo_runtime) = runner.repo_sync_runtime.as_ref() else {
return run_tree_serial_audit_multi_root(roots, runner, config);
};
if runner.parallel_roa_worker_pool.is_none() {
return run_tree_serial_audit_multi_root(roots, runner, config);
}
// Seed the CA queue with the roots; ids are handed out sequentially from `next_id`.
let mut next_id: u64 = 0;
let mut ca_queue: VecDeque<QueuedCaInstance> = VecDeque::new();
for root in roots {
ca_queue.push_back(QueuedCaInstance {
id: next_id,
handle: root,
parent_id: None,
discovered_from: None,
});
next_id += 1;
}
// Pipeline state owned by the control loop.
let mut visited_manifest_uris: HashSet<String> = HashSet::new();
let mut ca_waiting_repo_by_identity: HashMap<RepoIdentity, Vec<QueuedCaInstance>> =
HashMap::new();
let mut ready_queue: VecDeque<ReadyCaInstance> = VecDeque::new();
let mut inflight_publication_points: HashMap<u64, InflightPublicationPoint> = HashMap::new();
let mut pending_finalization: VecDeque<FinalizeTask> = VecDeque::new();
let mut pending_roa_dispatch: VecDeque<OwnedRoaTask> = VecDeque::new();
let mut finished: Vec<FinishedPublicationPoint> = Vec::new();
let mut instances_started = 0usize;
// Tunables: taken from the optional phase-2 config, with defaults, never below 1.
let phase2_config = runner.parallel_phase2_config.as_ref();
let ready_batch_size = phase2_config
.map(|cfg| cfg.ready_batch_size)
.unwrap_or(256)
.max(1);
let ready_batch_wall_time_budget_ms = phase2_config
.map(|cfg| cfg.ready_batch_wall_time_budget_ms)
.unwrap_or(100)
.max(1);
let ready_batch_wall_time_budget = Duration::from_millis(ready_batch_wall_time_budget_ms);
let object_result_drain_batch_size = phase2_config
.map(|cfg| cfg.object_result_drain_batch_size)
.unwrap_or(2048)
.max(1);
let publication_point_finalize_queue_capacity = phase2_config
.map(|cfg| cfg.publication_point_finalize_queue_capacity)
.unwrap_or(32768)
.max(1);
// Finalize tasks go through a bounded channel; results come back on an unbounded one.
let (finalize_task_tx, finalize_task_rx) =
mpsc::sync_channel::<FinalizeTask>(publication_point_finalize_queue_capacity);
let (finalize_result_tx, finalize_result_rx) = mpsc::channel::<FinalizeWorkerResult>();
let mut finalize_inflight = 0usize;
return std::thread::scope(|scope| {
// The worker takes ownership of the task receiver and the result sender.
let finalize_worker = scope.spawn(move || {
run_finalize_worker(
runner,
finalize_task_rx,
finalize_result_tx,
config.compact_audit,
)
});
// The control loop runs inside an immediately-invoked closure so that `?` exits the loop
// without skipping the worker shutdown below.
let run_result: Result<(), TreeRunError> = (|| {
loop {
let control_loop_started = Instant::now();
// First pass: collect finished finalizations, push pending ROA tasks, collect object
// results and hand completed publication points to the finalize worker.
drain_finalize_results_with_progress(
&finalize_result_rx,
&mut finished,
&mut finalize_inflight,
pending_finalization.len(),
pending_roa_dispatch.len(),
inflight_publication_points.len(),
)?;
flush_pending_roa_dispatch_with_progress(
runner,
&mut pending_roa_dispatch,
&mut inflight_publication_points,
&pending_finalization,
)?;
drain_object_results_with_progress(
runner,
&mut inflight_publication_points,
&mut pending_finalization,
pending_roa_dispatch.len(),
object_result_drain_batch_size,
)?;
submit_pending_finalization_with_progress(
&finalize_task_tx,
&mut pending_finalization,
&mut finalize_inflight,
publication_point_finalize_queue_capacity,
pending_roa_dispatch.len(),
inflight_publication_points.len(),
)?;
// Request repo sync for queued CA instances (subject to the instance/depth limits).
start_queued_ca_instances(
repo_runtime.as_ref(),
&mut ca_queue,
&mut ready_queue,
&mut ca_waiting_repo_by_identity,
&mut finished,
&mut visited_manifest_uris,
&mut instances_started,
config,
);
// The repo event poll timeout is derived from the current pipeline state.
let repo_poll_timeout = event_poll_timeout(
&ca_queue,
&ready_queue,
&pending_roa_dispatch,
&inflight_publication_points,
&pending_finalization,
finalize_inflight,
instances_started,
config,
);
let repo_metrics = drain_repo_events(
repo_runtime.as_ref(),
&mut ca_waiting_repo_by_identity,
&mut ready_queue,
repo_poll_timeout,
)?;
if repo_metrics.event_count > 0 {
crate::progress_log::emit(
"phase2_repo_events_drain",
serde_json::json!({
"event_count": repo_metrics.event_count,
"completions": repo_metrics.completions,
"ready_enqueued": repo_metrics.ready_enqueued,
"duration_ms": repo_metrics.duration_ms,
"ready_queue_len": ready_queue.len(),
"ca_waiting_repo_identities": ca_waiting_repo_by_identity.len(),
}),
);
}
// Stage ready publication points until the batch size is reached, the ready queue is
// empty, or the wall-time budget is spent. The budget is checked after each item, so a
// non-empty queue always makes progress by at least one item.
let ready_batch_started = Instant::now();
let mut ready_batch_metrics = ReadyStageBatchMetrics::default();
let mut ready_time_budget_exhausted = false;
while ready_batch_metrics.ready_count < ready_batch_size {
let Some(ready) = ready_queue.pop_front() else {
break;
};
let ready_queue_len_after_pop = ready_queue.len();
let metrics = stage_ready_publication_point(
runner,
&mut next_id,
&mut ca_queue,
&mut pending_roa_dispatch,
&mut inflight_publication_points,
&mut pending_finalization,
&mut finished,
ready,
ready_queue_len_after_pop,
config.compact_audit,
);
ready_batch_metrics.record(metrics);
if ready_batch_metrics.ready_count > 0
&& ready_batch_started.elapsed() >= ready_batch_wall_time_budget
{
// Only counts as "exhausted" when work was actually left behind.
ready_time_budget_exhausted = !ready_queue.is_empty();
break;
}
}
if ready_batch_metrics.ready_count > 0 {
// Replace the summed per-item total with the batch's wall-clock duration.
ready_batch_metrics.total_ms = elapsed_ms(ready_batch_started);
let ready_count_budget_exhausted = ready_batch_metrics.ready_count
>= ready_batch_size
&& !ready_queue.is_empty();
ready_time_budget_exhausted = ready_time_budget_exhausted
|| (!ready_queue.is_empty()
&& ready_batch_metrics.total_ms >= ready_batch_wall_time_budget_ms);
emit_ready_queue_batch_progress(
&ready_batch_metrics,
ready_batch_size,
ready_batch_wall_time_budget_ms,
ready_queue.len(),
ready_count_budget_exhausted,
ready_time_budget_exhausted,
ca_queue.len(),
pending_roa_dispatch.len(),
inflight_publication_points.len(),
pending_finalization.len(),
finalize_inflight,
);
crate::progress_log::emit(
"phase2_ready_queue_stage_fresh_breakdown",
serde_json::json!({
"ready_count": ready_batch_metrics.ready_count,
"stage_fresh_ms_total": ready_batch_metrics.stage_fresh_ms_total,
"stage_fresh_ms_max": ready_batch_metrics.stage_fresh_ms_max,
"stage_fresh_ms_max_manifest_rsync_uri": ready_batch_metrics.stage_fresh_ms_max_manifest_rsync_uri,
"stage_fresh_ms_max_publication_point_rsync_uri": ready_batch_metrics.stage_fresh_ms_max_publication_point_rsync_uri,
"snapshot_prepare_ms_total": ready_batch_metrics.snapshot_prepare_ms_total,
"snapshot_prepare_ms_max": ready_batch_metrics.snapshot_prepare_ms_max,
"snapshot_current_index_lock_ms_total": ready_batch_metrics.snapshot_current_index_lock_ms_total,
"snapshot_current_index_lock_ms_max": ready_batch_metrics.snapshot_current_index_lock_ms_max,
"snapshot_manifest_load_ms_total": ready_batch_metrics.snapshot_manifest_load_ms_total,
"snapshot_manifest_load_ms_max": ready_batch_metrics.snapshot_manifest_load_ms_max,
"snapshot_manifest_index_lookup_ms_total": ready_batch_metrics.snapshot_manifest_index_lookup_ms_total,
"snapshot_manifest_index_lookup_ms_max": ready_batch_metrics.snapshot_manifest_index_lookup_ms_max,
"snapshot_manifest_blob_load_ms_total": ready_batch_metrics.snapshot_manifest_blob_load_ms_total,
"snapshot_manifest_blob_load_ms_max": ready_batch_metrics.snapshot_manifest_blob_load_ms_max,
"snapshot_manifest_decode_ms_total": ready_batch_metrics.snapshot_manifest_decode_ms_total,
"snapshot_manifest_decode_ms_max": ready_batch_metrics.snapshot_manifest_decode_ms_max,
"snapshot_replay_guard_ms_total": ready_batch_metrics.snapshot_replay_guard_ms_total,
"snapshot_replay_guard_ms_max": ready_batch_metrics.snapshot_replay_guard_ms_max,
"replay_meta_hit_count": ready_batch_metrics.replay_meta_hit_count,
"replay_meta_miss_count": ready_batch_metrics.replay_meta_miss_count,
"snapshot_manifest_entries_ms_total": ready_batch_metrics.snapshot_manifest_entries_ms_total,
"snapshot_manifest_entries_ms_max": ready_batch_metrics.snapshot_manifest_entries_ms_max,
"snapshot_pack_files_ms_total": ready_batch_metrics.snapshot_pack_files_ms_total,
"snapshot_pack_files_ms_max": ready_batch_metrics.snapshot_pack_files_ms_max,
"snapshot_pack_files_index_lookup_ms_total": ready_batch_metrics.snapshot_pack_files_index_lookup_ms_total,
"snapshot_pack_files_index_lookup_ms_max": ready_batch_metrics.snapshot_pack_files_index_lookup_ms_max,
"snapshot_pack_files_blob_load_ms_total": ready_batch_metrics.snapshot_pack_files_blob_load_ms_total,
"snapshot_pack_files_blob_load_ms_max": ready_batch_metrics.snapshot_pack_files_blob_load_ms_max,
"snapshot_ee_path_validate_ms_total": ready_batch_metrics.snapshot_ee_path_validate_ms_total,
"snapshot_ee_path_validate_ms_max": ready_batch_metrics.snapshot_ee_path_validate_ms_max,
"snapshot_manifest_file_count_total": ready_batch_metrics.snapshot_manifest_file_count_total,
"snapshot_manifest_file_count_max": ready_batch_metrics.snapshot_manifest_file_count_max,
"child_discovery_ms_total": ready_batch_metrics.child_discovery_ms_total,
"child_discovery_ms_max": ready_batch_metrics.child_discovery_ms_max,
"batch_duration_ms": ready_batch_metrics.total_ms,
}),
);
}
// Second pass over the downstream stages so work produced by the ready batch moves on
// within the same iteration.
flush_pending_roa_dispatch_with_progress(
runner,
&mut pending_roa_dispatch,
&mut inflight_publication_points,
&pending_finalization,
)?;
drain_object_results_with_progress(
runner,
&mut inflight_publication_points,
&mut pending_finalization,
pending_roa_dispatch.len(),
object_result_drain_batch_size,
)?;
submit_pending_finalization_with_progress(
&finalize_task_tx,
&mut pending_finalization,
&mut finalize_inflight,
publication_point_finalize_queue_capacity,
pending_roa_dispatch.len(),
inflight_publication_points.len(),
)?;
drain_finalize_results_with_progress(
&finalize_result_rx,
&mut finished,
&mut finalize_inflight,
pending_finalization.len(),
pending_roa_dispatch.len(),
inflight_publication_points.len(),
)?;
// Reports the iteration only if it exceeded the slow threshold.
emit_control_loop_slow(
elapsed_ms(control_loop_started),
repo_poll_timeout,
&repo_metrics,
&ready_batch_metrics,
ca_queue.len(),
ready_queue.len(),
ca_waiting_repo_by_identity.len(),
pending_roa_dispatch.len(),
inflight_publication_points.len(),
pending_finalization.len(),
finalize_inflight,
);
if is_complete(
&ca_queue,
&ready_queue,
&ca_waiting_repo_by_identity,
&pending_roa_dispatch,
&inflight_publication_points,
&pending_finalization,
finalize_inflight,
instances_started,
config,
) {
break;
}
}
// Reached only when the loop finished without error; an early `?` skips the reset.
repo_runtime
.reset_run_state()
.map_err(TreeRunError::Runner)?;
Ok(())
})();
// Close the task channel before joining. NOTE(review): presumably `run_finalize_worker`
// returns once the channel is disconnected — confirm in its implementation.
drop(finalize_task_tx);
// Join first so the worker has stopped before any control-loop error is propagated.
let worker_result = finalize_worker
.join()
.map_err(|_| TreeRunError::Runner("phase2 finalize worker panicked".to_string()))?;
run_result?;
worker_result?;
// Collect results the worker produced after the loop's last drain.
drain_finalize_results_with_progress(
&finalize_result_rx,
&mut finished,
&mut finalize_inflight,
pending_finalization.len(),
pending_roa_dispatch.len(),
inflight_publication_points.len(),
)?;
// With the worker gone, anything still queued or counted as in flight was lost.
if finalize_inflight != 0 || !pending_finalization.is_empty() {
return Err(TreeRunError::Runner(format!(
"phase2 finalize worker stopped with pending work: queued={} inflight={}",
pending_finalization.len(),
finalize_inflight
)));
}
Ok(build_tree_output(finished))
});
}
/// Reports one ready-queue batch as two progress events.
///
/// `phase2_ready_queue_batch` carries the batch counters, the fresh-stage / prepare /
/// task-building timings, the batch limits and the queue lengths after the batch.
/// `phase2_ready_queue_control_breakdown` carries the control-path timings (queue wait,
/// child enqueue, ROA presence scan, cache view, direct finalize, fallback full run).
fn emit_ready_queue_batch_progress(
    metrics: &ReadyStageBatchMetrics,
    ready_batch_size: usize,
    ready_batch_wall_time_budget_ms: u64,
    ready_queue_len_after_batch: usize,
    ready_count_budget_exhausted: bool,
    ready_time_budget_exhausted: bool,
    ca_queue_len_after_batch: usize,
    pending_roa_dispatch_len_after_batch: usize,
    inflight_publication_points_after_batch: usize,
    pending_finalization_len_after_batch: usize,
    finalize_inflight_after_batch: usize,
) {
    // Anything left on the ready queue means the batch stopped on one of its budgets.
    let ready_queue_budget_exhausted = ready_queue_len_after_batch > 0;

    let batch_payload = serde_json::json!({
        "ready_count": metrics.ready_count,
        "fallback_count": metrics.fallback_count,
        "complete_count": metrics.complete_count,
        "staged_count": metrics.staged_count,
        "zero_task_count": metrics.zero_task_count,
        "error_count": metrics.error_count,
        "discovered_children": metrics.discovered_children,
        "locked_files": metrics.locked_files,
        "roa_tasks": metrics.roa_tasks,
        "aspa_objects": metrics.aspa_objects,
        "stage_fresh_ms_total": metrics.stage_fresh_ms_total,
        "stage_fresh_ms_max": metrics.stage_fresh_ms_max,
        "stage_fresh_ms_max_manifest_rsync_uri": metrics.stage_fresh_ms_max_manifest_rsync_uri,
        "stage_fresh_ms_max_publication_point_rsync_uri": metrics.stage_fresh_ms_max_publication_point_rsync_uri,
        "prepare_ms_total": metrics.prepare_ms_total,
        "prepare_ms_max": metrics.prepare_ms_max,
        "build_roa_tasks_ms_total": metrics.build_roa_tasks_ms_total,
        "build_roa_tasks_ms_max": metrics.build_roa_tasks_ms_max,
        "batch_duration_ms": metrics.total_ms,
        "ready_batch_size": ready_batch_size,
        "ready_batch_wall_time_budget_ms": ready_batch_wall_time_budget_ms,
        "ready_queue_len_after_batch": ready_queue_len_after_batch,
        "ready_queue_budget_exhausted": ready_queue_budget_exhausted,
        "ready_count_budget_exhausted": ready_count_budget_exhausted,
        "ready_time_budget_exhausted": ready_time_budget_exhausted,
        "ca_queue_len_after_batch": ca_queue_len_after_batch,
        "pending_roa_dispatch_len_after_batch": pending_roa_dispatch_len_after_batch,
        "inflight_publication_points_after_batch": inflight_publication_points_after_batch,
        "pending_finalization_len_after_batch": pending_finalization_len_after_batch,
        "finalize_inflight_after_batch": finalize_inflight_after_batch,
    });
    crate::progress_log::emit("phase2_ready_queue_batch", batch_payload);

    let control_payload = serde_json::json!({
        "ready_count": metrics.ready_count,
        "ready_queue_wait_ms_total": metrics.ready_queue_wait_ms_total,
        "ready_queue_wait_ms_max": metrics.ready_queue_wait_ms_max,
        "child_enqueue_ms_total": metrics.child_enqueue_ms_total,
        "child_enqueue_ms_max": metrics.child_enqueue_ms_max,
        "roa_presence_scan_ms_total": metrics.roa_presence_scan_ms_total,
        "roa_presence_scan_ms_max": metrics.roa_presence_scan_ms_max,
        "roa_cache_view_ms_total": metrics.roa_cache_view_ms_total,
        "roa_cache_view_ms_max": metrics.roa_cache_view_ms_max,
        "direct_finalize_ms_total": metrics.direct_finalize_ms_total,
        "direct_finalize_ms_max": metrics.direct_finalize_ms_max,
        "fallback_full_run_ms_total": metrics.fallback_full_run_ms_total,
        "fallback_full_run_ms_max": metrics.fallback_full_run_ms_max,
        "batch_duration_ms": metrics.total_ms,
    });
    crate::progress_log::emit("phase2_ready_queue_control_breakdown", control_payload);
}
/// Returns whether another CA instance may be started: always `true` when
/// `config.max_instances` is unset, otherwise only while `instances_started` is below it.
fn can_start_more(instances_started: usize, config: &TreeRunConfig) -> bool {
    match config.max_instances {
        Some(limit) => instances_started < limit,
        None => true,
    }
}
/// Pops CA instances off `ca_queue` and requests their repository sync for as long as the
/// instance limit allows and the queue is not empty.
///
/// Instances whose manifest URI was already seen, or whose depth exceeds `config.max_depth`,
/// are dropped without counting as started. For every started instance the repo runtime's
/// answer decides where it goes:
/// - `Ready`: pushed onto `ready_queue` with the returned outcome;
/// - `Pending`: parked in `ca_waiting_repo_by_identity` under the repo identity;
/// - error: recorded in `finished` as a failed publication point.
fn start_queued_ca_instances(
    repo_runtime: &dyn crate::parallel::repo_runtime::RepoSyncRuntime,
    ca_queue: &mut VecDeque<QueuedCaInstance>,
    ready_queue: &mut VecDeque<ReadyCaInstance>,
    ca_waiting_repo_by_identity: &mut HashMap<RepoIdentity, Vec<QueuedCaInstance>>,
    finished: &mut Vec<FinishedPublicationPoint>,
    visited_manifest_uris: &mut HashSet<String>,
    instances_started: &mut usize,
    config: &TreeRunConfig,
) {
    loop {
        // The limit is checked before popping so an over-limit node stays queued.
        if !can_start_more(*instances_started, config) {
            break;
        }
        let node = match ca_queue.pop_front() {
            Some(node) => node,
            None => break,
        };

        let first_visit = visited_manifest_uris.insert(node.handle.manifest_rsync_uri.clone());
        if !first_visit {
            continue;
        }
        let too_deep =
            matches!(config.max_depth, Some(max_depth) if node.handle.depth > max_depth);
        if too_deep {
            continue;
        }

        *instances_started += 1;
        match repo_runtime.request_publication_point_repo(&node.handle, 0) {
            Ok(RepoSyncRequestStatus::Pending { identity, .. }) => {
                let waiters = ca_waiting_repo_by_identity.entry(identity).or_default();
                waiters.push(node);
            }
            Ok(RepoSyncRequestStatus::Ready { mut outcome, .. }) => {
                // An immediate `Ready` means the repo work for this CA was already done
                // (often by child prefetch), so its transport time must not be counted a
                // second time for this instance.
                outcome.repo_sync_duration_ms = 0;
                let ready = ReadyCaInstance {
                    node,
                    repo_outcome: outcome,
                    ready_enqueued_at: Instant::now(),
                };
                ready_queue.push_back(ready);
            }
            Err(err) => {
                let failed = FinishedPublicationPoint {
                    node: FinishedPublicationPointNode::from_queued(node),
                    result: FinishedPublicationPointResult::Err(err),
                };
                finished.push(failed);
            }
        }
    }
}
/// Stages one repo-ready CA instance and routes it to the next phase.
///
/// The function takes one of four paths, tried in this order:
/// 1. `observe_or_reuse_publication_point_cache` returns a result: its discovered
///    children are enqueued and the result is pushed to `finished`.
/// 2. `stage_fresh_publication_point_after_repo_ready` fails: `run_publication_point`
///    is run as a fallback and its outcome (Ok or Err) is pushed to `finished`;
///    children are enqueued only when the fallback succeeded.
/// 3. Object preparation returns `ParallelObjectsPrepare::Complete`: the publication
///    point is finalized inline via `finalize_ready_objects`.
/// 4. Object preparation returns `ParallelObjectsPrepare::Staged`: ROA tasks are
///    appended to `pending_roa_dispatch`; the state goes to `pending_finalization`
///    when there are zero ROA tasks, otherwise into `inflight_publication_points`
///    keyed by the node id.
///
/// Every path ends by calling `emit_ready_publication_point_control_slow` and
/// returning the `ReadyStageMetrics` collected for this publication point.
fn stage_ready_publication_point(
runner: &Rpkiv1PublicationPointRunner<'_>,
next_id: &mut u64,
ca_queue: &mut VecDeque<QueuedCaInstance>,
pending_roa_dispatch: &mut VecDeque<OwnedRoaTask>,
inflight_publication_points: &mut HashMap<u64, InflightPublicationPoint>,
pending_finalization: &mut VecDeque<FinalizeTask>,
finished: &mut Vec<FinishedPublicationPoint>,
ready: ReadyCaInstance,
ready_queue_len_after_pop: usize,
compact_audit: bool,
) -> ReadyStageMetrics {
let publication_point_started = Instant::now();
// Time spent between being pushed onto the ready queue and being picked up here.
let ready_queue_wait_ms = publication_point_started
.saturating_duration_since(ready.ready_enqueued_at)
.as_millis() as u64;
let mut metrics = ReadyStageMetrics {
ready_count: 1,
manifest_rsync_uri: Some(ready.node.handle.manifest_rsync_uri.clone()),
publication_point_rsync_uri: Some(ready.node.handle.publication_point_rsync_uri.clone()),
ready_queue_wait_ms,
ready_queue_len_after_pop,
..ReadyStageMetrics::default()
};
// `warnings` starts from the repo-sync warnings and is extended with stage warnings below.
let mut warnings = ready.repo_outcome.warnings.clone();
let repo_outcome = ready.repo_outcome.clone();
// Path 1: the runner hands back a finished result for this publication point.
if let Some(result) = runner.observe_or_reuse_publication_point_cache(
&ready.node.handle,
repo_outcome.repo_sync_source.as_deref(),
repo_outcome.repo_sync_phase.as_deref(),
repo_outcome.repo_sync_duration_ms,
repo_outcome.repo_sync_err.as_deref(),
&warnings,
) {
metrics.complete_count = 1;
metrics.discovered_children = result.discovered_children.len();
let child_enqueue_started = Instant::now();
enqueue_discovered_children(
runner,
next_id,
ca_queue,
&ready.node,
result.discovered_children.clone(),
);
metrics.child_enqueue_ms = elapsed_ms(child_enqueue_started);
finished.push(FinishedPublicationPoint {
node: FinishedPublicationPointNode::from_queued(ready.node),
result: compact_phase2_finished_result(result, compact_audit),
});
metrics.total_ms = elapsed_ms(publication_point_started);
emit_ready_publication_point_control_slow(
metrics.manifest_rsync_uri.as_deref().unwrap_or_default(),
metrics
.publication_point_rsync_uri
.as_deref()
.unwrap_or_default(),
&repo_outcome,
&metrics,
"publication_point_cache",
false,
);
return metrics;
}
// No reusable result: build the fresh stage from the synced repository.
let stage_fresh_started = Instant::now();
let stage = runner.stage_fresh_publication_point_after_repo_ready(
&ready.node.handle,
repo_outcome.repo_sync_ok,
repo_outcome.repo_sync_err.as_deref(),
);
metrics.stage_fresh_ms = elapsed_ms(stage_fresh_started);
let fresh_stage = match stage {
Ok(stage) => stage,
Err(err) => {
// Path 2: fresh staging failed. Log it if it was slow, then fall back to a
// full `run_publication_point` for this handle.
if metrics.stage_fresh_ms >= crate::progress_log::stage_fresh_slow_threshold_ms() {
crate::progress_log::emit(
"phase2_stage_fresh_slow",
serde_json::json!({
"manifest_rsync_uri": ready.node.handle.manifest_rsync_uri.as_str(),
"publication_point_rsync_uri": ready.node.handle.publication_point_rsync_uri.as_str(),
"status": "error",
"error": err.error.to_string(),
"stage_fresh_ms": metrics.stage_fresh_ms,
"snapshot_prepare_ms": err.snapshot_prepare_ms,
"repo_sync_source": repo_outcome.repo_sync_source.as_deref(),
"repo_sync_phase": repo_outcome.repo_sync_phase.as_deref(),
"repo_sync_duration_ms": repo_outcome.repo_sync_duration_ms,
}),
);
}
metrics.fallback_count = 1;
let fallback_started = Instant::now();
let fallback = runner.run_publication_point(&ready.node.handle);
metrics.fallback_full_run_ms = elapsed_ms(fallback_started);
// Children are only enqueued when the fallback run succeeded.
if let Ok(result) = fallback.as_ref() {
metrics.discovered_children = result.discovered_children.len();
let child_enqueue_started = Instant::now();
enqueue_discovered_children(
runner,
next_id,
ca_queue,
&ready.node,
result.discovered_children.clone(),
);
metrics.child_enqueue_ms = elapsed_ms(child_enqueue_started);
}
finished.push(FinishedPublicationPoint {
node: FinishedPublicationPointNode::from_queued(ready.node),
result: compact_phase2_finished_result_result(fallback, compact_audit),
});
metrics.total_ms = elapsed_ms(publication_point_started);
// `true` forces the control event to be emitted regardless of the slow threshold.
emit_ready_publication_point_control_slow(
metrics.manifest_rsync_uri.as_deref().unwrap_or_default(),
metrics
.publication_point_rsync_uri
.as_deref()
.unwrap_or_default(),
&repo_outcome,
&metrics,
"fallback",
true,
);
return metrics;
}
};
// Copy the snapshot-preparation timing breakdown of the fresh stage into the metrics.
metrics.snapshot_prepare_ms = fresh_stage.snapshot_prepare_ms;
metrics.snapshot_current_index_lock_ms =
fresh_stage.snapshot_prepare_timing.current_index_lock_ms;
metrics.snapshot_manifest_load_ms = fresh_stage.snapshot_prepare_timing.manifest_load_ms;
metrics.snapshot_manifest_index_lookup_ms =
fresh_stage.snapshot_prepare_timing.manifest_index_lookup_ms;
metrics.snapshot_manifest_blob_load_ms =
fresh_stage.snapshot_prepare_timing.manifest_blob_load_ms;
metrics.snapshot_manifest_decode_ms = fresh_stage.snapshot_prepare_timing.manifest_decode_ms;
metrics.snapshot_replay_guard_ms = fresh_stage.snapshot_prepare_timing.replay_guard_ms;
metrics.replay_meta_hit_count = fresh_stage.snapshot_prepare_timing.replay_meta_hit as usize;
metrics.replay_meta_miss_count = fresh_stage.snapshot_prepare_timing.replay_meta_miss as usize;
metrics.snapshot_manifest_entries_ms = fresh_stage.snapshot_prepare_timing.manifest_entries_ms;
metrics.snapshot_pack_files_ms = fresh_stage.snapshot_prepare_timing.pack_files_ms;
metrics.snapshot_pack_files_index_lookup_ms = fresh_stage
.snapshot_prepare_timing
.pack_files_index_lookup_ms;
metrics.snapshot_pack_files_blob_load_ms =
fresh_stage.snapshot_prepare_timing.pack_files_blob_load_ms;
metrics.snapshot_ee_path_validate_ms = fresh_stage.snapshot_prepare_timing.ee_path_validate_ms;
metrics.snapshot_manifest_file_count = fresh_stage.snapshot_prepare_timing.manifest_file_count;
metrics.child_discovery_ms = fresh_stage.child_discovery_ms;
// Successful but slow fresh staging gets its own detailed progress event.
if metrics.stage_fresh_ms >= crate::progress_log::stage_fresh_slow_threshold_ms() {
crate::progress_log::emit(
"phase2_stage_fresh_slow",
serde_json::json!({
"manifest_rsync_uri": ready.node.handle.manifest_rsync_uri.as_str(),
"publication_point_rsync_uri": ready.node.handle.publication_point_rsync_uri.as_str(),
"status": "ok",
"stage_fresh_ms": metrics.stage_fresh_ms,
"snapshot_prepare_ms": fresh_stage.snapshot_prepare_ms,
"snapshot_current_index_lock_ms": fresh_stage.snapshot_prepare_timing.current_index_lock_ms,
"snapshot_manifest_load_ms": fresh_stage.snapshot_prepare_timing.manifest_load_ms,
"snapshot_manifest_index_lookup_ms": fresh_stage.snapshot_prepare_timing.manifest_index_lookup_ms,
"snapshot_manifest_blob_load_ms": fresh_stage.snapshot_prepare_timing.manifest_blob_load_ms,
"snapshot_manifest_decode_ms": fresh_stage.snapshot_prepare_timing.manifest_decode_ms,
"snapshot_replay_guard_ms": fresh_stage.snapshot_prepare_timing.replay_guard_ms,
"replay_meta_hit": fresh_stage.snapshot_prepare_timing.replay_meta_hit,
"replay_meta_miss": fresh_stage.snapshot_prepare_timing.replay_meta_miss,
"snapshot_manifest_entries_ms": fresh_stage.snapshot_prepare_timing.manifest_entries_ms,
"snapshot_pack_files_ms": fresh_stage.snapshot_prepare_timing.pack_files_ms,
"snapshot_pack_files_index_lookup_ms": fresh_stage.snapshot_prepare_timing.pack_files_index_lookup_ms,
"snapshot_pack_files_blob_load_ms": fresh_stage.snapshot_prepare_timing.pack_files_blob_load_ms,
"snapshot_ee_path_validate_ms": fresh_stage.snapshot_prepare_timing.ee_path_validate_ms,
"snapshot_manifest_file_count": fresh_stage.snapshot_prepare_timing.manifest_file_count,
"child_discovery_ms": fresh_stage.child_discovery_ms,
"child_count": fresh_stage.discovered_children.len(),
"repo_sync_source": repo_outcome.repo_sync_source.as_deref(),
"repo_sync_phase": repo_outcome.repo_sync_phase.as_deref(),
"repo_sync_duration_ms": repo_outcome.repo_sync_duration_ms,
}),
);
}
warnings.extend(fresh_stage.warnings.clone());
metrics.discovered_children = fresh_stage.discovered_children.len();
// Children are enqueued before this publication point's own objects are prepared.
let child_enqueue_started = Instant::now();
enqueue_discovered_children(
runner,
next_id,
ca_queue,
&ready.node,
fresh_stage.discovered_children.clone(),
);
metrics.child_enqueue_ms = elapsed_ms(child_enqueue_started);
// `prepare_started` covers everything up to the prepare call returning, including
// the ROA presence scan and the cache view lookup below.
let prepare_started = Instant::now();
let roa_presence_scan_started = Instant::now();
// A publication point "has ROAs" when any file URI ends in `.roa`.
let has_roa = fresh_stage
.fresh_point
.files()
.iter()
.any(|file| file.rsync_uri.ends_with(".roa"));
metrics.roa_presence_scan_ms = elapsed_ms(roa_presence_scan_started);
// Counters are only recorded when the ROA validation cache is enabled and a
// timing sink is configured.
if runner.enable_roa_validation_cache {
if let Some(timing) = runner.timing.as_ref() {
if has_roa {
timing.record_count("roa_validation_cache_roa_candidate_publication_points", 1);
} else {
timing.record_count("roa_validation_cache_skipped_no_roa_publication_points", 1);
}
}
}
// The cache view is looked up whenever ROAs are present; it is only passed on
// below when the cache is also enabled.
let roa_cache_view = if has_roa {
let roa_cache_view_started = Instant::now();
let view = runner
.roa_validation_cache_view_for_fresh_point(&fresh_stage.fresh_point.manifest_rsync_uri);
metrics.roa_cache_view_ms = elapsed_ms(roa_cache_view_started);
view
} else {
None
};
let roa_cache = if runner.enable_roa_validation_cache && has_roa {
RoaValidationCacheInput::enabled(roa_cache_view.as_ref())
} else {
RoaValidationCacheInput::disabled()
};
match prepare_publication_point_for_parallel_roa_with_cache(
ready.node.id,
&fresh_stage.fresh_point,
runner.policy,
&ready.node.handle.ca_certificate_der,
ready.node.handle.ca_certificate_rsync_uri.as_deref(),
ready.node.handle.effective_ip_resources.as_ref(),
ready.node.handle.effective_as_resources.as_ref(),
runner.validation_time,
runner.persist_vcir,
roa_cache,
) {
// Path 3: preparation already produced the full objects output.
ParallelObjectsPrepare::Complete(mut objects) => {
metrics.prepare_ms = elapsed_ms(prepare_started);
metrics.complete_count = 1;
metrics.roa_tasks = objects.stats.roa_total;
metrics.aspa_objects = objects.stats.aspa_total;
// Merge router keys discovered during fresh staging, then extend the local
// outputs cache with entries built from the merged router key list.
objects
.router_keys
.extend(fresh_stage.discovered_router_keys.clone());
objects.local_outputs_cache.extend(
crate::validation::tree_runner::build_router_key_local_outputs(
&ready.node.handle,
&objects.router_keys,
),
);
let direct_finalize_started = Instant::now();
let finalize_metrics = finalize_ready_objects(
runner,
ready.node,
fresh_stage,
warnings,
objects,
repo_outcome.clone(),
finished,
compact_audit,
);
// Report the larger of the finalize time measured inside the helper and the
// wall time measured around the call.
metrics.direct_finalize_ms = finalize_metrics
.finalize_ms
.max(elapsed_ms(direct_finalize_started));
}
// Path 4: ROA work was staged for the worker pool.
ParallelObjectsPrepare::Staged(objects_stage) => {
metrics.prepare_ms = elapsed_ms(prepare_started);
metrics.staged_count = 1;
metrics.locked_files = objects_stage.locked_file_count();
metrics.aspa_objects = objects_stage.aspa_task_count();
let build_tasks_started = Instant::now();
objects_stage.append_roa_tasks_to(pending_roa_dispatch);
metrics.build_roa_tasks_ms = elapsed_ms(build_tasks_started);
let task_count = objects_stage.roa_task_count();
metrics.roa_tasks = task_count;
if task_count == 0 {
// No worker results will ever arrive for this publication point, so it is
// queued for finalization right away instead of being tracked as inflight.
metrics.zero_task_count = 1;
pending_finalization.push_back(FinalizeTask {
state: InflightPublicationPoint {
node: ready.node,
fresh_stage,
objects_stage,
repo_outcome: repo_outcome.clone(),
warnings,
started_at: publication_point_started,
objects_started_at: Instant::now(),
task_count,
tasks_submitted: 0,
first_task_submitted_at: None,
last_task_submitted_at: None,
first_result_at: None,
last_result_at: None,
worker_ms_total: 0,
worker_ms_max: 0,
queue_wait_ms_total: 0,
queue_wait_ms_max: 0,
finalize_enqueued_at: Some(Instant::now()),
results: Vec::new(),
},
});
} else {
// Track the publication point under its node id until `task_count` results
// have been collected for it.
inflight_publication_points.insert(
ready.node.id,
InflightPublicationPoint {
node: ready.node,
fresh_stage,
objects_stage,
repo_outcome: repo_outcome.clone(),
warnings,
started_at: publication_point_started,
objects_started_at: Instant::now(),
task_count,
tasks_submitted: 0,
first_task_submitted_at: None,
last_task_submitted_at: None,
first_result_at: None,
last_result_at: None,
worker_ms_total: 0,
worker_ms_max: 0,
queue_wait_ms_total: 0,
queue_wait_ms_max: 0,
finalize_enqueued_at: None,
results: Vec::with_capacity(task_count),
},
);
}
}
}
metrics.total_ms = elapsed_ms(publication_point_started);
// The status label is derived from which counter the path above set.
emit_ready_publication_point_control_slow(
metrics.manifest_rsync_uri.as_deref().unwrap_or_default(),
metrics
.publication_point_rsync_uri
.as_deref()
.unwrap_or_default(),
&repo_outcome,
&metrics,
if metrics.complete_count > 0 {
"complete"
} else if metrics.zero_task_count > 0 {
"zero_task"
} else {
"staged"
},
false,
);
metrics
}
/// Emits the two progress events that describe a slow (or forced) ready-stage pass.
///
/// Nothing is emitted when `metrics.total_ms` is below
/// `pp_control_slow_threshold_ms()` and `force_error_path` is `false`. Otherwise a
/// `phase2_ready_publication_point_control_slow` event with the control-flow
/// timings is emitted, followed by a
/// `phase2_ready_publication_point_control_snapshot_breakdown` event with the
/// snapshot-preparation timings.
fn emit_ready_publication_point_control_slow(
    manifest_rsync_uri: &str,
    publication_point_rsync_uri: &str,
    repo_outcome: &RepoSyncRuntimeOutcome,
    metrics: &ReadyStageMetrics,
    status: &str,
    force_error_path: bool,
) {
    let threshold_ms = crate::progress_log::pp_control_slow_threshold_ms();
    let under_threshold = metrics.total_ms < threshold_ms;
    if under_threshold && !force_error_path {
        return;
    }

    let control_payload = serde_json::json!({
        "manifest_rsync_uri": manifest_rsync_uri,
        "publication_point_rsync_uri": publication_point_rsync_uri,
        "status": status,
        "repo_sync_source": repo_outcome.repo_sync_source.as_deref(),
        "repo_sync_phase": repo_outcome.repo_sync_phase.as_deref(),
        "repo_sync_duration_ms": repo_outcome.repo_sync_duration_ms,
        "repo_sync_ok": repo_outcome.repo_sync_ok,
        "repo_sync_err": repo_outcome.repo_sync_err.as_deref(),
        "ready_queue_wait_ms": metrics.ready_queue_wait_ms,
        "ready_queue_len_after_pop": metrics.ready_queue_len_after_pop,
        "stage_fresh_ms": metrics.stage_fresh_ms,
        "child_discovery_ms": metrics.child_discovery_ms,
        "child_enqueue_ms": metrics.child_enqueue_ms,
        "discovered_children": metrics.discovered_children,
        "roa_presence_scan_ms": metrics.roa_presence_scan_ms,
        "roa_cache_view_ms": metrics.roa_cache_view_ms,
        "prepare_ms": metrics.prepare_ms,
        "build_roa_tasks_ms": metrics.build_roa_tasks_ms,
        "direct_finalize_ms": metrics.direct_finalize_ms,
        "fallback_full_run_ms": metrics.fallback_full_run_ms,
        "locked_files": metrics.locked_files,
        "roa_tasks": metrics.roa_tasks,
        "aspa_objects": metrics.aspa_objects,
        "complete_count": metrics.complete_count,
        "staged_count": metrics.staged_count,
        "zero_task_count": metrics.zero_task_count,
        "fallback_count": metrics.fallback_count,
        "total_ms": metrics.total_ms,
        "slow_threshold_ms": threshold_ms,
    });
    crate::progress_log::emit(
        "phase2_ready_publication_point_control_slow",
        control_payload,
    );

    let snapshot_payload = serde_json::json!({
        "manifest_rsync_uri": manifest_rsync_uri,
        "publication_point_rsync_uri": publication_point_rsync_uri,
        "status": status,
        "snapshot_prepare_ms": metrics.snapshot_prepare_ms,
        "snapshot_current_index_lock_ms": metrics.snapshot_current_index_lock_ms,
        "snapshot_manifest_load_ms": metrics.snapshot_manifest_load_ms,
        "snapshot_manifest_index_lookup_ms": metrics.snapshot_manifest_index_lookup_ms,
        "snapshot_manifest_blob_load_ms": metrics.snapshot_manifest_blob_load_ms,
        "snapshot_manifest_decode_ms": metrics.snapshot_manifest_decode_ms,
        "snapshot_replay_guard_ms": metrics.snapshot_replay_guard_ms,
        "replay_meta_hit_count": metrics.replay_meta_hit_count,
        "replay_meta_miss_count": metrics.replay_meta_miss_count,
        "snapshot_manifest_entries_ms": metrics.snapshot_manifest_entries_ms,
        "snapshot_pack_files_ms": metrics.snapshot_pack_files_ms,
        "snapshot_pack_files_index_lookup_ms": metrics.snapshot_pack_files_index_lookup_ms,
        "snapshot_pack_files_blob_load_ms": metrics.snapshot_pack_files_blob_load_ms,
        "snapshot_ee_path_validate_ms": metrics.snapshot_ee_path_validate_ms,
        "snapshot_manifest_file_count": metrics.snapshot_manifest_file_count,
        "total_ms": metrics.total_ms,
        "slow_threshold_ms": threshold_ms,
    });
    crate::progress_log::emit(
        "phase2_ready_publication_point_control_snapshot_breakdown",
        snapshot_payload,
    );
}
/// Appends the children discovered under `parent` to the back of `ca_queue`.
///
/// Children are first sorted by manifest rsync URI, with the child CA certificate
/// rsync URI as tie-breaker, so ids are handed out in that order. When the runner
/// has a repo sync runtime, the sorted children are offered to it for prefetching;
/// the prefetch result is ignored. Each child is queued one level deeper than
/// `parent`, with `parent`'s manifest URI and id recorded on it, and `next_id` is
/// advanced by one per child.
fn enqueue_discovered_children(
    runner: &Rpkiv1PublicationPointRunner<'_>,
    next_id: &mut u64,
    ca_queue: &mut VecDeque<QueuedCaInstance>,
    parent: &QueuedCaInstance,
    mut children: Vec<DiscoveredChildCaInstance>,
) {
    children.sort_by(|lhs, rhs| {
        let by_manifest = lhs
            .handle
            .manifest_rsync_uri
            .cmp(&rhs.handle.manifest_rsync_uri);
        by_manifest.then_with(|| {
            lhs.discovered_from
                .child_ca_certificate_rsync_uri
                .cmp(&rhs.discovered_from.child_ca_certificate_rsync_uri)
        })
    });

    if let Some(runtime) = runner.repo_sync_runtime.as_ref() {
        // Best effort: the prefetch outcome does not affect queuing.
        let _ = runtime.prefetch_discovered_children(&children);
    }

    let child_depth = parent.handle.depth + 1;
    ca_queue.extend(children.into_iter().map(|child| {
        let mut handle = child.handle.with_depth(child_depth);
        handle.parent_manifest_rsync_uri = Some(parent.handle.manifest_rsync_uri.clone());
        let id = *next_id;
        *next_id += 1;
        QueuedCaInstance {
            id,
            handle,
            parent_id: Some(parent.id),
            discovered_from: Some(child.discovered_from),
        }
    }));
}
/// Finalizes a publication point whose objects output is already complete, without
/// going through the finalize worker.
///
/// Calls `finalize_fresh_publication_point_from_reducer`, emits a
/// `phase2_direct_finalize_breakdown` event for both success and failure, pushes
/// the outcome onto `finished`, and returns the finalize metrics. On failure only
/// the finalize timings and `locked_files` are populated; everything else keeps
/// its default value.
fn finalize_ready_objects(
    runner: &Rpkiv1PublicationPointRunner<'_>,
    node: QueuedCaInstance,
    fresh_stage: FreshPublicationPointStage,
    warnings: Vec<Warning>,
    objects: ObjectsOutput,
    repo_outcome: RepoSyncRuntimeOutcome,
    finished: &mut Vec<FinishedPublicationPoint>,
    compact_audit: bool,
) -> FinalizePublicationPointMetrics {
    let locked_files = fresh_stage.fresh_point.files().len();

    let finalize_started = Instant::now();
    let finalized = runner.finalize_fresh_publication_point_from_reducer(
        &node.handle,
        &fresh_stage.fresh_point,
        warnings,
        objects,
        fresh_stage.child_audits,
        fresh_stage.discovered_children,
        repo_outcome.repo_sync_source.as_deref(),
        repo_outcome.repo_sync_phase.as_deref(),
        repo_outcome.repo_sync_duration_ms,
        repo_outcome.repo_sync_err.as_deref(),
    );
    let finalize_ms = elapsed_ms(finalize_started);

    // Derive the metrics first so the breakdown event is emitted before the
    // successful result is compacted.
    let (metrics, raw_result) = match finalized {
        Ok(output) => {
            let metrics = finalize_metrics_from_output(
                &output,
                0,
                finalize_ms,
                None,
                finalize_ms,
                locked_files,
            );
            (metrics, Ok(output.result))
        }
        Err(err) => {
            let metrics = FinalizePublicationPointMetrics {
                finalize_ms,
                finalize_worker_ms: finalize_ms,
                locked_files,
                ..FinalizePublicationPointMetrics::default()
            };
            (metrics, Err(err))
        }
    };
    emit_finalize_breakdown(
        "phase2_direct_finalize_breakdown",
        &node.handle.manifest_rsync_uri,
        &node.handle.publication_point_rsync_uri,
        &metrics,
    );

    let result = match raw_result {
        Ok(run_result) => compact_phase2_finished_result(run_result, compact_audit),
        Err(err) => FinishedPublicationPointResult::Err(err),
    };
    finished.push(FinishedPublicationPoint {
        node: FinishedPublicationPointNode::from_queued(node),
        result,
    });
    metrics
}
/// Builds the finalize metrics for a successfully finalized publication point.
///
/// The timing arguments and `locked_files` are stored as given; the remaining
/// timings and all counts are read from `output`.
fn finalize_metrics_from_output(
    output: &FreshPublicationPointFinalizeOutput,
    reduce_ms: u64,
    finalize_ms: u64,
    finalize_queue_wait_ms: Option<u64>,
    finalize_worker_ms: u64,
    locked_files: usize,
) -> FinalizePublicationPointMetrics {
    let persist_timing = &output.persist_vcir_timing;
    let run_result = &output.result;
    let validated_objects = &run_result.objects;

    FinalizePublicationPointMetrics {
        // Timings supplied by the caller.
        reduce_ms,
        finalize_ms,
        finalize_queue_wait_ms,
        finalize_worker_ms,
        // Timings measured while finalizing.
        snapshot_pack_ms: output.snapshot_pack_ms,
        persist_vcir_ms: output.persist_vcir_ms,
        persist_build_vcir_ms: persist_timing.build_vcir_ms,
        persist_replace_vcir_ms: persist_timing.replace_vcir_ms,
        persist_replace_breakdown: persist_timing.replace_vcir.clone(),
        ccr_projection_build_ms: output.ccr_projection_build_ms,
        ccr_append_ms: output.ccr_append_ms,
        audit_build_ms: output.audit_build_ms,
        // Sizes of what was produced.
        locked_files,
        child_count: run_result.discovered_children.len(),
        warning_count: run_result.warnings.len(),
        vrp_count: validated_objects.vrps.len(),
        vap_count: validated_objects.aspas.len(),
        router_key_count: validated_objects.router_keys.len(),
        audit_object_count: run_result.audit.objects.len(),
    }
}
/// Emits a progress event named `event_name` that carries every field of
/// `metrics` together with the two publication point URIs.
fn emit_finalize_breakdown(
    event_name: &str,
    manifest_rsync_uri: &str,
    publication_point_rsync_uri: &str,
    metrics: &FinalizePublicationPointMetrics,
) {
    let payload = serde_json::json!({
        "manifest_rsync_uri": manifest_rsync_uri,
        "publication_point_rsync_uri": publication_point_rsync_uri,
        "reduce_ms": metrics.reduce_ms,
        "finalize_ms": metrics.finalize_ms,
        "finalize_queue_wait_ms": metrics.finalize_queue_wait_ms,
        "finalize_worker_ms": metrics.finalize_worker_ms,
        "snapshot_pack_ms": metrics.snapshot_pack_ms,
        "persist_vcir_ms": metrics.persist_vcir_ms,
        "persist_build_vcir_ms": metrics.persist_build_vcir_ms,
        "persist_replace_vcir_ms": metrics.persist_replace_vcir_ms,
        "persist_replace_breakdown": &metrics.persist_replace_breakdown,
        "ccr_projection_build_ms": metrics.ccr_projection_build_ms,
        "ccr_append_ms": metrics.ccr_append_ms,
        "audit_build_ms": metrics.audit_build_ms,
        "locked_files": metrics.locked_files,
        "child_count": metrics.child_count,
        "warning_count": metrics.warning_count,
        "vrp_count": metrics.vrp_count,
        "vap_count": metrics.vap_count,
        "router_key_count": metrics.router_key_count,
        "audit_object_count": metrics.audit_object_count,
    });
    crate::progress_log::emit(event_name, payload);
}
/// Submits queued ROA tasks to the parallel worker pool until the queue is empty
/// or the pool reports that it is full.
///
/// Returns default metrics without touching the queue when the runner has no
/// worker pool. A task rejected with `QueueFull` is put back at the front of
/// `pending_roa_dispatch` so ordering is kept. For every accepted task, the
/// matching inflight publication point (if any) gets its submission counter and
/// first/last submission timestamps updated.
///
/// # Errors
///
/// Returns `TreeRunError::Runner` when the worker queue is disconnected.
fn flush_pending_roa_dispatch(
    runner: &Rpkiv1PublicationPointRunner<'_>,
    pending_roa_dispatch: &mut VecDeque<OwnedRoaTask>,
    inflight_publication_points: &mut HashMap<u64, InflightPublicationPoint>,
) -> Result<RoaDispatchMetrics, TreeRunError> {
    let started = Instant::now();
    let mut metrics = RoaDispatchMetrics::default();
    let Some(pool) = runner.parallel_roa_worker_pool.as_ref() else {
        return Ok(metrics);
    };

    loop {
        let Some(mut task) = pending_roa_dispatch.pop_front() else {
            break;
        };
        metrics.attempted += 1;
        let pp_id = task.publication_point_id;
        task.submitted_at = Some(Instant::now());

        match pool.try_submit_round_robin(task) {
            Err(ObjectWorkerSubmitError::Disconnected { .. }) => {
                return Err(TreeRunError::Runner(
                    "parallel ROA worker queue disconnected".to_string(),
                ));
            }
            Err(ObjectWorkerSubmitError::QueueFull { task, .. }) => {
                // Keep the rejected task first in line for the next flush.
                pending_roa_dispatch.push_front(task);
                metrics.queue_full = true;
                break;
            }
            Ok(_) => {
                metrics.submitted += 1;
                if let Some(state) = inflight_publication_points.get_mut(&pp_id) {
                    let now = Instant::now();
                    state.tasks_submitted += 1;
                    state.first_task_submitted_at = state.first_task_submitted_at.or(Some(now));
                    state.last_task_submitted_at = Some(now);
                }
            }
        }
    }

    metrics.pending_remaining = pending_roa_dispatch.len();
    metrics.duration_ms = elapsed_ms(started);
    Ok(metrics)
}
/// Runs `flush_pending_roa_dispatch` and reports the batch as a
/// `phase2_roa_dispatch_batch` progress event.
///
/// The event is skipped when nothing was attempted and the queue was not full.
///
/// # Errors
///
/// Propagates the error returned by `flush_pending_roa_dispatch`.
fn flush_pending_roa_dispatch_with_progress(
    runner: &Rpkiv1PublicationPointRunner<'_>,
    pending_roa_dispatch: &mut VecDeque<OwnedRoaTask>,
    inflight_publication_points: &mut HashMap<u64, InflightPublicationPoint>,
    pending_finalization: &VecDeque<FinalizeTask>,
) -> Result<(), TreeRunError> {
    let batch =
        flush_pending_roa_dispatch(runner, pending_roa_dispatch, inflight_publication_points)?;

    let idle = batch.attempted == 0 && !batch.queue_full;
    if idle {
        return Ok(());
    }

    let payload = serde_json::json!({
        "attempted": batch.attempted,
        "submitted": batch.submitted,
        "queue_full": batch.queue_full,
        "pending_remaining": batch.pending_remaining,
        "duration_ms": batch.duration_ms,
        "inflight_publication_points": inflight_publication_points.len(),
        "pending_finalization_len": pending_finalization.len(),
    });
    crate::progress_log::emit("phase2_roa_dispatch_batch", payload);
    Ok(())
}
fn drain_object_results(
runner: &Rpkiv1PublicationPointRunner<'_>,
inflight_publication_points: &mut HashMap<u64, InflightPublicationPoint>,
pending_finalization: &mut VecDeque<FinalizeTask>,
result_budget: usize,
) -> Result<ObjectDrainMetrics, TreeRunError> {
let started = Instant::now();
let mut metrics = ObjectDrainMetrics::default();
let Some(pool) = runner.parallel_roa_worker_pool.as_ref() else {
return Ok(metrics);
};
let result_budget = result_budget.max(1);
while metrics.results_drained < result_budget {
let Some(result) = pool
.recv_result_timeout(Duration::from_millis(0))
.map_err(TreeRunError::Runner)?
else {
break;
};
metrics.results_drained += 1;
let pp_id = result.publication_point_id;
let _worker_index = result.worker_index;
metrics.worker_ms_total += result.worker_ms;
metrics.worker_ms_max = metrics.worker_ms_max.max(result.worker_ms);
metrics.queue_wait_ms_total += result.queue_wait_ms;
metrics.queue_wait_ms_max = metrics.queue_wait_ms_max.max(result.queue_wait_ms);
let should_finalize = if let Some(state) = inflight_publication_points.get_mut(&pp_id) {
let now = Instant::now();
if state.first_result_at.is_none() {
state.first_result_at = Some(now);
}
state.last_result_at = Some(now);
state.worker_ms_total += result.worker_ms;
state.worker_ms_max = state.worker_ms_max.max(result.worker_ms);
state.queue_wait_ms_total += result.queue_wait_ms;
state.queue_wait_ms_max = state.queue_wait_ms_max.max(result.queue_wait_ms);
state.results.push(result);
state.results.len() == state.task_count
} else {
false
};
if should_finalize {
let mut state = inflight_publication_points
.remove(&pp_id)
.expect("inflight publication point must exist");
state.finalize_enqueued_at = Some(Instant::now());
metrics.publication_points_completed += 1;
pending_finalization.push_back(FinalizeTask { state });
}
}
metrics.result_budget_exhausted = metrics.results_drained == result_budget;
metrics.duration_ms = elapsed_ms(started);
Ok(metrics)
}
/// Runs `drain_object_results` and reports the batch as a
/// `phase2_object_results_drain` progress event.
///
/// The event is skipped when no result was drained and the budget was not
/// exhausted.
///
/// # Errors
///
/// Propagates the error returned by `drain_object_results`.
fn drain_object_results_with_progress(
    runner: &Rpkiv1PublicationPointRunner<'_>,
    inflight_publication_points: &mut HashMap<u64, InflightPublicationPoint>,
    pending_finalization: &mut VecDeque<FinalizeTask>,
    pending_roa_dispatch_len: usize,
    result_budget: usize,
) -> Result<(), TreeRunError> {
    let batch = drain_object_results(
        runner,
        inflight_publication_points,
        pending_finalization,
        result_budget,
    )?;

    let idle = batch.results_drained == 0 && !batch.result_budget_exhausted;
    if idle {
        return Ok(());
    }

    let payload = serde_json::json!({
        "results_drained": batch.results_drained,
        "publication_points_completed": batch.publication_points_completed,
        "result_budget_exhausted": batch.result_budget_exhausted,
        "result_drain_batch_size": result_budget,
        "worker_ms_total": batch.worker_ms_total,
        "worker_ms_max": batch.worker_ms_max,
        "queue_wait_ms_total": batch.queue_wait_ms_total,
        "queue_wait_ms_max": batch.queue_wait_ms_max,
        "duration_ms": batch.duration_ms,
        "pending_roa_dispatch_len": pending_roa_dispatch_len,
        "inflight_publication_points": inflight_publication_points.len(),
        "pending_finalization_len": pending_finalization.len(),
    });
    crate::progress_log::emit("phase2_object_results_drain", payload);
    Ok(())
}
/// Hands queued finalize tasks to the finalize worker channel without blocking.
///
/// Tasks are sent in queue order; `finalize_inflight` is incremented for each
/// accepted task. When the channel is full, the rejected task is returned to the
/// front of `pending_finalization` and submission stops for this round.
///
/// # Errors
///
/// Returns `TreeRunError::Runner` when the finalize worker channel is disconnected.
fn submit_pending_finalization(
    finalize_task_tx: &SyncSender<FinalizeTask>,
    pending_finalization: &mut VecDeque<FinalizeTask>,
    finalize_inflight: &mut usize,
) -> Result<FinalizeSubmitMetrics, TreeRunError> {
    let started = Instant::now();
    let mut metrics = FinalizeSubmitMetrics::default();

    loop {
        let Some(task) = pending_finalization.pop_front() else {
            break;
        };
        match finalize_task_tx.try_send(task) {
            Ok(()) => {
                *finalize_inflight += 1;
                metrics.submitted += 1;
            }
            Err(TrySendError::Full(rejected)) => {
                // Channel at capacity: keep the task first in line and retry later.
                pending_finalization.push_front(rejected);
                metrics.queue_full = true;
                break;
            }
            Err(TrySendError::Disconnected(_)) => {
                return Err(TreeRunError::Runner(
                    "phase2 finalize worker queue disconnected".to_string(),
                ));
            }
        }
    }

    metrics.duration_ms = elapsed_ms(started);
    Ok(metrics)
}
/// Runs `submit_pending_finalization` and reports the round as a
/// `phase2_finalize_task_submit` progress event.
///
/// The event is skipped when nothing was submitted and the channel was not full.
///
/// # Errors
///
/// Propagates the error returned by `submit_pending_finalization`.
fn submit_pending_finalization_with_progress(
    finalize_task_tx: &SyncSender<FinalizeTask>,
    pending_finalization: &mut VecDeque<FinalizeTask>,
    finalize_inflight: &mut usize,
    finalize_queue_capacity: usize,
    pending_roa_dispatch_len: usize,
    inflight_publication_points_len: usize,
) -> Result<(), TreeRunError> {
    let round =
        submit_pending_finalization(finalize_task_tx, pending_finalization, finalize_inflight)?;

    let idle = round.submitted == 0 && !round.queue_full;
    if idle {
        return Ok(());
    }

    let payload = serde_json::json!({
        "submitted": round.submitted,
        "queue_full": round.queue_full,
        "duration_ms": round.duration_ms,
        "finalize_queue_capacity": finalize_queue_capacity,
        "pending_finalization_len": pending_finalization.len(),
        "finalize_inflight": *finalize_inflight,
        "pending_roa_dispatch_len": pending_roa_dispatch_len,
        "inflight_publication_points": inflight_publication_points_len,
    });
    crate::progress_log::emit("phase2_finalize_task_submit", payload);
    Ok(())
}
/// Collects every finalize worker result that is currently available, without
/// blocking.
///
/// For each result the per-stage timings are folded into running totals and
/// maxima, `finalize_inflight` is decremented (saturating at zero), and the
/// finished publication point is appended to `finished`. A missing finalize queue
/// wait time counts as zero for the maximum.
///
/// # Errors
///
/// Returns `TreeRunError::Runner` when the result channel is disconnected while
/// `finalize_inflight` is still non-zero. A disconnect with nothing inflight just
/// ends the drain.
fn drain_finalize_results(
    finalize_result_rx: &Receiver<FinalizeWorkerResult>,
    finished: &mut Vec<FinishedPublicationPoint>,
    finalize_inflight: &mut usize,
) -> Result<FinalizeResultsDrainMetrics, TreeRunError> {
    let started = Instant::now();
    let mut totals = FinalizeResultsDrainMetrics::default();

    loop {
        let worker_result = match finalize_result_rx.try_recv() {
            Ok(worker_result) => worker_result,
            Err(TryRecvError::Empty) => break,
            Err(TryRecvError::Disconnected) if *finalize_inflight == 0 => break,
            Err(TryRecvError::Disconnected) => {
                return Err(TreeRunError::Runner(
                    "phase2 finalize result channel disconnected".to_string(),
                ));
            }
        };

        let sample = &worker_result.metrics;
        totals.results_drained += 1;

        totals.reduce_ms_total += sample.reduce_ms;
        totals.reduce_ms_max = totals.reduce_ms_max.max(sample.reduce_ms);

        totals.finalize_ms_total += sample.finalize_ms;
        totals.finalize_ms_max = totals.finalize_ms_max.max(sample.finalize_ms);

        let queue_wait_ms = sample.finalize_queue_wait_ms.unwrap_or(0);
        totals.finalize_queue_wait_ms_max = totals.finalize_queue_wait_ms_max.max(queue_wait_ms);

        totals.finalize_worker_ms_total += sample.finalize_worker_ms;
        totals.finalize_worker_ms_max = totals
            .finalize_worker_ms_max
            .max(sample.finalize_worker_ms);

        totals.snapshot_pack_ms_total += sample.snapshot_pack_ms;
        totals.snapshot_pack_ms_max = totals.snapshot_pack_ms_max.max(sample.snapshot_pack_ms);

        totals.persist_vcir_ms_total += sample.persist_vcir_ms;
        totals.persist_vcir_ms_max = totals.persist_vcir_ms_max.max(sample.persist_vcir_ms);

        totals.persist_build_vcir_ms_total += sample.persist_build_vcir_ms;
        totals.persist_build_vcir_ms_max = totals
            .persist_build_vcir_ms_max
            .max(sample.persist_build_vcir_ms);

        totals.persist_replace_vcir_ms_total += sample.persist_replace_vcir_ms;
        totals.persist_replace_vcir_ms_max = totals
            .persist_replace_vcir_ms_max
            .max(sample.persist_replace_vcir_ms);

        totals.ccr_projection_build_ms_total += sample.ccr_projection_build_ms;
        totals.ccr_projection_build_ms_max = totals
            .ccr_projection_build_ms_max
            .max(sample.ccr_projection_build_ms);

        totals.ccr_append_ms_total += sample.ccr_append_ms;
        totals.ccr_append_ms_max = totals.ccr_append_ms_max.max(sample.ccr_append_ms);

        totals.audit_build_ms_total += sample.audit_build_ms;
        totals.audit_build_ms_max = totals.audit_build_ms_max.max(sample.audit_build_ms);

        *finalize_inflight = finalize_inflight.saturating_sub(1);
        finished.push(worker_result.finished);
    }

    totals.duration_ms = elapsed_ms(started);
    Ok(totals)
}
/// Runs `drain_finalize_results` and reports the batch as a
/// `phase2_finalize_results_drain` progress event.
///
/// The event is emitted when at least 64 results were drained, or when at least
/// one result was drained and either nothing is inflight any more or finalize
/// tasks are still pending.
///
/// # Errors
///
/// Propagates the error returned by `drain_finalize_results`.
fn drain_finalize_results_with_progress(
    finalize_result_rx: &Receiver<FinalizeWorkerResult>,
    finished: &mut Vec<FinishedPublicationPoint>,
    finalize_inflight: &mut usize,
    pending_finalization_len: usize,
    pending_roa_dispatch_len: usize,
    inflight_publication_points_len: usize,
) -> Result<(), TreeRunError> {
    let batch = drain_finalize_results(finalize_result_rx, finished, finalize_inflight)?;

    let large_batch = batch.results_drained >= 64;
    let boundary_batch = batch.results_drained > 0
        && (*finalize_inflight == 0 || pending_finalization_len > 0);
    if !(large_batch || boundary_batch) {
        return Ok(());
    }

    let payload = serde_json::json!({
        "results_drained": batch.results_drained,
        "reduce_ms_total": batch.reduce_ms_total,
        "reduce_ms_max": batch.reduce_ms_max,
        "finalize_ms_total": batch.finalize_ms_total,
        "finalize_ms_max": batch.finalize_ms_max,
        "finalize_queue_wait_ms_max": batch.finalize_queue_wait_ms_max,
        "finalize_worker_ms_total": batch.finalize_worker_ms_total,
        "finalize_worker_ms_max": batch.finalize_worker_ms_max,
        "snapshot_pack_ms_total": batch.snapshot_pack_ms_total,
        "snapshot_pack_ms_max": batch.snapshot_pack_ms_max,
        "persist_vcir_ms_total": batch.persist_vcir_ms_total,
        "persist_vcir_ms_max": batch.persist_vcir_ms_max,
        "persist_build_vcir_ms_total": batch.persist_build_vcir_ms_total,
        "persist_build_vcir_ms_max": batch.persist_build_vcir_ms_max,
        "persist_replace_vcir_ms_total": batch.persist_replace_vcir_ms_total,
        "persist_replace_vcir_ms_max": batch.persist_replace_vcir_ms_max,
        "ccr_projection_build_ms_total": batch.ccr_projection_build_ms_total,
        "ccr_projection_build_ms_max": batch.ccr_projection_build_ms_max,
        "ccr_append_ms_total": batch.ccr_append_ms_total,
        "ccr_append_ms_max": batch.ccr_append_ms_max,
        "audit_build_ms_total": batch.audit_build_ms_total,
        "audit_build_ms_max": batch.audit_build_ms_max,
        "duration_ms": batch.duration_ms,
        "pending_finalization_len": pending_finalization_len,
        "finalize_inflight": *finalize_inflight,
        "pending_roa_dispatch_len": pending_roa_dispatch_len,
        "inflight_publication_points": inflight_publication_points_len,
    });
    crate::progress_log::emit("phase2_finalize_results_drain", payload);
    Ok(())
}
/// Worker loop for the phase-2 finalize stage.
///
/// Blocks on `finalize_task_rx`; every received task has its state run through
/// `finalize_publication_point_state` and the outcome is forwarded on
/// `finalize_result_tx`. The loop ends normally once the task channel hangs up.
///
/// # Errors
///
/// Returns `TreeRunError::Runner` when the result channel has no receiver left.
fn run_finalize_worker(
    runner: &Rpkiv1PublicationPointRunner<'_>,
    finalize_task_rx: Receiver<FinalizeTask>,
    finalize_result_tx: mpsc::Sender<FinalizeWorkerResult>,
    compact_audit: bool,
) -> Result<(), TreeRunError> {
    // `Receiver::iter` blocks for the next task and stops once the channel is disconnected.
    for task in finalize_task_rx.iter() {
        let outcome = finalize_publication_point_state(runner, task.state, compact_audit);
        finalize_result_tx.send(outcome).map_err(|_| {
            TreeRunError::Runner("phase2 finalize result receiver disconnected".to_string())
        })?;
    }
    Ok(())
}
/// Reduces and finalizes one in-flight publication point, returning its result and metrics.
///
/// Steps, in order:
/// 1. Take the `InflightPublicationPoint` apart and sample the queue-wait and
///    objects-processing timings.
/// 2. Run `reduce_parallel_roa_stage` over the collected task results.
/// 3. On reduce success, merge the router keys discovered in the fresh stage into the
///    reduced objects and call `runner.finalize_fresh_publication_point_from_reducer`.
/// 4. Emit the `phase2_finalize_worker_breakdown` and `phase2_publication_point_reduced`
///    progress-log events.
///
/// Reduce and finalize failures are not propagated as a Rust error: they are carried in
/// the returned value as `FinishedPublicationPointResult::Err`, with metrics that hold
/// only the timings measured up to the failure.
///
/// `compact_audit` is forwarded unchanged to `compact_phase2_finished_result`.
fn finalize_publication_point_state(
runner: &Rpkiv1PublicationPointRunner<'_>,
state: InflightPublicationPoint,
compact_audit: bool,
) -> FinalizeWorkerResult {
// Start of the whole worker-side measurement (reported as `finalize_worker_ms`).
let finalize_worker_started = Instant::now();
let InflightPublicationPoint {
node,
fresh_stage,
objects_stage,
repo_outcome,
warnings,
started_at,
objects_started_at,
task_count,
tasks_submitted,
first_task_submitted_at,
last_task_submitted_at,
first_result_at,
last_result_at,
worker_ms_total,
worker_ms_max,
queue_wait_ms_total,
queue_wait_ms_max,
finalize_enqueued_at,
results,
} = state;
// Time spent waiting for this worker: measured from the enqueue timestamp, falling back
// to the last task result when no enqueue timestamp was recorded; `None` if neither exists.
let finalize_queue_wait_ms = finalize_enqueued_at.or(last_result_at).map(|ready_at| {
Instant::now()
.saturating_duration_since(ready_at)
.as_millis() as u64
});
// Sampled before the reduce step, so reduce/finalize time is not included.
let objects_processing_ms = objects_started_at.elapsed().as_millis() as u64;
let reduce_started = Instant::now();
// Read the locked-file count first: `objects_stage` is moved into the reducer below.
let locked_files = objects_stage.locked_file_count();
let reduce_result = reduce_parallel_roa_stage(objects_stage, results, runner.timing.as_ref());
let reduce_ms = elapsed_ms(reduce_started);
let (result, mut metrics) = match reduce_result {
Ok(mut objects) => {
let finalize_started = Instant::now();
// Append the router keys discovered in the fresh stage, then extend the local-outputs
// cache with entries built from the combined router-key list.
objects
.router_keys
.extend(fresh_stage.discovered_router_keys.clone());
objects.local_outputs_cache.extend(
crate::validation::tree_runner::build_router_key_local_outputs(
&node.handle,
&objects.router_keys,
),
);
let finalized = runner.finalize_fresh_publication_point_from_reducer(
&node.handle,
&fresh_stage.fresh_point,
warnings,
objects,
fresh_stage.child_audits,
fresh_stage.discovered_children,
repo_outcome.repo_sync_source.as_deref(),
repo_outcome.repo_sync_phase.as_deref(),
repo_outcome.repo_sync_duration_ms,
repo_outcome.repo_sync_err.as_deref(),
);
let finalize_ms = elapsed_ms(finalize_started);
match finalized {
Ok(output) => {
// The `0` is a placeholder for `finalize_worker_ms`; the real value is written
// into `metrics` once the whole worker step has been timed (see below).
let metrics = finalize_metrics_from_output(
&output,
reduce_ms,
finalize_ms,
finalize_queue_wait_ms,
0,
locked_files,
);
(
compact_phase2_finished_result(output.result, compact_audit),
metrics,
)
}
// Finalize failed: keep the timings measured so far, default everything else.
Err(err) => (
FinishedPublicationPointResult::Err(err),
FinalizePublicationPointMetrics {
reduce_ms,
finalize_ms,
finalize_queue_wait_ms,
locked_files,
..FinalizePublicationPointMetrics::default()
},
),
}
}
// Reduce failed: finalize never ran, so `finalize_ms` stays at its default.
Err(err) => (
FinishedPublicationPointResult::Err(err),
FinalizePublicationPointMetrics {
reduce_ms,
finalize_queue_wait_ms,
locked_files,
..FinalizePublicationPointMetrics::default()
},
),
};
let finalize_worker_ms = elapsed_ms(finalize_worker_started);
metrics.finalize_worker_ms = finalize_worker_ms;
emit_finalize_breakdown(
"phase2_finalize_worker_breakdown",
node.handle.manifest_rsync_uri.as_str(),
node.handle.publication_point_rsync_uri.as_str(),
&metrics,
);
// Per-publication-point summary event. The `*_ms` offsets derived from task/result
// timestamps are relative to `objects_started_at`; the `*_span_ms` values are
// last-minus-first and are `None` unless both timestamps were recorded.
crate::progress_log::emit(
"phase2_publication_point_reduced",
serde_json::json!({
"manifest_rsync_uri": node.handle.manifest_rsync_uri.as_str(),
"publication_point_rsync_uri": node.handle.publication_point_rsync_uri.as_str(),
"objects_processing_ms": objects_processing_ms,
"task_count": task_count,
"tasks_submitted": tasks_submitted,
"first_task_submitted_ms": first_task_submitted_at.map(|t| t.saturating_duration_since(objects_started_at).as_millis() as u64),
"last_task_submitted_ms": last_task_submitted_at.map(|t| t.saturating_duration_since(objects_started_at).as_millis() as u64),
"task_submit_span_ms": match (first_task_submitted_at, last_task_submitted_at) {
(Some(first), Some(last)) => Some(last.saturating_duration_since(first).as_millis() as u64),
_ => None,
},
"first_result_ms": first_result_at.map(|t| t.saturating_duration_since(objects_started_at).as_millis() as u64),
"last_result_ms": last_result_at.map(|t| t.saturating_duration_since(objects_started_at).as_millis() as u64),
// Same expression as "last_result_ms"; emitted under a second key.
"all_results_ready_ms": last_result_at.map(|t| t.saturating_duration_since(objects_started_at).as_millis() as u64),
"finalize_queue_wait_ms": finalize_queue_wait_ms,
"result_span_ms": match (first_result_at, last_result_at) {
(Some(first), Some(last)) => Some(last.saturating_duration_since(first).as_millis() as u64),
_ => None,
},
"worker_ms_total": worker_ms_total,
"worker_ms_max": worker_ms_max,
// Averages report 0 when there were no tasks, avoiding a division by zero.
"worker_ms_avg": if task_count > 0 { worker_ms_total / task_count as u64 } else { 0 },
"queue_wait_ms_total": queue_wait_ms_total,
"queue_wait_ms_max": queue_wait_ms_max,
"queue_wait_ms_avg": if task_count > 0 { queue_wait_ms_total / task_count as u64 } else { 0 },
"reduce_ms": reduce_ms,
"finalize_ms": metrics.finalize_ms,
"finalize_worker_ms": finalize_worker_ms,
"snapshot_pack_ms": metrics.snapshot_pack_ms,
"persist_vcir_ms": metrics.persist_vcir_ms,
"persist_build_vcir_ms": metrics.persist_build_vcir_ms,
"persist_replace_vcir_ms": metrics.persist_replace_vcir_ms,
"ccr_projection_build_ms": metrics.ccr_projection_build_ms,
"ccr_append_ms": metrics.ccr_append_ms,
"audit_build_ms": metrics.audit_build_ms,
"locked_files": metrics.locked_files,
"child_count": metrics.child_count,
"warning_count": metrics.warning_count,
"vrp_count": metrics.vrp_count,
"vap_count": metrics.vap_count,
"router_key_count": metrics.router_key_count,
"audit_object_count": metrics.audit_object_count,
"total_duration_ms": started_at.elapsed().as_millis() as u64,
}),
);
FinalizeWorkerResult {
finished: FinishedPublicationPoint {
node: FinishedPublicationPointNode::from_queued(node),
result,
},
metrics,
}
}
/// Waits up to `timeout` for one repo-sync event and moves the CA instances it unblocks
/// from `ca_waiting_repo_by_identity` onto the back of `ready_queue`.
///
/// At most one event is consumed per call. The returned metrics record how many events
/// and completions were seen, how many instances were enqueued, and the call duration.
///
/// # Errors
///
/// A failure reported by `recv_repo_result_timeout` is wrapped in `TreeRunError::Runner`.
fn drain_repo_events(
    repo_runtime: &dyn crate::parallel::repo_runtime::RepoSyncRuntime,
    ca_waiting_repo_by_identity: &mut HashMap<RepoIdentity, Vec<QueuedCaInstance>>,
    ready_queue: &mut VecDeque<ReadyCaInstance>,
    timeout: Duration,
) -> Result<RepoDrainMetrics, TreeRunError> {
    let started = Instant::now();
    let mut metrics = RepoDrainMetrics::default();

    let polled = repo_runtime
        .recv_repo_result_timeout(timeout)
        .map_err(TreeRunError::Runner)?;

    if let Some(event) = polled {
        metrics.event_count += 1;
        metrics.completions += event.completions.len();

        for completion in event.completions {
            let rides_shared_transport = completion.identity != event.transport_identity;
            let mut outcome = completion.outcome;
            if rides_shared_transport {
                // One RRDP/rsync transport can release many publication points; its wall
                // time must be attributed only once when per-PP stage timings are summed.
                outcome.repo_sync_duration_ms = 0;
            }

            let waiters = match ca_waiting_repo_by_identity.remove(&completion.identity) {
                Some(waiters) => waiters,
                None => continue,
            };
            metrics.ready_enqueued += waiters.len();
            ready_queue.extend(waiters.into_iter().map(|node| ReadyCaInstance {
                node,
                repo_outcome: outcome.clone(),
                ready_enqueued_at: Instant::now(),
            }));
        }
    }

    metrics.duration_ms = elapsed_ms(started);
    Ok(metrics)
}
/// Picks the poll timeout from the current scheduler state.
///
/// * 0 ms when any of the ready queue, pending ROA dispatch queue, in-flight publication
///   points or pending finalization queue is non-empty, or when a queued CA instance may
///   still be started.
/// * 10 ms when none of the above holds but finalize tasks are still in flight.
/// * 50 ms otherwise.
fn event_poll_timeout(
    ca_queue: &VecDeque<QueuedCaInstance>,
    ready_queue: &VecDeque<ReadyCaInstance>,
    pending_roa_dispatch: &VecDeque<OwnedRoaTask>,
    inflight_publication_points: &HashMap<u64, InflightPublicationPoint>,
    pending_finalization: &VecDeque<FinalizeTask>,
    finalize_inflight: usize,
    instances_started: usize,
    config: &TreeRunConfig,
) -> Duration {
    let queued_work_present = !ready_queue.is_empty()
        || !pending_roa_dispatch.is_empty()
        || !inflight_publication_points.is_empty()
        || !pending_finalization.is_empty();
    // Kept lazy so `can_start_more` is consulted only when nothing above already matched.
    let can_launch_queued_ca =
        || !ca_queue.is_empty() && can_start_more(instances_started, config);

    let wait_ms = if queued_work_present || can_launch_queued_ca() {
        0
    } else if finalize_inflight > 0 {
        10
    } else {
        50
    };
    Duration::from_millis(wait_ms)
}
/// Reports whether the run has no work left.
///
/// The CA queue counts as done when it is empty or when `can_start_more` says no further
/// instance may be started. Every other queue and map must be empty and no finalize task
/// may be in flight.
fn is_complete(
    ca_queue: &VecDeque<QueuedCaInstance>,
    ready_queue: &VecDeque<ReadyCaInstance>,
    ca_waiting_repo_by_identity: &HashMap<RepoIdentity, Vec<QueuedCaInstance>>,
    pending_roa_dispatch: &VecDeque<OwnedRoaTask>,
    inflight_publication_points: &HashMap<u64, InflightPublicationPoint>,
    pending_finalization: &VecDeque<FinalizeTask>,
    finalize_inflight: usize,
    instances_started: usize,
    config: &TreeRunConfig,
) -> bool {
    // Queued CA instances that are still allowed to start mean the run is not finished.
    if !ca_queue.is_empty() && can_start_more(instances_started, config) {
        return false;
    }

    let queues_drained = ready_queue.is_empty()
        && ca_waiting_repo_by_identity.is_empty()
        && pending_roa_dispatch.is_empty()
        && inflight_publication_points.is_empty()
        && pending_finalization.is_empty();

    queues_drained && finalize_inflight == 0
}
fn build_tree_output(mut finished: Vec<FinishedPublicationPoint>) -> TreeRunAuditOutput {
finished.sort_by_key(|item| item.node.id);
let mut instances_processed = 0usize;
let mut instances_failed = 0usize;
let mut warnings = Vec::new();
let mut vrps = Vec::new();
let mut aspas = Vec::new();
let mut router_keys = Vec::new();
let mut publication_points = Vec::new();
let mut roa_cache_stats = crate::validation::objects::RoaValidationCacheStats::default();
let mut cir_input = CirInputAccumulator::default();
for item in finished {
match item.result {
FinishedPublicationPointResult::Ok {
source,
warnings: result_warnings,
objects,
audit,
cir_fresh_objects,
cir_cached_objects,
} => {
instances_processed += 1;
warnings.extend(result_warnings);
warnings.extend(objects.warnings);
roa_cache_stats.add_assign(&objects.roa_cache_stats);
vrps.extend(objects.vrps);
aspas.extend(objects.aspas);
router_keys.extend(objects.router_keys);
let mut audit: PublicationPointAudit = audit;
audit.node_id = Some(item.node.id);
audit.parent_node_id = item.node.parent_id;
audit.discovered_from = item.node.discovered_from;
crate::validation::tree::submit_publication_point_cir_input(
&mut cir_input,
source,
&audit,
&cir_fresh_objects,
&cir_cached_objects,
)
.expect("CIR input collection from validated audit must not fail");
publication_points.push(audit);
}
FinishedPublicationPointResult::Err(err) => {
instances_failed += 1;
warnings.push(
Warning::new(format!("publication point failed: {err}"))
.with_context(&item.node.manifest_rsync_uri),
);
}
}
}
TreeRunAuditOutput {
tree: TreeRunOutput {
instances_processed,
instances_failed,
warnings,
vrps,
aspas,
router_keys,
},
publication_points,
roa_cache_stats,
cir_input: cir_input.finalize(),
}
}
/// Runs the phase-2 parallel tree audit from a single root CA instance.
///
/// Convenience wrapper: wraps `root` in a one-element list and delegates to
/// `run_tree_parallel_phase2_audit_multi_root`, returning its result unchanged.
pub fn run_tree_parallel_phase2_audit(
    root: CaInstanceHandle,
    runner: &Rpkiv1PublicationPointRunner<'_>,
    config: &TreeRunConfig,
) -> Result<TreeRunAuditOutput, TreeRunError> {
    let roots = vec![root];
    run_tree_parallel_phase2_audit_multi_root(roots, runner, config)
}
#[cfg(test)]
mod tests {
    use super::{
        FinishedPublicationPointResult, compact_phase2_finished_result,
        compact_phase2_finished_result_result, finalize_metrics_from_output,
    };
    use crate::audit::{
        AuditObjectKind, AuditObjectResult, AuditWarning, DiscoveredFrom, ObjectAuditEntry,
        PublicationPointAudit,
    };
    use crate::data_model::roa::{IpPrefix, RoaAfi};
    use crate::storage::{PackTime, VcirReplaceTimingBreakdown};
    use crate::validation::manifest::PublicationPointSource;
    use crate::validation::objects::{
        AspaAttestation, ObjectsOutput, ObjectsStats, RoaValidationCacheStats, Vrp,
    };
    use crate::validation::publication_point::PublicationPointSnapshot;
    use crate::validation::tree::{
        CaInstanceHandle, DiscoveredChildCaInstance, PublicationPointRunResult,
    };
    use crate::validation::tree_runner::{
        BuildVcirTimingBreakdown, FreshPublicationPointFinalizeOutput, PersistVcirTimingBreakdown,
    };

    // URIs shared by the fixtures below.
    const REPO_URI: &str = "rsync://example.test/repo/";
    const MANIFEST_URI: &str = "rsync://example.test/repo/example.mft";
    const CHILD_CERT_URI: &str = "rsync://example.test/repo/child.cer";

    /// Builds a `PackTime` from an RFC 3339 UTC timestamp string.
    fn pack_time(rfc3339_utc: &str) -> PackTime {
        PackTime {
            rfc3339_utc: rfc3339_utc.to_string(),
        }
    }

    /// Builds a successful ROA audit entry for `file_name` under the sample repository,
    /// with a digest made of `hex_pair` repeated 32 times.
    fn roa_audit_entry(file_name: &str, hex_pair: &str) -> ObjectAuditEntry {
        ObjectAuditEntry {
            rsync_uri: format!("{REPO_URI}{file_name}"),
            sha256_hex: hex_pair.repeat(32),
            kind: AuditObjectKind::Roa,
            result: AuditObjectResult::Ok,
            detail: None,
        }
    }

    /// Minimal snapshot fixture with no files.
    fn sample_snapshot() -> PublicationPointSnapshot {
        PublicationPointSnapshot {
            format_version: PublicationPointSnapshot::FORMAT_VERSION_V1,
            manifest_rsync_uri: MANIFEST_URI.to_string(),
            publication_point_rsync_uri: REPO_URI.to_string(),
            manifest_number_be: vec![1],
            this_update: pack_time("2026-04-21T00:00:00Z"),
            next_update: pack_time("2026-04-22T00:00:00Z"),
            verified_at: pack_time("2026-04-21T00:00:01Z"),
            manifest_bytes: vec![1, 2, 3],
            files: Vec::new(),
        }
    }

    /// Fresh run result fixture with empty outputs and a snapshot attached.
    fn sample_result() -> PublicationPointRunResult {
        let objects = ObjectsOutput {
            vrps: Vec::new(),
            aspas: Vec::new(),
            router_keys: Vec::new(),
            local_outputs_cache: Vec::new(),
            warnings: Vec::new(),
            stats: ObjectsStats::default(),
            audit: Vec::new(),
            roa_cache_stats: RoaValidationCacheStats::default(),
        };
        PublicationPointRunResult {
            source: PublicationPointSource::Fresh,
            snapshot: Some(sample_snapshot()),
            warnings: Vec::new(),
            objects,
            audit: PublicationPointAudit::default(),
            cir_fresh_objects: Vec::new(),
            cir_cached_objects: Vec::new(),
            discovered_children: Vec::new(),
        }
    }

    #[test]
    fn compact_phase2_finished_result_drops_snapshot() {
        match compact_phase2_finished_result(sample_result(), false) {
            FinishedPublicationPointResult::Ok { warnings, .. } => {
                assert!(warnings.is_empty());
            }
            FinishedPublicationPointResult::Err(err) => panic!("unexpected error: {err}"),
        }
    }

    #[test]
    fn compact_phase2_finished_result_can_drop_audit_payload() {
        let mut sample = sample_result();
        sample.audit.objects.push(roa_audit_entry("a.roa", "11"));
        sample.audit.warnings.push(AuditWarning {
            message: "warning".to_string(),
            rfc_refs: Vec::new(),
            context: None,
        });
        sample.objects.audit.push(roa_audit_entry("b.roa", "22"));

        match compact_phase2_finished_result(sample, true) {
            FinishedPublicationPointResult::Ok {
                objects,
                audit,
                cir_fresh_objects,
                ..
            } => {
                assert!(audit.objects.is_empty());
                assert!(audit.warnings.is_empty());
                assert!(objects.audit.is_empty());
                assert_eq!(cir_fresh_objects.len(), 1);
            }
            FinishedPublicationPointResult::Err(err) => panic!("unexpected error: {err}"),
        }
    }

    #[test]
    fn compact_phase2_finished_result_result_preserves_err() {
        let compacted = compact_phase2_finished_result_result(Err("boom".to_string()), false);
        match compacted {
            FinishedPublicationPointResult::Err(err) => assert_eq!(err, "boom"),
            FinishedPublicationPointResult::Ok { .. } => panic!("error should be preserved"),
        }
    }

    #[test]
    fn finalize_metrics_from_output_captures_breakdown_and_counts() {
        let mut result = sample_result();

        // One VRP, one ASPA, one audit object and one discovered child.
        result.objects.vrps.push(Vrp {
            asn: 64496,
            prefix: IpPrefix {
                afi: RoaAfi::Ipv4,
                addr: [192, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
                prefix_len: 24,
            },
            max_length: 24,
        });
        result.objects.aspas.push(AspaAttestation {
            customer_as_id: 64497,
            provider_as_ids: vec![64498],
        });
        result.audit.objects.push(roa_audit_entry("a.roa", "11"));
        result.discovered_children.push(DiscoveredChildCaInstance {
            handle: CaInstanceHandle {
                tal_id: "test".to_string(),
                ca_certificate_der: vec![1],
                ca_certificate_rsync_uri: Some(CHILD_CERT_URI.to_string()),
                effective_ip_resources: None,
                effective_as_resources: None,
                manifest_rsync_uri: "rsync://example.test/repo/child.mft".to_string(),
                publication_point_rsync_uri: REPO_URI.to_string(),
                rsync_base_uri: REPO_URI.to_string(),
                rrdp_notification_uri: None,
                parent_manifest_rsync_uri: None,
                depth: 1,
            },
            discovered_from: DiscoveredFrom {
                parent_manifest_rsync_uri: MANIFEST_URI.to_string(),
                child_ca_certificate_rsync_uri: CHILD_CERT_URI.to_string(),
                child_ca_certificate_sha256_hex: "55".repeat(32),
            },
        });

        let persist_vcir_timing = PersistVcirTimingBreakdown {
            build_vcir_ms: 3,
            replace_vcir_ms: 4,
            build_vcir: BuildVcirTimingBreakdown {
                related_artifacts_ms: 5,
                ..BuildVcirTimingBreakdown::default()
            },
            ..PersistVcirTimingBreakdown::default()
        };
        let output = FreshPublicationPointFinalizeOutput {
            result,
            snapshot_pack_ms: 1,
            persist_vcir_ms: 2,
            persist_vcir_timing,
            ccr_projection_build_ms: 6,
            ccr_append_ms: 7,
            audit_build_ms: 8,
        };

        let metrics = finalize_metrics_from_output(&output, 9, 10, Some(11), 12, 13);

        // Timings copied from the finalize output.
        assert_eq!(metrics.snapshot_pack_ms, 1);
        assert_eq!(metrics.persist_vcir_ms, 2);
        assert_eq!(metrics.persist_build_vcir_ms, 3);
        assert_eq!(metrics.persist_replace_vcir_ms, 4);
        assert_eq!(
            metrics.persist_replace_breakdown,
            VcirReplaceTimingBreakdown::default()
        );
        assert_eq!(metrics.ccr_projection_build_ms, 6);
        assert_eq!(metrics.ccr_append_ms, 7);
        assert_eq!(metrics.audit_build_ms, 8);

        // Values passed in as explicit arguments.
        assert_eq!(metrics.reduce_ms, 9);
        assert_eq!(metrics.finalize_ms, 10);
        assert_eq!(metrics.finalize_queue_wait_ms, Some(11));
        assert_eq!(metrics.finalize_worker_ms, 12);
        assert_eq!(metrics.locked_files, 13);

        // Counts derived from the result payload.
        assert_eq!(metrics.child_count, 1);
        assert_eq!(metrics.vrp_count, 1);
        assert_eq!(metrics.vap_count, 1);
        assert_eq!(metrics.audit_object_count, 1);
    }
}