20260504 Optimize the Phase 2 finalize scheduling long tail

yuyr 2026-05-06 12:05:02 +08:00
parent b3b44d50c6
commit f843eedda9
5 changed files with 946 additions and 307 deletions
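The change is easiest to read as a pattern: Phase 2 publication-point finalization moves off the scheduler loop onto a dedicated worker thread, fed through a bounded task channel with non-blocking submission and drained through an unbounded result channel, so one slow finalize no longer stalls ready staging and ROA dispatch. A minimal sketch of that pattern, with hypothetical Task/Done types standing in for FinalizeTask and FinalizeWorkerResult:

    use std::collections::VecDeque;
    use std::sync::mpsc::{self, TrySendError};
    use std::thread;

    // Hypothetical stand-ins for FinalizeTask and FinalizeWorkerResult.
    struct Task(u64);
    struct Done(u64);

    fn main() {
        // Bounded task queue gives the scheduler backpressure; results flow
        // back over an unbounded channel so the worker never blocks on send.
        let (task_tx, task_rx) = mpsc::sync_channel::<Task>(4);
        let (done_tx, done_rx) = mpsc::channel::<Done>();

        thread::scope(|scope| {
            scope.spawn(move || {
                // Worker drains tasks until every sender is dropped.
                while let Ok(Task(id)) = task_rx.recv() {
                    let _ = done_tx.send(Done(id));
                }
            });

            let mut pending: VecDeque<Task> = (0..10u64).map(Task).collect();
            let mut inflight = 0usize;
            while let Some(task) = pending.pop_front() {
                match task_tx.try_send(task) {
                    Ok(()) => inflight += 1,
                    Err(TrySendError::Full(task)) => {
                        // Queue full: requeue at the front, drain one result,
                        // and keep the scheduler loop moving.
                        pending.push_front(task);
                        if done_rx.recv().is_ok() {
                            inflight -= 1;
                        }
                    }
                    Err(TrySendError::Disconnected(_)) => break,
                }
            }
            drop(task_tx); // worker's recv() now errors out and the thread exits
            while inflight > 0 {
                let _ = done_rx.recv();
                inflight -= 1;
            }
        });
    }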


@@ -156,6 +156,16 @@ Options:
Phase 2 per-worker object queue capacity (default: 256)
--parallel-phase2-ready-batch-size <n>
Phase 2 ready publication points processed per scheduler turn (default: 256)
--parallel-phase2-ready-batch-wall-time-budget-ms <n>
Phase 2 ready staging wall-time budget per scheduler turn (default: 100)
--parallel-phase2-result-drain-batch-size <n>
Phase 2 object results drained per scheduler turn (default: 2048)
--parallel-phase2-finalize-batch-size <n>
Legacy Phase 2 scheduler finalize budget; dedicated finalize worker ignores it (default: 256)
--parallel-phase2-finalize-batch-wall-time-budget-ms <n>
Legacy Phase 2 scheduler finalize time budget; dedicated finalize worker ignores it (default: 100)
--parallel-phase2-finalize-queue-capacity <n>
Phase 2 dedicated finalize worker queue capacity (default: 32768)
--rsync-local-dir <path> Use LocalDirRsyncFetcher rooted at this directory (offline tests)
--disable-rrdp Disable RRDP and synchronize only via rsync
@@ -293,6 +303,55 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
.parse::<usize>()
.map_err(|_| format!("invalid --parallel-phase2-ready-batch-size: {v}"))?;
}
"--parallel-phase2-ready-batch-wall-time-budget-ms" => {
i += 1;
let v = argv
.get(i)
.ok_or("--parallel-phase2-ready-batch-wall-time-budget-ms requires a value")?;
parallel_phase2_cfg.ready_batch_wall_time_budget_ms =
v.parse::<u64>().map_err(|_| {
format!("invalid --parallel-phase2-ready-batch-wall-time-budget-ms: {v}")
})?;
}
"--parallel-phase2-result-drain-batch-size" => {
i += 1;
let v = argv
.get(i)
.ok_or("--parallel-phase2-result-drain-batch-size requires a value")?;
parallel_phase2_cfg.object_result_drain_batch_size =
v.parse::<usize>().map_err(|_| {
format!("invalid --parallel-phase2-result-drain-batch-size: {v}")
})?;
}
"--parallel-phase2-finalize-batch-size" => {
i += 1;
let v = argv
.get(i)
.ok_or("--parallel-phase2-finalize-batch-size requires a value")?;
parallel_phase2_cfg.publication_point_finalize_batch_size = v
.parse::<usize>()
.map_err(|_| format!("invalid --parallel-phase2-finalize-batch-size: {v}"))?;
}
"--parallel-phase2-finalize-batch-wall-time-budget-ms" => {
i += 1;
let v = argv.get(i).ok_or(
"--parallel-phase2-finalize-batch-wall-time-budget-ms requires a value",
)?;
parallel_phase2_cfg.publication_point_finalize_wall_time_budget_ms =
v.parse::<u64>().map_err(|_| {
format!("invalid --parallel-phase2-finalize-batch-wall-time-budget-ms: {v}")
})?;
}
"--parallel-phase2-finalize-queue-capacity" => {
i += 1;
let v = argv
.get(i)
.ok_or("--parallel-phase2-finalize-queue-capacity requires a value")?;
parallel_phase2_cfg.publication_point_finalize_queue_capacity =
v.parse::<usize>().map_err(|_| {
format!("invalid --parallel-phase2-finalize-queue-capacity: {v}")
})?;
}
"--db" => { "--db" => {
i += 1; i += 1;
let v = argv.get(i).ok_or("--db requires a value")?; let v = argv.get(i).ok_or("--db requires a value")?;
@@ -513,6 +572,36 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
usage()
));
}
if parallel_phase2_cfg.ready_batch_wall_time_budget_ms == 0 {
return Err(format!(
"--parallel-phase2-ready-batch-wall-time-budget-ms must be > 0\n\n{}",
usage()
));
}
if parallel_phase2_cfg.object_result_drain_batch_size == 0 {
return Err(format!(
"--parallel-phase2-result-drain-batch-size must be > 0\n\n{}",
usage()
));
}
if parallel_phase2_cfg.publication_point_finalize_batch_size == 0 {
return Err(format!(
"--parallel-phase2-finalize-batch-size must be > 0\n\n{}",
usage()
));
}
if parallel_phase2_cfg.publication_point_finalize_wall_time_budget_ms == 0 {
return Err(format!(
"--parallel-phase2-finalize-batch-wall-time-budget-ms must be > 0\n\n{}",
usage()
));
}
if parallel_phase2_cfg.publication_point_finalize_queue_capacity == 0 {
return Err(format!(
"--parallel-phase2-finalize-queue-capacity must be > 0\n\n{}",
usage()
));
}
if !tal_urls.is_empty() && !ta_paths.is_empty() {
return Err(format!(
"--ta-path cannot be used with --tal-url mode\n\n{}",
@@ -2191,11 +2280,44 @@ mod tests {
"17".to_string(),
"--parallel-phase2-ready-batch-size".to_string(),
"31".to_string(),
"--parallel-phase2-ready-batch-wall-time-budget-ms".to_string(),
"43".to_string(),
"--parallel-phase2-result-drain-batch-size".to_string(),
"37".to_string(),
"--parallel-phase2-finalize-batch-size".to_string(),
"41".to_string(),
"--parallel-phase2-finalize-batch-wall-time-budget-ms".to_string(),
"47".to_string(),
"--parallel-phase2-finalize-queue-capacity".to_string(),
"8192".to_string(),
];
let args = parse_args(&argv).expect("parse args");
assert_eq!(args.parallel_phase2_config.object_workers, 3);
assert_eq!(args.parallel_phase2_config.worker_queue_capacity, 17);
assert_eq!(args.parallel_phase2_config.ready_batch_size, 31);
assert_eq!(
args.parallel_phase2_config.ready_batch_wall_time_budget_ms,
43
);
assert_eq!(
args.parallel_phase2_config.object_result_drain_batch_size,
37
);
assert_eq!(
args.parallel_phase2_config
.publication_point_finalize_batch_size,
41
);
assert_eq!(
args.parallel_phase2_config
.publication_point_finalize_wall_time_budget_ms,
47
);
assert_eq!(
args.parallel_phase2_config
.publication_point_finalize_queue_capacity,
8192
);
assert_eq!(args.parallel_phase1_config, ParallelPhase1Config::default());
}
@@ -2214,6 +2336,96 @@ mod tests {
assert!(err.contains("--parallel-phase2-ready-batch-size"), "{err}");
}
#[test]
fn parse_rejects_zero_phase2_result_drain_batch_size() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/root.tal".to_string(),
"--parallel-phase2-result-drain-batch-size".to_string(),
"0".to_string(),
];
let err = parse_args(&argv).expect_err("zero result drain batch must fail");
assert!(
err.contains("--parallel-phase2-result-drain-batch-size"),
"{err}"
);
}
#[test]
fn parse_rejects_zero_phase2_ready_batch_wall_time_budget_ms() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/root.tal".to_string(),
"--parallel-phase2-ready-batch-wall-time-budget-ms".to_string(),
"0".to_string(),
];
let err = parse_args(&argv).expect_err("zero ready time budget must fail");
assert!(
err.contains("--parallel-phase2-ready-batch-wall-time-budget-ms"),
"{err}"
);
}
#[test]
fn parse_rejects_zero_phase2_finalize_batch_size() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/root.tal".to_string(),
"--parallel-phase2-finalize-batch-size".to_string(),
"0".to_string(),
];
let err = parse_args(&argv).expect_err("zero finalize batch must fail");
assert!(
err.contains("--parallel-phase2-finalize-batch-size"),
"{err}"
);
}
#[test]
fn parse_rejects_zero_phase2_finalize_batch_wall_time_budget_ms() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/root.tal".to_string(),
"--parallel-phase2-finalize-batch-wall-time-budget-ms".to_string(),
"0".to_string(),
];
let err = parse_args(&argv).expect_err("zero finalize time budget must fail");
assert!(
err.contains("--parallel-phase2-finalize-batch-wall-time-budget-ms"),
"{err}"
);
}
#[test]
fn parse_rejects_zero_phase2_finalize_queue_capacity() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/root.tal".to_string(),
"--parallel-phase2-finalize-queue-capacity".to_string(),
"0".to_string(),
];
let err = parse_args(&argv).expect_err("zero finalize queue capacity must fail");
assert!(
err.contains("--parallel-phase2-finalize-queue-capacity"),
"{err}"
);
}
#[test]
fn parse_rejects_removed_parallel_enable_flags() {
let argv = vec![


@@ -10,6 +10,11 @@ pub struct ParallelPhase2Config {
pub object_workers: usize,
pub worker_queue_capacity: usize,
pub ready_batch_size: usize,
pub ready_batch_wall_time_budget_ms: u64,
pub object_result_drain_batch_size: usize,
pub publication_point_finalize_batch_size: usize,
pub publication_point_finalize_wall_time_budget_ms: u64,
pub publication_point_finalize_queue_capacity: usize,
}
impl Default for ParallelPhase2Config {
@@ -18,6 +23,11 @@ impl Default for ParallelPhase2Config {
object_workers: 8,
worker_queue_capacity: 256,
ready_batch_size: 256,
ready_batch_wall_time_budget_ms: 100,
object_result_drain_batch_size: 2048,
publication_point_finalize_batch_size: 256,
publication_point_finalize_wall_time_budget_ms: 100,
publication_point_finalize_queue_capacity: 32768,
}
}
}
@@ -50,5 +60,10 @@ mod tests {
assert!(cfg.object_workers > 0);
assert!(cfg.worker_queue_capacity > 0);
assert!(cfg.ready_batch_size > 0);
assert!(cfg.ready_batch_wall_time_budget_ms > 0);
assert!(cfg.object_result_drain_batch_size > 0);
assert!(cfg.publication_point_finalize_batch_size > 0);
assert!(cfg.publication_point_finalize_wall_time_budget_ms > 0);
assert!(cfg.publication_point_finalize_queue_capacity > 0);
}
}
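With the struct now carrying eight knobs, call sites that only pin one or two of them can lean on the Default impl through struct-update syntax; that is why the test fixtures in the last two files below gain `..ParallelPhase2Config::default()`. A minimal sketch (the test name is illustrative):

    #[test]
    fn defaults_fill_unspecified_fields() {
        // Pin two fields, inherit the remaining six from Default.
        let cfg = ParallelPhase2Config {
            object_workers: 2,
            worker_queue_capacity: 4,
            ..ParallelPhase2Config::default()
        };
        assert_eq!(cfg.ready_batch_size, 256);
        assert_eq!(cfg.object_result_drain_batch_size, 2048);
        assert_eq!(cfg.publication_point_finalize_queue_capacity, 32768);
    }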


@@ -1,4 +1,5 @@
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::mpsc::{self, Receiver, SyncSender, TryRecvError, TrySendError};
use std::time::{Duration, Instant};
use crate::audit::{DiscoveredFrom, PublicationPointAudit};
@@ -58,6 +59,15 @@ struct FinishedPublicationPoint {
result: Result<PublicationPointRunResult, String>,
}
struct FinalizeTask {
state: InflightPublicationPoint,
}
struct FinalizeWorkerResult {
finished: FinishedPublicationPoint,
metrics: FinalizePublicationPointMetrics,
}
#[derive(Default)]
struct ReadyStageMetrics {
manifest_rsync_uri: Option<String>,
@@ -248,15 +258,40 @@ struct RoaDispatchMetrics {
#[derive(Default)]
struct ObjectDrainMetrics {
results_drained: usize,
publication_points_completed: usize,
worker_ms_total: u64,
worker_ms_max: u64,
queue_wait_ms_total: u64,
queue_wait_ms_max: u64,
result_budget_exhausted: bool,
duration_ms: u64,
}
#[derive(Default)]
struct FinalizeSubmitMetrics {
submitted: usize,
queue_full: bool,
duration_ms: u64,
}
#[derive(Default)]
struct FinalizePublicationPointMetrics {
reduce_ms: u64,
finalize_ms: u64,
finalize_queue_wait_ms: Option<u64>,
finalize_worker_ms: u64,
}
#[derive(Default)]
struct FinalizeResultsDrainMetrics {
results_drained: usize,
reduce_ms_total: u64,
reduce_ms_max: u64,
finalize_ms_total: u64,
finalize_ms_max: u64,
finalize_queue_wait_ms_max: u64,
finalize_worker_ms_total: u64,
finalize_worker_ms_max: u64,
duration_ms: u64,
}
@@ -329,57 +364,120 @@ pub fn run_tree_parallel_phase2_audit_multi_root(
HashMap::new();
let mut ready_queue: VecDeque<ReadyCaInstance> = VecDeque::new();
let mut inflight_publication_points: HashMap<u64, InflightPublicationPoint> = HashMap::new();
let mut pending_finalization: VecDeque<FinalizeTask> = VecDeque::new();
let mut pending_roa_dispatch: VecDeque<OwnedRoaTask> = VecDeque::new();
let mut finished: Vec<FinishedPublicationPoint> = Vec::new();
let mut instances_started = 0usize;
let phase2_config = runner.parallel_phase2_config.as_ref();
let ready_batch_size = phase2_config
.map(|cfg| cfg.ready_batch_size)
.unwrap_or(256)
.max(1);
let ready_batch_wall_time_budget_ms = phase2_config
.map(|cfg| cfg.ready_batch_wall_time_budget_ms)
.unwrap_or(100)
.max(1);
let ready_batch_wall_time_budget = Duration::from_millis(ready_batch_wall_time_budget_ms);
let object_result_drain_batch_size = phase2_config
.map(|cfg| cfg.object_result_drain_batch_size)
.unwrap_or(2048)
.max(1);
let publication_point_finalize_queue_capacity = phase2_config
.map(|cfg| cfg.publication_point_finalize_queue_capacity)
.unwrap_or(32768)
.max(1);
let (finalize_task_tx, finalize_task_rx) =
mpsc::sync_channel::<FinalizeTask>(publication_point_finalize_queue_capacity);
let (finalize_result_tx, finalize_result_rx) = mpsc::channel::<FinalizeWorkerResult>();
let mut finalize_inflight = 0usize;
return std::thread::scope(|scope| {
let finalize_worker = scope.spawn(move || {
run_finalize_worker(
runner,
finalize_task_rx,
finalize_result_tx,
config.compact_audit,
)
});
let run_result: Result<(), TreeRunError> = (|| {
loop {
drain_finalize_results_with_progress(
&finalize_result_rx,
&mut finished,
&mut finalize_inflight,
pending_finalization.len(),
pending_roa_dispatch.len(),
inflight_publication_points.len(),
)?;
flush_pending_roa_dispatch_with_progress(
runner,
&mut pending_roa_dispatch,
&mut inflight_publication_points,
&pending_finalization,
)?;
drain_object_results_with_progress(
runner,
&mut inflight_publication_points,
&mut pending_finalization,
pending_roa_dispatch.len(),
object_result_drain_batch_size,
)?;
submit_pending_finalization_with_progress(
&finalize_task_tx,
&mut pending_finalization,
&mut finalize_inflight,
publication_point_finalize_queue_capacity,
pending_roa_dispatch.len(),
inflight_publication_points.len(),
)?;
start_queued_ca_instances(
repo_runtime.as_ref(),
&mut ca_queue,
&mut ready_queue,
&mut ca_waiting_repo_by_identity,
&mut finished,
&mut visited_manifest_uris,
&mut instances_started,
config,
);
let repo_poll_timeout = event_poll_timeout(
&ca_queue,
&ready_queue,
&pending_roa_dispatch,
&inflight_publication_points,
&pending_finalization,
finalize_inflight,
instances_started,
config,
);
let repo_metrics = drain_repo_events(
repo_runtime.as_ref(),
&mut ca_waiting_repo_by_identity,
&mut ready_queue,
repo_poll_timeout,
)?;
if repo_metrics.event_count > 0 {
crate::progress_log::emit(
"phase2_repo_events_drain",
serde_json::json!({
"event_count": repo_metrics.event_count,
"completions": repo_metrics.completions,
"ready_enqueued": repo_metrics.ready_enqueued,
"duration_ms": repo_metrics.duration_ms,
"ready_queue_len": ready_queue.len(),
"ca_waiting_repo_identities": ca_waiting_repo_by_identity.len(),
}),
);
}
let ready_batch_started = Instant::now();
let mut ready_batch_metrics = ReadyStageBatchMetrics::default();
let mut ready_time_budget_exhausted = false;
while ready_batch_metrics.ready_count < ready_batch_size {
let Some(ready) = ready_queue.pop_front() else {
break;
@@ -395,9 +493,21 @@ pub fn run_tree_parallel_phase2_audit_multi_root(
config.compact_audit,
);
ready_batch_metrics.record(metrics);
if ready_batch_metrics.ready_count > 0
&& ready_batch_started.elapsed() >= ready_batch_wall_time_budget
{
ready_time_budget_exhausted = !ready_queue.is_empty();
break;
}
}
if ready_batch_metrics.ready_count > 0 {
ready_batch_metrics.total_ms = elapsed_ms(ready_batch_started);
let ready_count_budget_exhausted = ready_batch_metrics.ready_count
>= ready_batch_size
&& !ready_queue.is_empty();
ready_time_budget_exhausted = ready_time_budget_exhausted
|| (!ready_queue.is_empty()
&& ready_batch_metrics.total_ms >= ready_batch_wall_time_budget_ms);
crate::progress_log::emit(
"phase2_ready_queue_batch",
serde_json::json!({
@@ -423,11 +533,16 @@ pub fn run_tree_parallel_phase2_audit_multi_root(
"build_roa_tasks_ms_max": ready_batch_metrics.build_roa_tasks_ms_max,
"batch_duration_ms": ready_batch_metrics.total_ms,
"ready_batch_size": ready_batch_size,
"ready_batch_wall_time_budget_ms": ready_batch_wall_time_budget_ms,
"ready_queue_len_after_batch": ready_queue.len(), "ready_queue_len_after_batch": ready_queue.len(),
"ready_queue_budget_exhausted": !ready_queue.is_empty(), "ready_queue_budget_exhausted": !ready_queue.is_empty(),
"ready_count_budget_exhausted": ready_count_budget_exhausted,
"ready_time_budget_exhausted": ready_time_budget_exhausted,
"ca_queue_len_after_batch": ca_queue.len(), "ca_queue_len_after_batch": ca_queue.len(),
"pending_roa_dispatch_len_after_batch": pending_roa_dispatch.len(), "pending_roa_dispatch_len_after_batch": pending_roa_dispatch.len(),
"inflight_publication_points_after_batch": inflight_publication_points.len(), "inflight_publication_points_after_batch": inflight_publication_points.len(),
"pending_finalization_len_after_batch": pending_finalization.len(),
"finalize_inflight_after_batch": finalize_inflight,
}),
);
crate::progress_log::emit(
@@ -473,77 +588,35 @@ pub fn run_tree_parallel_phase2_audit_multi_root(
);
}
flush_pending_roa_dispatch_with_progress(
runner,
&mut pending_roa_dispatch,
&mut inflight_publication_points,
&pending_finalization,
)?;
drain_object_results_with_progress(
runner,
&mut inflight_publication_points,
&mut pending_finalization,
pending_roa_dispatch.len(),
object_result_drain_batch_size,
)?;
submit_pending_finalization_with_progress(
&finalize_task_tx,
&mut pending_finalization,
&mut finalize_inflight,
publication_point_finalize_queue_capacity,
pending_roa_dispatch.len(),
inflight_publication_points.len(),
)?;
drain_finalize_results_with_progress(
&finalize_result_rx,
&mut finished,
&mut finalize_inflight,
pending_finalization.len(),
pending_roa_dispatch.len(),
inflight_publication_points.len(),
)?;
if is_complete(
&ca_queue,
@@ -551,6 +624,8 @@ pub fn run_tree_parallel_phase2_audit_multi_root(
&ca_waiting_repo_by_identity,
&pending_roa_dispatch,
&inflight_publication_points,
&pending_finalization,
finalize_inflight,
instances_started,
config,
) {
@@ -561,7 +636,32 @@ pub fn run_tree_parallel_phase2_audit_multi_root(
repo_runtime
.reset_run_state()
.map_err(TreeRunError::Runner)?;
Ok(())
})();
drop(finalize_task_tx);
let worker_result = finalize_worker
.join()
.map_err(|_| TreeRunError::Runner("phase2 finalize worker panicked".to_string()))?;
run_result?;
worker_result?;
drain_finalize_results_with_progress(
&finalize_result_rx,
&mut finished,
&mut finalize_inflight,
pending_finalization.len(),
pending_roa_dispatch.len(),
inflight_publication_points.len(),
)?;
if finalize_inflight != 0 || !pending_finalization.is_empty() {
return Err(TreeRunError::Runner(format!(
"phase2 finalize worker stopped with pending work: queued={} inflight={}",
pending_finalization.len(),
finalize_inflight
)));
}
Ok(build_tree_output(finished))
});
}
fn can_start_more(instances_started: usize, config: &TreeRunConfig) -> bool {
@@ -571,6 +671,55 @@ fn can_start_more(instances_started: usize, config: &TreeRunConfig) -> bool {
.unwrap_or(true)
}
fn start_queued_ca_instances(
repo_runtime: &dyn crate::parallel::repo_runtime::RepoSyncRuntime,
ca_queue: &mut VecDeque<QueuedCaInstance>,
ready_queue: &mut VecDeque<ReadyCaInstance>,
ca_waiting_repo_by_identity: &mut HashMap<RepoIdentity, Vec<QueuedCaInstance>>,
finished: &mut Vec<FinishedPublicationPoint>,
visited_manifest_uris: &mut HashSet<String>,
instances_started: &mut usize,
config: &TreeRunConfig,
) {
while can_start_more(*instances_started, config) {
let Some(node) = ca_queue.pop_front() else {
break;
};
if !visited_manifest_uris.insert(node.handle.manifest_rsync_uri.clone()) {
continue;
}
if let Some(max_depth) = config.max_depth {
if node.handle.depth > max_depth {
continue;
}
}
*instances_started += 1;
match repo_runtime.request_publication_point_repo(&node.handle, 0) {
Ok(RepoSyncRequestStatus::Ready { mut outcome, .. }) => {
// Ready here means this CA is reusing repo work that has already completed
// (often due to child prefetch). Do not add the transport duration again.
outcome.repo_sync_duration_ms = 0;
ready_queue.push_back(ReadyCaInstance {
node,
repo_outcome: outcome,
});
}
Ok(RepoSyncRequestStatus::Pending { identity, .. }) => {
ca_waiting_repo_by_identity
.entry(identity)
.or_default()
.push(node);
}
Err(err) => {
finished.push(FinishedPublicationPoint {
node,
result: Err(err),
});
}
}
}
}
fn stage_ready_publication_point(
runner: &Rpkiv1PublicationPointRunner<'_>,
next_id: &mut u64,
@@ -920,18 +1069,44 @@ fn flush_pending_roa_dispatch(
Ok(metrics)
}
fn flush_pending_roa_dispatch_with_progress(
runner: &Rpkiv1PublicationPointRunner<'_>,
pending_roa_dispatch: &mut VecDeque<OwnedRoaTask>,
inflight_publication_points: &mut HashMap<u64, InflightPublicationPoint>,
pending_finalization: &VecDeque<FinalizeTask>,
) -> Result<(), TreeRunError> {
let dispatch_metrics =
flush_pending_roa_dispatch(runner, pending_roa_dispatch, inflight_publication_points)?;
if dispatch_metrics.attempted > 0 || dispatch_metrics.queue_full {
crate::progress_log::emit(
"phase2_roa_dispatch_batch",
serde_json::json!({
"attempted": dispatch_metrics.attempted,
"submitted": dispatch_metrics.submitted,
"queue_full": dispatch_metrics.queue_full,
"pending_remaining": dispatch_metrics.pending_remaining,
"duration_ms": dispatch_metrics.duration_ms,
"inflight_publication_points": inflight_publication_points.len(),
"pending_finalization_len": pending_finalization.len(),
}),
);
}
Ok(())
}
fn drain_object_results(
runner: &Rpkiv1PublicationPointRunner<'_>,
inflight_publication_points: &mut HashMap<u64, InflightPublicationPoint>,
pending_finalization: &mut VecDeque<FinalizeTask>,
result_budget: usize,
) -> Result<ObjectDrainMetrics, TreeRunError> {
let started = Instant::now();
let mut metrics = ObjectDrainMetrics::default();
let Some(pool) = runner.parallel_roa_worker_pool.as_ref() else {
return Ok(metrics);
};
let result_budget = result_budget.max(1);
while metrics.results_drained < result_budget {
let Some(result) = pool
.recv_result_timeout(Duration::from_millis(0))
.map_err(TreeRunError::Runner)?
@@ -964,86 +1139,72 @@
let state = inflight_publication_points
.remove(&pp_id)
.expect("inflight publication point must exist");
metrics.publication_points_completed += 1;
pending_finalization.push_back(FinalizeTask { state });
}
}
metrics.result_budget_exhausted = metrics.results_drained == result_budget;
metrics.duration_ms = elapsed_ms(started);
Ok(metrics)
}
fn drain_object_results_with_progress(
runner: &Rpkiv1PublicationPointRunner<'_>,
inflight_publication_points: &mut HashMap<u64, InflightPublicationPoint>,
pending_finalization: &mut VecDeque<FinalizeTask>,
pending_roa_dispatch_len: usize,
result_budget: usize,
) -> Result<(), TreeRunError> {
let drain_metrics = drain_object_results(
runner,
inflight_publication_points,
pending_finalization,
result_budget,
)?;
if drain_metrics.results_drained > 0 || drain_metrics.result_budget_exhausted {
crate::progress_log::emit(
"phase2_object_results_drain",
serde_json::json!({
"results_drained": drain_metrics.results_drained,
"publication_points_completed": drain_metrics.publication_points_completed,
"result_budget_exhausted": drain_metrics.result_budget_exhausted,
"result_drain_batch_size": result_budget,
"worker_ms_total": drain_metrics.worker_ms_total,
"worker_ms_max": drain_metrics.worker_ms_max,
"queue_wait_ms_total": drain_metrics.queue_wait_ms_total,
"queue_wait_ms_max": drain_metrics.queue_wait_ms_max,
"duration_ms": drain_metrics.duration_ms,
"pending_roa_dispatch_len": pending_roa_dispatch_len,
"inflight_publication_points": inflight_publication_points.len(),
"pending_finalization_len": pending_finalization.len(),
}), }),
);
}
Ok(())
}
fn submit_pending_finalization(
finalize_task_tx: &SyncSender<FinalizeTask>,
pending_finalization: &mut VecDeque<FinalizeTask>,
finalize_inflight: &mut usize,
) -> Result<FinalizeSubmitMetrics, TreeRunError> {
let started = Instant::now();
let mut metrics = FinalizeSubmitMetrics::default();
while let Some(task) = pending_finalization.pop_front() {
match finalize_task_tx.try_send(task) {
Ok(()) => {
metrics.submitted += 1;
*finalize_inflight += 1;
}
Err(TrySendError::Full(task)) => {
pending_finalization.push_front(task);
metrics.queue_full = true;
break;
}
Err(TrySendError::Disconnected(_task)) => {
return Err(TreeRunError::Runner(
"phase2 finalize worker queue disconnected".to_string(),
));
} }
} }
} }
@@ -1051,6 +1212,241 @@ fn drain_object_results(
Ok(metrics)
}
fn submit_pending_finalization_with_progress(
finalize_task_tx: &SyncSender<FinalizeTask>,
pending_finalization: &mut VecDeque<FinalizeTask>,
finalize_inflight: &mut usize,
finalize_queue_capacity: usize,
pending_roa_dispatch_len: usize,
inflight_publication_points_len: usize,
) -> Result<(), TreeRunError> {
let submit_metrics =
submit_pending_finalization(finalize_task_tx, pending_finalization, finalize_inflight)?;
if submit_metrics.submitted > 0 || submit_metrics.queue_full {
crate::progress_log::emit(
"phase2_finalize_task_submit",
serde_json::json!({
"submitted": submit_metrics.submitted,
"queue_full": submit_metrics.queue_full,
"duration_ms": submit_metrics.duration_ms,
"finalize_queue_capacity": finalize_queue_capacity,
"pending_finalization_len": pending_finalization.len(),
"finalize_inflight": *finalize_inflight,
"pending_roa_dispatch_len": pending_roa_dispatch_len,
"inflight_publication_points": inflight_publication_points_len,
}),
);
}
Ok(())
}
fn drain_finalize_results(
finalize_result_rx: &Receiver<FinalizeWorkerResult>,
finished: &mut Vec<FinishedPublicationPoint>,
finalize_inflight: &mut usize,
) -> Result<FinalizeResultsDrainMetrics, TreeRunError> {
let started = Instant::now();
let mut metrics = FinalizeResultsDrainMetrics::default();
loop {
match finalize_result_rx.try_recv() {
Ok(result) => {
metrics.results_drained += 1;
metrics.reduce_ms_total += result.metrics.reduce_ms;
metrics.reduce_ms_max = metrics.reduce_ms_max.max(result.metrics.reduce_ms);
metrics.finalize_ms_total += result.metrics.finalize_ms;
metrics.finalize_ms_max = metrics.finalize_ms_max.max(result.metrics.finalize_ms);
metrics.finalize_queue_wait_ms_max = metrics
.finalize_queue_wait_ms_max
.max(result.metrics.finalize_queue_wait_ms.unwrap_or(0));
metrics.finalize_worker_ms_total += result.metrics.finalize_worker_ms;
metrics.finalize_worker_ms_max = metrics
.finalize_worker_ms_max
.max(result.metrics.finalize_worker_ms);
*finalize_inflight = finalize_inflight.saturating_sub(1);
finished.push(result.finished);
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
if *finalize_inflight == 0 {
break;
}
return Err(TreeRunError::Runner(
"phase2 finalize result channel disconnected".to_string(),
));
}
}
}
metrics.duration_ms = elapsed_ms(started);
Ok(metrics)
}
fn drain_finalize_results_with_progress(
finalize_result_rx: &Receiver<FinalizeWorkerResult>,
finished: &mut Vec<FinishedPublicationPoint>,
finalize_inflight: &mut usize,
pending_finalization_len: usize,
pending_roa_dispatch_len: usize,
inflight_publication_points_len: usize,
) -> Result<(), TreeRunError> {
let drain_metrics = drain_finalize_results(finalize_result_rx, finished, finalize_inflight)?;
if drain_metrics.results_drained >= 64
|| (drain_metrics.results_drained > 0
&& (*finalize_inflight == 0 || pending_finalization_len > 0))
{
crate::progress_log::emit(
"phase2_finalize_results_drain",
serde_json::json!({
"results_drained": drain_metrics.results_drained,
"reduce_ms_total": drain_metrics.reduce_ms_total,
"reduce_ms_max": drain_metrics.reduce_ms_max,
"finalize_ms_total": drain_metrics.finalize_ms_total,
"finalize_ms_max": drain_metrics.finalize_ms_max,
"finalize_queue_wait_ms_max": drain_metrics.finalize_queue_wait_ms_max,
"finalize_worker_ms_total": drain_metrics.finalize_worker_ms_total,
"finalize_worker_ms_max": drain_metrics.finalize_worker_ms_max,
"duration_ms": drain_metrics.duration_ms,
"pending_finalization_len": pending_finalization_len,
"finalize_inflight": *finalize_inflight,
"pending_roa_dispatch_len": pending_roa_dispatch_len,
"inflight_publication_points": inflight_publication_points_len,
}),
);
}
Ok(())
}
fn run_finalize_worker(
runner: &Rpkiv1PublicationPointRunner<'_>,
finalize_task_rx: Receiver<FinalizeTask>,
finalize_result_tx: mpsc::Sender<FinalizeWorkerResult>,
compact_audit: bool,
) -> Result<(), TreeRunError> {
while let Ok(task) = finalize_task_rx.recv() {
let result = finalize_publication_point_state(runner, task.state, compact_audit);
if finalize_result_tx.send(result).is_err() {
return Err(TreeRunError::Runner(
"phase2 finalize result receiver disconnected".to_string(),
));
}
}
Ok(())
}
fn finalize_publication_point_state(
runner: &Rpkiv1PublicationPointRunner<'_>,
state: InflightPublicationPoint,
compact_audit: bool,
) -> FinalizeWorkerResult {
let finalize_worker_started = Instant::now();
let InflightPublicationPoint {
node,
fresh_stage,
objects_stage,
repo_outcome,
warnings,
started_at,
objects_started_at,
task_count,
tasks_submitted,
first_task_submitted_at,
last_task_submitted_at,
first_result_at,
last_result_at,
worker_ms_total,
worker_ms_max,
queue_wait_ms_total,
queue_wait_ms_max,
results,
} = state;
let finalize_queue_wait_ms = last_result_at.map(|last_result| {
Instant::now()
.saturating_duration_since(last_result)
.as_millis() as u64
});
let objects_processing_ms = objects_started_at.elapsed().as_millis() as u64;
let reduce_started = Instant::now();
let reduce_result = reduce_parallel_roa_stage(objects_stage, results, runner.timing.as_ref());
let reduce_ms = elapsed_ms(reduce_started);
let (result, finalize_ms) = match reduce_result {
Ok(mut objects) => {
let finalize_started = Instant::now();
objects
.router_keys
.extend(fresh_stage.discovered_router_keys.clone());
objects.local_outputs_cache.extend(
crate::validation::tree_runner::build_router_key_local_outputs(
&node.handle,
&objects.router_keys,
),
);
let result = runner
.finalize_fresh_publication_point_from_reducer(
&node.handle,
&fresh_stage.fresh_point,
warnings,
objects,
fresh_stage.child_audits,
fresh_stage.discovered_children,
repo_outcome.repo_sync_source.as_deref(),
repo_outcome.repo_sync_phase.as_deref(),
repo_outcome.repo_sync_duration_ms,
repo_outcome.repo_sync_err.as_deref(),
)
.map(|out| out.result);
(
compact_phase2_finished_result_result(result, compact_audit),
elapsed_ms(finalize_started),
)
}
Err(err) => (Err(err), 0),
};
let finalize_worker_ms = elapsed_ms(finalize_worker_started);
crate::progress_log::emit(
"phase2_publication_point_reduced",
serde_json::json!({
"manifest_rsync_uri": node.handle.manifest_rsync_uri.as_str(),
"publication_point_rsync_uri": node.handle.publication_point_rsync_uri.as_str(),
"objects_processing_ms": objects_processing_ms,
"task_count": task_count,
"tasks_submitted": tasks_submitted,
"first_task_submitted_ms": first_task_submitted_at.map(|t| t.saturating_duration_since(objects_started_at).as_millis() as u64),
"last_task_submitted_ms": last_task_submitted_at.map(|t| t.saturating_duration_since(objects_started_at).as_millis() as u64),
"task_submit_span_ms": match (first_task_submitted_at, last_task_submitted_at) {
(Some(first), Some(last)) => Some(last.saturating_duration_since(first).as_millis() as u64),
_ => None,
},
"first_result_ms": first_result_at.map(|t| t.saturating_duration_since(objects_started_at).as_millis() as u64),
"last_result_ms": last_result_at.map(|t| t.saturating_duration_since(objects_started_at).as_millis() as u64),
"all_results_ready_ms": last_result_at.map(|t| t.saturating_duration_since(objects_started_at).as_millis() as u64),
"finalize_queue_wait_ms": finalize_queue_wait_ms,
"result_span_ms": match (first_result_at, last_result_at) {
(Some(first), Some(last)) => Some(last.saturating_duration_since(first).as_millis() as u64),
_ => None,
},
"worker_ms_total": worker_ms_total,
"worker_ms_max": worker_ms_max,
"worker_ms_avg": if task_count > 0 { worker_ms_total / task_count as u64 } else { 0 },
"queue_wait_ms_total": queue_wait_ms_total,
"queue_wait_ms_max": queue_wait_ms_max,
"queue_wait_ms_avg": if task_count > 0 { queue_wait_ms_total / task_count as u64 } else { 0 },
"reduce_ms": reduce_ms,
"finalize_ms": finalize_ms,
"finalize_worker_ms": finalize_worker_ms,
"total_duration_ms": started_at.elapsed().as_millis() as u64,
}),
);
FinalizeWorkerResult {
finished: FinishedPublicationPoint { node, result },
metrics: FinalizePublicationPointMetrics {
reduce_ms,
finalize_ms,
finalize_queue_wait_ms,
finalize_worker_ms,
},
}
}
fn drain_repo_events(
repo_runtime: &dyn crate::parallel::repo_runtime::RepoSyncRuntime,
ca_waiting_repo_by_identity: &mut HashMap<RepoIdentity, Vec<QueuedCaInstance>>,
@@ -1092,15 +1488,20 @@ fn event_poll_timeout(
ready_queue: &VecDeque<ReadyCaInstance>,
pending_roa_dispatch: &VecDeque<OwnedRoaTask>,
inflight_publication_points: &HashMap<u64, InflightPublicationPoint>,
pending_finalization: &VecDeque<FinalizeTask>,
finalize_inflight: usize,
instances_started: usize,
config: &TreeRunConfig,
) -> Duration {
if !ready_queue.is_empty()
|| !pending_roa_dispatch.is_empty()
|| !inflight_publication_points.is_empty()
|| !pending_finalization.is_empty()
|| (!ca_queue.is_empty() && can_start_more(instances_started, config))
{
Duration::from_millis(0)
} else if finalize_inflight > 0 {
Duration::from_millis(10)
} else {
Duration::from_millis(50)
}
@@ -1112,6 +1513,8 @@ fn is_complete(
ca_waiting_repo_by_identity: &HashMap<RepoIdentity, Vec<QueuedCaInstance>>,
pending_roa_dispatch: &VecDeque<OwnedRoaTask>,
inflight_publication_points: &HashMap<u64, InflightPublicationPoint>,
pending_finalization: &VecDeque<FinalizeTask>,
finalize_inflight: usize,
instances_started: usize,
config: &TreeRunConfig,
) -> bool {
@@ -1121,6 +1524,8 @@ fn is_complete(
&& ca_waiting_repo_by_identity.is_empty()
&& pending_roa_dispatch.is_empty()
&& inflight_publication_points.is_empty()
&& pending_finalization.is_empty()
&& finalize_inflight == 0
}
fn build_tree_output(mut finished: Vec<FinishedPublicationPoint>) -> TreeRunAuditOutput {


@@ -192,6 +192,7 @@ fn parallel_roa_processing_matches_serial_for_real_cernet_fixture() {
&ParallelPhase2Config {
object_workers: 2,
worker_queue_capacity: 4,
..ParallelPhase2Config::default()
},
);
@@ -238,6 +239,7 @@ fn parallel_roa_processing_reports_issuer_decode_failure_like_serial() {
&ParallelPhase2Config {
object_workers: 2,
worker_queue_capacity: 4,
..ParallelPhase2Config::default()
},
);
@@ -287,6 +289,7 @@ fn parallel_roa_processing_reports_missing_crl_like_serial() {
&ParallelPhase2Config {
object_workers: 2,
worker_queue_capacity: 4,
..ParallelPhase2Config::default()
},
);


@@ -559,6 +559,7 @@ fn parallel_roa_processing_drop_object_records_roa_and_aspa_errors_like_serial()
let phase2_config = ParallelPhase2Config {
object_workers: 2,
worker_queue_capacity: 1,
..ParallelPhase2Config::default()
};
let pool = ParallelRoaWorkerPool::new(&phase2_config).expect("parallel roa pool");
let parallel = process_publication_point_for_issuer_parallel_roa_with_pool(
@@ -637,6 +638,7 @@ fn parallel_roa_processing_falls_back_for_drop_publication_point_policy() {
&ParallelPhase2Config {
object_workers: 2,
worker_queue_capacity: 1,
..ParallelPhase2Config::default()
},
);
@@ -689,6 +691,7 @@ fn parallel_roa_processing_falls_back_for_single_worker_config() {
&ParallelPhase2Config {
object_workers: 1,
worker_queue_capacity: 1,
..ParallelPhase2Config::default()
},
);
@@ -740,6 +743,7 @@ fn parallel_roa_processing_falls_back_when_pool_creation_fails() {
&ParallelPhase2Config {
object_workers: 2,
worker_queue_capacity: 0,
..ParallelPhase2Config::default()
},
);