From 87275b5c575d6784d1a8b12484c7dbbdbc382cbf Mon Sep 17 00:00:00 2001 From: yuyr Date: Mon, 27 Apr 2026 17:13:32 +0800 Subject: [PATCH] 20260427_2 Remove the parallel phase switches; run fully parallel by default MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../compare/run_perf_compare_quick_remote.sh | 1 - .../run_apnic_ours_parallel_round_remote.sh | 1 - src/cli.rs | 698 +++++------------- src/parallel/config.rs | 4 +- tests/test_multi_tal_parallel_m2.rs | 1 - ...st_parallel_phase1_transport_offline_r5.rs | 99 ++- 6 files changed, 223 insertions(+), 581 deletions(-) diff --git a/scripts/compare/run_perf_compare_quick_remote.sh b/scripts/compare/run_perf_compare_quick_remote.sh index 84ab794..d26dd57 100755 --- a/scripts/compare/run_perf_compare_quick_remote.sh +++ b/scripts/compare/run_perf_compare_quick_remote.sh @@ -224,7 +224,6 @@ PY --raw-store-db state/ours/raw-store.db \ --repo-bytes-db state/ours/repo-bytes.db \ "${OURS_TAL_ARGS[@]}" \ - --parallel-phase1 \ "${OURS_EXTRA_ARGV[@]}" \ --ccr-out "steps/$STEP_ID/ours/result.ccr" \ --report-json "steps/$STEP_ID/ours/report.json" \ diff --git a/scripts/periodic/run_apnic_ours_parallel_round_remote.sh b/scripts/periodic/run_apnic_ours_parallel_round_remote.sh index 20f3054..db885ad 100755 --- a/scripts/periodic/run_apnic_ours_parallel_round_remote.sh +++ b/scripts/periodic/run_apnic_ours_parallel_round_remote.sh @@ -102,7 +102,6 @@ env RPKI_PROGRESS_LOG=1 RPKI_PROGRESS_SLOW_SECS=0 target/release/rpki \ --repo-bytes-db "$REMOTE_REPO_BYTES" \ --tal-path tests/fixtures/tal/apnic-rfc7730-https.tal \ --ta-path tests/fixtures/ta/apnic-ta.cer \ - --parallel-phase1 \ --ccr-out "$ccr_out" \ --report-json "$report_out" \ --cir-enable \ diff --git a/src/cli.rs b/src/cli.rs index ab265e7..6f7bd70 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -18,18 +18,13 @@ use crate::parallel::types::TalInputSpec; use crate::policy::Policy; use crate::storage::RocksStore; use crate::validation::run_tree_from_tal::{ - RunTreeFromTalAuditOutput, run_tree_from_multiple_tals_parallel_phase1_audit, - run_tree_from_multiple_tals_parallel_phase2_audit, - run_tree_from_tal_and_ta_der_parallel_phase1_audit, + RunTreeFromTalAuditOutput, run_tree_from_multiple_tals_parallel_phase2_audit, run_tree_from_tal_and_ta_der_parallel_phase2_audit, run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit, run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit_with_timing, run_tree_from_tal_and_ta_der_payload_replay_serial_audit, run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing, - run_tree_from_tal_and_ta_der_serial_audit, - run_tree_from_tal_and_ta_der_serial_audit_with_timing, - run_tree_from_tal_url_parallel_phase1_audit, run_tree_from_tal_url_parallel_phase2_audit, - run_tree_from_tal_url_serial_audit, run_tree_from_tal_url_serial_audit_with_timing, + run_tree_from_tal_url_parallel_phase2_audit, }; use crate::validation::tree::TreeRunConfig; use serde::Serialize; @@ -64,11 +59,8 @@ pub struct CliArgs { pub tal_url: Option, pub tal_path: Option, pub ta_path: Option, - pub parallel_phase1: bool, - pub parallel_phase2: bool, - pub parallel_tal_urls: Vec, - pub parallel_phase1_config: Option, - pub parallel_phase2_config: Option, + pub parallel_phase1_config: ParallelPhase1Config, + pub parallel_phase2_config: ParallelPhase2Config, pub tal_inputs: Vec, pub db_path: PathBuf, @@ -138,18 +130,16 @@ Options: --tal-url
TAL URL (repeatable; URL mode) --tal-path TAL file path (repeatable; file mode) --ta-path TA certificate DER file path (repeatable in file mode; pairs with --tal-path by position) - --parallel-phase1 Enable Phase 1 parallel scheduler skeleton - --parallel-phase2 Enable Phase 2 ROA object validation worker pool (requires --parallel-phase1) --parallel-max-repo-sync-workers-global - Phase 1 global repo sync worker budget + Phase 1 global repo sync worker budget (default: 4) --parallel-max-inflight-snapshot-bytes-global - Phase 1 inflight snapshot byte budget + Phase 1 inflight snapshot byte budget (default: 512MiB) --parallel-max-pending-repo-results - Phase 1 pending repo result budget + Phase 1 pending repo result budget (default: 1024) --parallel-phase2-object-workers - Phase 2 object worker count + Phase 2 object worker count (default: 8) --parallel-phase2-worker-queue-capacity - Phase 2 per-worker object queue capacity + Phase 2 per-worker object queue capacity (default: 256) --rsync-local-dir Use LocalDirRsyncFetcher rooted at this directory (offline tests) --disable-rrdp Disable RRDP and synchronize only via rsync @@ -172,12 +162,8 @@ pub fn parse_args(argv: &[String]) -> Result { let mut tal_urls: Vec = Vec::new(); let mut tal_paths: Vec = Vec::new(); let mut ta_paths: Vec = Vec::new(); - let mut parallel_phase1: bool = false; - let mut parallel_phase2: bool = false; let mut parallel_phase1_cfg = ParallelPhase1Config::default(); - let mut parallel_phase1_cfg_overridden: bool = false; let mut parallel_phase2_cfg = ParallelPhase2Config::default(); - let mut parallel_phase2_cfg_overridden: bool = false; let mut db_path: Option = None; let mut raw_store_db: Option = None; @@ -231,12 +217,6 @@ pub fn parse_args(argv: &[String]) -> Result { let v = argv.get(i).ok_or("--ta-path requires a value")?; ta_paths.push(PathBuf::from(v)); } - "--parallel-phase1" => { - parallel_phase1 = true; - } - "--parallel-phase2" => { - parallel_phase2 = true; - } "--parallel-max-repo-sync-workers-global" => { i += 1; let v = argv @@ -245,7 +225,6 @@ pub fn parse_args(argv: &[String]) -> Result { parallel_phase1_cfg.max_repo_sync_workers_global = v .parse::() .map_err(|_| format!("invalid --parallel-max-repo-sync-workers-global: {v}"))?; - parallel_phase1_cfg_overridden = true; } "--parallel-max-inflight-snapshot-bytes-global" => { i += 1; @@ -256,7 +235,6 @@ pub fn parse_args(argv: &[String]) -> Result { v.parse::().map_err(|_| { format!("invalid --parallel-max-inflight-snapshot-bytes-global: {v}") })?; - parallel_phase1_cfg_overridden = true; } "--parallel-max-pending-repo-results" => { i += 1; @@ -266,7 +244,6 @@ pub fn parse_args(argv: &[String]) -> Result { parallel_phase1_cfg.max_pending_repo_results = v .parse::() .map_err(|_| format!("invalid --parallel-max-pending-repo-results: {v}"))?; - parallel_phase1_cfg_overridden = true; } "--parallel-phase2-object-workers" => { i += 1; @@ -276,7 +253,6 @@ pub fn parse_args(argv: &[String]) -> Result { parallel_phase2_cfg.object_workers = v .parse::() .map_err(|_| format!("invalid --parallel-phase2-object-workers: {v}"))?; - parallel_phase2_cfg_overridden = true; } "--parallel-phase2-worker-queue-capacity" => { i += 1; @@ -286,7 +262,6 @@ pub fn parse_args(argv: &[String]) -> Result { parallel_phase2_cfg.worker_queue_capacity = v .parse::() .map_err(|_| format!("invalid --parallel-phase2-worker-queue-capacity: {v}"))?; - parallel_phase2_cfg_overridden = true; } "--db" => { i += 1; @@ -467,31 +442,13 @@ pub fn parse_args(argv: &[String]) -> Result { usage() )); } 
- if !parallel_phase1 && parallel_phase1_cfg_overridden { - return Err(format!( - "--parallel-max-* options require --parallel-phase1\n\n{}", - usage() - )); - } - if !parallel_phase2 && parallel_phase2_cfg_overridden { - return Err(format!( - "--parallel-phase2-* options require --parallel-phase2\n\n{}", - usage() - )); - } - if parallel_phase2 && !parallel_phase1 { - return Err(format!( - "--parallel-phase2 requires --parallel-phase1\n\n{}", - usage() - )); - } - if parallel_phase2 && parallel_phase2_cfg.object_workers == 0 { + if parallel_phase2_cfg.object_workers == 0 { return Err(format!( "--parallel-phase2-object-workers must be > 0\n\n{}", usage() )); } - if parallel_phase2 && parallel_phase2_cfg.worker_queue_capacity == 0 { + if parallel_phase2_cfg.worker_queue_capacity == 0 { return Err(format!( "--parallel-phase2-worker-queue-capacity must be > 0\n\n{}", usage() @@ -504,8 +461,7 @@ pub fn parse_args(argv: &[String]) -> Result { )); } if !tal_paths.is_empty() { - let strict_pairing_required = - parallel_phase1 || tal_paths.len() > 1 || !ta_paths.is_empty(); + let strict_pairing_required = tal_paths.len() > 1 || !ta_paths.is_empty(); if strict_pairing_required { if ta_paths.len() != tal_paths.len() { return Err(format!( @@ -520,12 +476,6 @@ pub fn parse_args(argv: &[String]) -> Result { )); } } - if !parallel_phase1 && (tal_urls.len() > 1 || tal_paths.len() > 1) { - return Err(format!( - "multi-TAL execution requires --parallel-phase1\n\n{}", - usage() - )); - } let tal_url = tal_urls.first().cloned(); let tal_path = tal_paths.first().cloned(); let ta_path = ta_paths.first().cloned(); @@ -674,11 +624,8 @@ pub fn parse_args(argv: &[String]) -> Result { tal_url, tal_path, ta_path, - parallel_phase1, - parallel_phase2, - parallel_tal_urls: Vec::new(), - parallel_phase1_config: parallel_phase1.then_some(parallel_phase1_cfg), - parallel_phase2_config: parallel_phase2.then_some(parallel_phase2_cfg), + parallel_phase1_config: parallel_phase1_cfg, + parallel_phase2_config: parallel_phase2_cfg, tal_inputs, db_path, raw_store_db, @@ -1063,6 +1010,111 @@ fn build_repo_sync_stats( stats } +fn run_online_validation_with_fetchers( + store: Arc, + policy: &Policy, + args: &CliArgs, + http: &H, + rsync: &R, + validation_time: time::OffsetDateTime, + config: &TreeRunConfig, + collect_current_repo_objects: bool, + timing: Option<&TimingHandle>, +) -> Result +where + H: crate::sync::rrdp::Fetcher + Clone + 'static, + R: crate::fetch::rsync::RsyncFetcher + Clone + 'static, +{ + if args.tal_inputs.len() > 1 { + return run_tree_from_multiple_tals_parallel_phase2_audit( + store, + policy, + args.tal_inputs.clone(), + http, + rsync, + validation_time, + config, + args.parallel_phase1_config.clone(), + args.parallel_phase2_config.clone(), + collect_current_repo_objects, + ) + .map_err(|e| e.to_string()); + } + + match ( + args.tal_url.as_ref(), + args.tal_path.as_ref(), + args.ta_path.as_ref(), + ) { + (Some(url), _, _) => run_tree_from_tal_url_parallel_phase2_audit( + store, + policy, + url, + http, + rsync, + validation_time, + config, + args.parallel_phase1_config.clone(), + args.parallel_phase2_config.clone(), + collect_current_repo_objects, + ) + .map_err(|e| e.to_string()), + (None, Some(tal_path), Some(ta_path)) => { + let tal_bytes = std::fs::read(tal_path) + .map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?; + let ta_der = std::fs::read(ta_path) + .map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?; + run_tree_from_tal_and_ta_der_parallel_phase2_audit( + 
store, + policy, + &tal_bytes, + &ta_der, + None, + http, + rsync, + validation_time, + config, + args.parallel_phase1_config.clone(), + args.parallel_phase2_config.clone(), + collect_current_repo_objects, + ) + .map_err(|e| e.to_string()) + } + (None, Some(tal_path), None) => { + let tal_bytes = std::fs::read(tal_path) + .map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?; + let tal_uri = args.cir_tal_uri.clone(); + if let Some(t) = timing { + crate::validation::run_tree_from_tal::run_tree_from_tal_bytes_serial_audit_with_timing( + store.as_ref(), + policy, + &tal_bytes, + tal_uri, + http, + rsync, + validation_time, + config, + t, + ) + .map_err(|e| e.to_string()) + } else { + crate::validation::run_tree_from_tal::run_tree_from_tal_bytes_serial_audit( + store.as_ref(), + policy, + &tal_bytes, + tal_uri, + http, + rsync, + validation_time, + config, + ) + .map_err(|e| e.to_string()) + } + } + _ => unreachable!("validated by parse_args"), + } +} + pub fn run(argv: &[String]) -> Result<(), String> { let args = parse_args(argv)?; @@ -1276,215 +1328,17 @@ pub fn run(argv: &[String]) -> Result<(), String> { }) .map_err(|e| e.to_string())?; let rsync = LocalDirRsyncFetcher::new(dir); - if args.parallel_phase1 && args.tal_inputs.len() > 1 { - if args.parallel_phase2 { - run_tree_from_multiple_tals_parallel_phase2_audit( - Arc::clone(&store), - &policy, - args.tal_inputs.clone(), - &http, - &rsync, - validation_time, - &config, - args.parallel_phase1_config - .clone() - .expect("phase1 config present"), - args.parallel_phase2_config - .clone() - .expect("phase2 config present"), - collect_current_repo_objects, - ) - .map_err(|e| e.to_string())? - } else { - run_tree_from_multiple_tals_parallel_phase1_audit( - Arc::clone(&store), - &policy, - args.tal_inputs.clone(), - &http, - &rsync, - validation_time, - &config, - args.parallel_phase1_config - .clone() - .expect("phase1 config present"), - collect_current_repo_objects, - ) - .map_err(|e| e.to_string())? - } - } else { - match ( - args.tal_url.as_ref(), - args.tal_path.as_ref(), - args.ta_path.as_ref(), - ) { - (Some(url), _, _) => { - if args.parallel_phase1 { - if args.parallel_phase2 { - run_tree_from_tal_url_parallel_phase2_audit( - Arc::clone(&store), - &policy, - url, - &http, - &rsync, - validation_time, - &config, - args.parallel_phase1_config - .clone() - .expect("phase1 config present"), - args.parallel_phase2_config - .clone() - .expect("phase2 config present"), - collect_current_repo_objects, - ) - .map_err(|e| e.to_string())? - } else { - run_tree_from_tal_url_parallel_phase1_audit( - Arc::clone(&store), - &policy, - url, - &http, - &rsync, - validation_time, - &config, - args.parallel_phase1_config - .clone() - .expect("phase1 config present"), - collect_current_repo_objects, - ) - .map_err(|e| e.to_string())? - } - } else if let Some((_, t)) = timing.as_ref() { - run_tree_from_tal_url_serial_audit_with_timing( - store.as_ref(), - &policy, - url, - &http, - &rsync, - validation_time, - &config, - t, - ) - .map_err(|e| e.to_string())? - } else { - run_tree_from_tal_url_serial_audit( - store.as_ref(), - &policy, - url, - &http, - &rsync, - validation_time, - &config, - ) - .map_err(|e| e.to_string())? 
- } - } - (None, Some(tal_path), Some(ta_path)) => { - let tal_bytes = std::fs::read(tal_path) - .map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?; - let ta_der = std::fs::read(ta_path) - .map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?; - if args.parallel_phase1 { - if args.parallel_phase2 { - run_tree_from_tal_and_ta_der_parallel_phase2_audit( - Arc::clone(&store), - &policy, - &tal_bytes, - &ta_der, - None, - &http, - &rsync, - validation_time, - &config, - args.parallel_phase1_config - .clone() - .expect("phase1 config present"), - args.parallel_phase2_config - .clone() - .expect("phase2 config present"), - collect_current_repo_objects, - ) - .map_err(|e| e.to_string())? - } else { - run_tree_from_tal_and_ta_der_parallel_phase1_audit( - Arc::clone(&store), - &policy, - &tal_bytes, - &ta_der, - None, - &http, - &rsync, - validation_time, - &config, - args.parallel_phase1_config - .clone() - .expect("phase1 config present"), - collect_current_repo_objects, - ) - .map_err(|e| e.to_string())? - } - } else if let Some((_, t)) = timing.as_ref() { - run_tree_from_tal_and_ta_der_serial_audit_with_timing( - store.as_ref(), - &policy, - &tal_bytes, - &ta_der, - None, - &http, - &rsync, - validation_time, - &config, - t, - ) - .map_err(|e| e.to_string())? - } else { - run_tree_from_tal_and_ta_der_serial_audit( - store.as_ref(), - &policy, - &tal_bytes, - &ta_der, - None, - &http, - &rsync, - validation_time, - &config, - ) - .map_err(|e| e.to_string())? - } - } - (None, Some(tal_path), None) => { - let tal_bytes = std::fs::read(tal_path) - .map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?; - let tal_uri = args.cir_tal_uri.clone(); - if let Some((_, t)) = timing.as_ref() { - crate::validation::run_tree_from_tal::run_tree_from_tal_bytes_serial_audit_with_timing( - store.as_ref(), - &policy, - &tal_bytes, - tal_uri, - &http, - &rsync, - validation_time, - &config, - t, - ) - .map_err(|e| e.to_string())? - } else { - crate::validation::run_tree_from_tal::run_tree_from_tal_bytes_serial_audit( - store.as_ref(), - &policy, - &tal_bytes, - tal_uri, - &http, - &rsync, - validation_time, - &config, - ) - .map_err(|e| e.to_string())? - } - } - _ => unreachable!("validated by parse_args"), - } - } + run_online_validation_with_fetchers( + Arc::clone(&store), + &policy, + &args, + &http, + &rsync, + validation_time, + &config, + collect_current_repo_objects, + timing.as_ref().map(|(_, t)| t), + )? } else { let http = BlockingHttpFetcher::new(HttpFetcherConfig { timeout: std::time::Duration::from_secs(args.http_timeout_secs.max(1)), @@ -1500,215 +1354,17 @@ pub fn run(argv: &[String]) -> Result<(), String> { mirror_root: args.rsync_mirror_root.clone(), ..SystemRsyncConfig::default() }); - if args.parallel_phase1 && args.tal_inputs.len() > 1 { - if args.parallel_phase2 { - run_tree_from_multiple_tals_parallel_phase2_audit( - Arc::clone(&store), - &policy, - args.tal_inputs.clone(), - &http, - &rsync, - validation_time, - &config, - args.parallel_phase1_config - .clone() - .expect("phase1 config present"), - args.parallel_phase2_config - .clone() - .expect("phase2 config present"), - collect_current_repo_objects, - ) - .map_err(|e| e.to_string())? 
- } else { - run_tree_from_multiple_tals_parallel_phase1_audit( - Arc::clone(&store), - &policy, - args.tal_inputs.clone(), - &http, - &rsync, - validation_time, - &config, - args.parallel_phase1_config - .clone() - .expect("phase1 config present"), - collect_current_repo_objects, - ) - .map_err(|e| e.to_string())? - } - } else { - match ( - args.tal_url.as_ref(), - args.tal_path.as_ref(), - args.ta_path.as_ref(), - ) { - (Some(url), _, _) => { - if args.parallel_phase1 { - if args.parallel_phase2 { - run_tree_from_tal_url_parallel_phase2_audit( - Arc::clone(&store), - &policy, - url, - &http, - &rsync, - validation_time, - &config, - args.parallel_phase1_config - .clone() - .expect("phase1 config present"), - args.parallel_phase2_config - .clone() - .expect("phase2 config present"), - collect_current_repo_objects, - ) - .map_err(|e| e.to_string())? - } else { - run_tree_from_tal_url_parallel_phase1_audit( - Arc::clone(&store), - &policy, - url, - &http, - &rsync, - validation_time, - &config, - args.parallel_phase1_config - .clone() - .expect("phase1 config present"), - collect_current_repo_objects, - ) - .map_err(|e| e.to_string())? - } - } else if let Some((_, t)) = timing.as_ref() { - run_tree_from_tal_url_serial_audit_with_timing( - store.as_ref(), - &policy, - url, - &http, - &rsync, - validation_time, - &config, - t, - ) - .map_err(|e| e.to_string())? - } else { - run_tree_from_tal_url_serial_audit( - store.as_ref(), - &policy, - url, - &http, - &rsync, - validation_time, - &config, - ) - .map_err(|e| e.to_string())? - } - } - (None, Some(tal_path), Some(ta_path)) => { - let tal_bytes = std::fs::read(tal_path) - .map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?; - let ta_der = std::fs::read(ta_path) - .map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?; - if args.parallel_phase1 { - if args.parallel_phase2 { - run_tree_from_tal_and_ta_der_parallel_phase2_audit( - Arc::clone(&store), - &policy, - &tal_bytes, - &ta_der, - None, - &http, - &rsync, - validation_time, - &config, - args.parallel_phase1_config - .clone() - .expect("phase1 config present"), - args.parallel_phase2_config - .clone() - .expect("phase2 config present"), - collect_current_repo_objects, - ) - .map_err(|e| e.to_string())? - } else { - run_tree_from_tal_and_ta_der_parallel_phase1_audit( - Arc::clone(&store), - &policy, - &tal_bytes, - &ta_der, - None, - &http, - &rsync, - validation_time, - &config, - args.parallel_phase1_config - .clone() - .expect("phase1 config present"), - collect_current_repo_objects, - ) - .map_err(|e| e.to_string())? - } - } else if let Some((_, t)) = timing.as_ref() { - run_tree_from_tal_and_ta_der_serial_audit_with_timing( - store.as_ref(), - &policy, - &tal_bytes, - &ta_der, - None, - &http, - &rsync, - validation_time, - &config, - t, - ) - .map_err(|e| e.to_string())? - } else { - run_tree_from_tal_and_ta_der_serial_audit( - store.as_ref(), - &policy, - &tal_bytes, - &ta_der, - None, - &http, - &rsync, - validation_time, - &config, - ) - .map_err(|e| e.to_string())? - } - } - (None, Some(tal_path), None) => { - let tal_bytes = std::fs::read(tal_path) - .map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?; - let tal_uri = args.cir_tal_uri.clone(); - if let Some((_, t)) = timing.as_ref() { - crate::validation::run_tree_from_tal::run_tree_from_tal_bytes_serial_audit_with_timing( - store.as_ref(), - &policy, - &tal_bytes, - tal_uri, - &http, - &rsync, - validation_time, - &config, - t, - ) - .map_err(|e| e.to_string())? 
- } else { - crate::validation::run_tree_from_tal::run_tree_from_tal_bytes_serial_audit( - store.as_ref(), - &policy, - &tal_bytes, - tal_uri, - &http, - &rsync, - validation_time, - &config, - ) - .map_err(|e| e.to_string())? - } - } - _ => unreachable!("validated by parse_args"), - } - } + run_online_validation_with_fetchers( + Arc::clone(&store), + &policy, + &args, + &http, + &rsync, + validation_time, + &config, + collect_current_repo_objects, + timing.as_ref().map(|(_, t)| t), + )? }; let validation_ms = validation_started.elapsed().as_millis() as u64; @@ -1935,7 +1591,9 @@ mod tests { assert!(err.contains("Usage:"), "{err}"); assert!(err.contains("--db"), "{err}"); assert!(err.contains("--rsync-mirror-root"), "{err}"); - assert!(err.contains("--parallel-phase1"), "{err}"); + assert!(err.contains("--parallel-phase2-object-workers"), "{err}"); + assert!(!err.contains("--parallel-phase1"), "{err}"); + assert!(!err.contains("--parallel-phase2 "), "{err}"); } #[test] @@ -2162,7 +1820,26 @@ mod tests { } #[test] - fn parse_accepts_parallel_phase2_with_config() { + fn parse_accepts_default_parallel_config_and_phase2_overrides() { + let argv = vec![ + "rpki".to_string(), + "--db".to_string(), + "db".to_string(), + "--tal-url".to_string(), + "https://example.test/root.tal".to_string(), + "--parallel-phase2-object-workers".to_string(), + "3".to_string(), + "--parallel-phase2-worker-queue-capacity".to_string(), + "17".to_string(), + ]; + let args = parse_args(&argv).expect("parse args"); + assert_eq!(args.parallel_phase2_config.object_workers, 3); + assert_eq!(args.parallel_phase2_config.worker_queue_capacity, 17); + assert_eq!(args.parallel_phase1_config, ParallelPhase1Config::default()); + } + + #[test] + fn parse_rejects_removed_parallel_enable_flags() { let argv = vec![ "rpki".to_string(), "--db".to_string(), @@ -2170,22 +1847,10 @@ mod tests { "--tal-url".to_string(), "https://example.test/root.tal".to_string(), "--parallel-phase1".to_string(), - "--parallel-phase2".to_string(), - "--parallel-phase2-object-workers".to_string(), - "3".to_string(), - "--parallel-phase2-worker-queue-capacity".to_string(), - "17".to_string(), ]; - let args = parse_args(&argv).expect("parse args"); - assert!(args.parallel_phase1); - assert!(args.parallel_phase2); - let cfg = args.parallel_phase2_config.expect("phase2 config"); - assert_eq!(cfg.object_workers, 3); - assert_eq!(cfg.worker_queue_capacity, 17); - } + let err = parse_args(&argv).expect_err("removed phase flag should fail"); + assert!(err.contains("unknown argument: --parallel-phase1"), "{err}"); - #[test] - fn parse_rejects_parallel_phase2_without_phase1() { let argv = vec![ "rpki".to_string(), "--db".to_string(), @@ -2194,8 +1859,8 @@ mod tests { "https://example.test/root.tal".to_string(), "--parallel-phase2".to_string(), ]; - let err = parse_args(&argv).expect_err("phase2 without phase1 should fail"); - assert!(err.contains("requires --parallel-phase1"), "{err}"); + let err = parse_args(&argv).expect_err("removed phase flag should fail"); + assert!(err.contains("unknown argument: --parallel-phase2"), "{err}"); } #[test] @@ -2212,7 +1877,6 @@ mod tests { "arin.tal".to_string(), "--ta-path".to_string(), "arin.cer".to_string(), - "--parallel-phase1".to_string(), "--rsync-local-dir".to_string(), "repo".to_string(), "--cir-enable".to_string(), @@ -2361,11 +2025,12 @@ mod tests { assert!(args.ta_path.is_none()); assert_eq!(args.tal_inputs.len(), 1); assert_eq!(args.tal_inputs[0].tal_id, "x"); - assert!(!args.parallel_phase1); + 
assert_eq!(args.parallel_phase1_config, ParallelPhase1Config::default()); + assert_eq!(args.parallel_phase2_config, ParallelPhase2Config::default()); } #[test] - fn parse_accepts_parallel_phase1_with_multiple_tal_urls() { + fn parse_accepts_multi_tal_without_parallel_flags() { let argv = vec![ "rpki".to_string(), "--db".to_string(), @@ -2376,27 +2041,20 @@ mod tests { "https://example.test/apnic.tal".to_string(), "--tal-url".to_string(), "https://example.test/ripe.tal".to_string(), - "--parallel-phase1".to_string(), "--parallel-max-repo-sync-workers-global".to_string(), "8".to_string(), ]; let args = parse_args(&argv).expect("parse"); - assert!(args.parallel_phase1); assert_eq!(args.tal_urls.len(), 3); assert_eq!(args.tal_inputs.len(), 3); assert_eq!(args.tal_inputs[0].tal_id, "arin"); assert_eq!(args.tal_inputs[1].tal_id, "apnic"); assert_eq!(args.tal_inputs[2].tal_id, "ripe"); - assert_eq!( - args.parallel_phase1_config - .as_ref() - .map(|cfg| cfg.max_repo_sync_workers_global), - Some(8) - ); + assert_eq!(args.parallel_phase1_config.max_repo_sync_workers_global, 8); } #[test] - fn parse_rejects_multi_tal_without_parallel_phase1() { + fn parse_accepts_multi_tal_urls_by_default() { let argv = vec![ "rpki".to_string(), "--db".to_string(), @@ -2406,8 +2064,9 @@ mod tests { "--tal-url".to_string(), "https://example.test/apnic.tal".to_string(), ]; - let err = parse_args(&argv).unwrap_err(); - assert!(err.contains("requires --parallel-phase1"), "{err}"); + let args = parse_args(&argv).expect("parse"); + assert_eq!(args.tal_urls.len(), 2); + assert_eq!(args.tal_inputs.len(), 2); } #[test] @@ -2432,7 +2091,7 @@ mod tests { } #[test] - fn parse_accepts_parallel_phase1_with_multiple_tal_path_pairs() { + fn parse_accepts_multiple_tal_path_pairs_by_default() { let argv = vec![ "rpki".to_string(), "--db".to_string(), @@ -2445,7 +2104,6 @@ mod tests { "arin.tal".to_string(), "--ta-path".to_string(), "arin-ta.cer".to_string(), - "--parallel-phase1".to_string(), ]; let args = parse_args(&argv).expect("parse"); assert_eq!(args.tal_paths.len(), 2); @@ -2465,7 +2123,6 @@ mod tests { "apnic.tal".to_string(), "--ta-path".to_string(), "apnic-ta.cer".to_string(), - "--parallel-phase1".to_string(), ]; let err = parse_args(&argv).unwrap_err(); assert!(err.contains("must specify either one-or-more --tal-url or one-or-more --tal-path/--ta-path pairs"), "{err}"); @@ -2483,7 +2140,6 @@ mod tests { "arin.tal".to_string(), "--ta-path".to_string(), "apnic-ta.cer".to_string(), - "--parallel-phase1".to_string(), ]; let err = parse_args(&argv).unwrap_err(); assert!( diff --git a/src/parallel/config.rs b/src/parallel/config.rs index 71b10f2..138909a 100644 --- a/src/parallel/config.rs +++ b/src/parallel/config.rs @@ -14,9 +14,7 @@ pub struct ParallelPhase2Config { impl Default for ParallelPhase2Config { fn default() -> Self { Self { - object_workers: std::thread::available_parallelism() - .map(|n| n.get().max(1)) - .unwrap_or(4), + object_workers: 8, worker_queue_capacity: 256, } } diff --git a/tests/test_multi_tal_parallel_m2.rs b/tests/test_multi_tal_parallel_m2.rs index cdc7753..bbc49cf 100644 --- a/tests/test_multi_tal_parallel_m2.rs +++ b/tests/test_multi_tal_parallel_m2.rs @@ -25,7 +25,6 @@ fn run_case( "rpki".to_string(), "--db".to_string(), db_dir.path().to_string_lossy().to_string(), - "--parallel-phase1".to_string(), "--disable-rrdp".to_string(), "--rsync-local-dir".to_string(), fixture("tests/fixtures/repository") diff --git a/tests/test_parallel_phase1_transport_offline_r5.rs 
b/tests/test_parallel_phase1_transport_offline_r5.rs index ee12b00..ce4f5df 100644 --- a/tests/test_parallel_phase1_transport_offline_r5.rs +++ b/tests/test_parallel_phase1_transport_offline_r5.rs @@ -4,7 +4,7 @@ fn fixture_path(rel: &str) -> PathBuf { PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(rel) } -fn run_offline_case(parallel_phase1: bool, parallel_phase2: bool) -> (serde_json::Value, Vec) { +fn run_offline_case(phase2_workers: Option) -> (serde_json::Value, Vec) { let db_dir = tempfile::tempdir().expect("db tempdir"); let out_dir = tempfile::tempdir().expect("out tempdir"); let report_path = out_dir.path().join("report.json"); @@ -38,13 +38,9 @@ fn run_offline_case(parallel_phase1: bool, parallel_phase2: bool) -> (serde_json "--ccr-out".to_string(), ccr_path.to_string_lossy().to_string(), ]; - if parallel_phase1 { - argv.push("--parallel-phase1".to_string()); - } - if parallel_phase2 { - argv.push("--parallel-phase2".to_string()); + if let Some(workers) = phase2_workers { argv.push("--parallel-phase2-object-workers".to_string()); - argv.push("4".to_string()); + argv.push(workers.to_string()); argv.push("--parallel-phase2-worker-queue-capacity".to_string()); argv.push("64".to_string()); } @@ -58,61 +54,56 @@ fn run_offline_case(parallel_phase1: bool, parallel_phase2: bool) -> (serde_json } #[test] -fn offline_serial_and_parallel_phase1_match_compare_views() { - let (serial_report, serial_ccr_bytes) = run_offline_case(false, false); - let (parallel_report, parallel_ccr_bytes) = run_offline_case(true, false); +fn offline_default_parallel_and_configured_phase2_match_compare_views() { + let (default_report, default_ccr_bytes) = run_offline_case(None); + let (configured_report, configured_ccr_bytes) = run_offline_case(Some(4)); - let serial_ccr = rpki::ccr::decode_content_info(&serial_ccr_bytes).expect("decode serial ccr"); - let parallel_ccr = - rpki::ccr::decode_content_info(¶llel_ccr_bytes).expect("decode parallel ccr"); + let default_ccr = + rpki::ccr::decode_content_info(&default_ccr_bytes).expect("decode default ccr"); + let configured_ccr = + rpki::ccr::decode_content_info(&configured_ccr_bytes).expect("decode configured ccr"); - let (serial_vrps, serial_vaps) = - rpki::bundle::decode_ccr_compare_views(&serial_ccr, "apnic").expect("serial compare view"); - let (parallel_vrps, parallel_vaps) = - rpki::bundle::decode_ccr_compare_views(¶llel_ccr, "apnic") - .expect("parallel compare view"); + let (default_vrps, default_vaps) = + rpki::bundle::decode_ccr_compare_views(&default_ccr, "apnic") + .expect("default compare view"); + let (configured_vrps, configured_vaps) = + rpki::bundle::decode_ccr_compare_views(&configured_ccr, "apnic") + .expect("configured compare view"); - assert_eq!(serial_vrps, parallel_vrps, "VRP compare views must match"); - assert_eq!(serial_vaps, parallel_vaps, "VAP compare views must match"); - - let serial_points = serial_report["publication_points"] - .as_array() - .expect("serial publication_points"); - let parallel_points = parallel_report["publication_points"] - .as_array() - .expect("parallel publication_points"); assert_eq!( - serial_points.len(), - parallel_points.len(), + default_vrps, configured_vrps, + "VRP compare views must match" + ); + assert_eq!( + default_vaps, configured_vaps, + "VAP compare views must match" + ); + + let default_points = default_report["publication_points"] + .as_array() + .expect("default publication_points"); + let configured_points = configured_report["publication_points"] + .as_array() + .expect("configured 
publication_points"); + assert_eq!( + default_points.len(), + configured_points.len(), "publication point counts must match" ); } #[test] -fn offline_serial_and_parallel_phase2_match_compare_views() { - let (serial_report, serial_ccr_bytes) = run_offline_case(false, false); - let (phase2_report, phase2_ccr_bytes) = run_offline_case(true, true); - - let serial_ccr = rpki::ccr::decode_content_info(&serial_ccr_bytes).expect("decode serial ccr"); - let phase2_ccr = rpki::ccr::decode_content_info(&phase2_ccr_bytes).expect("decode phase2 ccr"); - - let (serial_vrps, serial_vaps) = - rpki::bundle::decode_ccr_compare_views(&serial_ccr, "apnic").expect("serial compare view"); - let (phase2_vrps, phase2_vaps) = - rpki::bundle::decode_ccr_compare_views(&phase2_ccr, "apnic").expect("phase2 compare view"); - - assert_eq!(serial_vrps, phase2_vrps, "VRP compare views must match"); - assert_eq!(serial_vaps, phase2_vaps, "VAP compare views must match"); - - let serial_points = serial_report["publication_points"] - .as_array() - .expect("serial publication_points"); - let phase2_points = phase2_report["publication_points"] - .as_array() - .expect("phase2 publication_points"); - assert_eq!( - serial_points.len(), - phase2_points.len(), - "publication point counts must match" +fn offline_default_parallel_emits_online_ccr_accumulator_output() { + let (report, ccr_bytes) = run_offline_case(None); + let ccr = rpki::ccr::decode_content_info(&ccr_bytes).expect("decode ccr"); + let (_vrps, _vaps) = + rpki::bundle::decode_ccr_compare_views(&ccr, "apnic").expect("compare view"); + assert!( + report["publication_points"] + .as_array() + .expect("publication_points") + .len() + > 0, + "default parallel replay must process publication points" ); }