20260427_2 移除parallel phase开关,默认全量并行

This commit is contained in:
yuyr 2026-04-27 17:13:32 +08:00
parent eaa375c5ec
commit 87275b5c57
6 changed files with 223 additions and 581 deletions

View File

@ -224,7 +224,6 @@ PY
--raw-store-db state/ours/raw-store.db \ --raw-store-db state/ours/raw-store.db \
--repo-bytes-db state/ours/repo-bytes.db \ --repo-bytes-db state/ours/repo-bytes.db \
"${OURS_TAL_ARGS[@]}" \ "${OURS_TAL_ARGS[@]}" \
--parallel-phase1 \
"${OURS_EXTRA_ARGV[@]}" \ "${OURS_EXTRA_ARGV[@]}" \
--ccr-out "steps/$STEP_ID/ours/result.ccr" \ --ccr-out "steps/$STEP_ID/ours/result.ccr" \
--report-json "steps/$STEP_ID/ours/report.json" \ --report-json "steps/$STEP_ID/ours/report.json" \

View File

@ -102,7 +102,6 @@ env RPKI_PROGRESS_LOG=1 RPKI_PROGRESS_SLOW_SECS=0 target/release/rpki \
--repo-bytes-db "$REMOTE_REPO_BYTES" \ --repo-bytes-db "$REMOTE_REPO_BYTES" \
--tal-path tests/fixtures/tal/apnic-rfc7730-https.tal \ --tal-path tests/fixtures/tal/apnic-rfc7730-https.tal \
--ta-path tests/fixtures/ta/apnic-ta.cer \ --ta-path tests/fixtures/ta/apnic-ta.cer \
--parallel-phase1 \
--ccr-out "$ccr_out" \ --ccr-out "$ccr_out" \
--report-json "$report_out" \ --report-json "$report_out" \
--cir-enable \ --cir-enable \

View File

@ -18,18 +18,13 @@ use crate::parallel::types::TalInputSpec;
use crate::policy::Policy; use crate::policy::Policy;
use crate::storage::RocksStore; use crate::storage::RocksStore;
use crate::validation::run_tree_from_tal::{ use crate::validation::run_tree_from_tal::{
RunTreeFromTalAuditOutput, run_tree_from_multiple_tals_parallel_phase1_audit, RunTreeFromTalAuditOutput, run_tree_from_multiple_tals_parallel_phase2_audit,
run_tree_from_multiple_tals_parallel_phase2_audit,
run_tree_from_tal_and_ta_der_parallel_phase1_audit,
run_tree_from_tal_and_ta_der_parallel_phase2_audit, run_tree_from_tal_and_ta_der_parallel_phase2_audit,
run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit, run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit,
run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit_with_timing, run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit_with_timing,
run_tree_from_tal_and_ta_der_payload_replay_serial_audit, run_tree_from_tal_and_ta_der_payload_replay_serial_audit,
run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing, run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing,
run_tree_from_tal_and_ta_der_serial_audit, run_tree_from_tal_url_parallel_phase2_audit,
run_tree_from_tal_and_ta_der_serial_audit_with_timing,
run_tree_from_tal_url_parallel_phase1_audit, run_tree_from_tal_url_parallel_phase2_audit,
run_tree_from_tal_url_serial_audit, run_tree_from_tal_url_serial_audit_with_timing,
}; };
use crate::validation::tree::TreeRunConfig; use crate::validation::tree::TreeRunConfig;
use serde::Serialize; use serde::Serialize;
@ -64,11 +59,8 @@ pub struct CliArgs {
pub tal_url: Option<String>, pub tal_url: Option<String>,
pub tal_path: Option<PathBuf>, pub tal_path: Option<PathBuf>,
pub ta_path: Option<PathBuf>, pub ta_path: Option<PathBuf>,
pub parallel_phase1: bool, pub parallel_phase1_config: ParallelPhase1Config,
pub parallel_phase2: bool, pub parallel_phase2_config: ParallelPhase2Config,
pub parallel_tal_urls: Vec<String>,
pub parallel_phase1_config: Option<ParallelPhase1Config>,
pub parallel_phase2_config: Option<ParallelPhase2Config>,
pub tal_inputs: Vec<TalInputSpec>, pub tal_inputs: Vec<TalInputSpec>,
pub db_path: PathBuf, pub db_path: PathBuf,
@ -138,18 +130,16 @@ Options:
--tal-url <url> TAL URL (repeatable; URL mode) --tal-url <url> TAL URL (repeatable; URL mode)
--tal-path <path> TAL file path (repeatable; file mode) --tal-path <path> TAL file path (repeatable; file mode)
--ta-path <path> TA certificate DER file path (repeatable in file mode; pairs with --tal-path by position) --ta-path <path> TA certificate DER file path (repeatable in file mode; pairs with --tal-path by position)
--parallel-phase1 Enable Phase 1 parallel scheduler skeleton
--parallel-phase2 Enable Phase 2 ROA object validation worker pool (requires --parallel-phase1)
--parallel-max-repo-sync-workers-global <n> --parallel-max-repo-sync-workers-global <n>
Phase 1 global repo sync worker budget Phase 1 global repo sync worker budget (default: 4)
--parallel-max-inflight-snapshot-bytes-global <n> --parallel-max-inflight-snapshot-bytes-global <n>
Phase 1 inflight snapshot byte budget Phase 1 inflight snapshot byte budget (default: 512MiB)
--parallel-max-pending-repo-results <n> --parallel-max-pending-repo-results <n>
Phase 1 pending repo result budget Phase 1 pending repo result budget (default: 1024)
--parallel-phase2-object-workers <n> --parallel-phase2-object-workers <n>
Phase 2 object worker count Phase 2 object worker count (default: 8)
--parallel-phase2-worker-queue-capacity <n> --parallel-phase2-worker-queue-capacity <n>
Phase 2 per-worker object queue capacity Phase 2 per-worker object queue capacity (default: 256)
--rsync-local-dir <path> Use LocalDirRsyncFetcher rooted at this directory (offline tests) --rsync-local-dir <path> Use LocalDirRsyncFetcher rooted at this directory (offline tests)
--disable-rrdp Disable RRDP and synchronize only via rsync --disable-rrdp Disable RRDP and synchronize only via rsync
@ -172,12 +162,8 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
let mut tal_urls: Vec<String> = Vec::new(); let mut tal_urls: Vec<String> = Vec::new();
let mut tal_paths: Vec<PathBuf> = Vec::new(); let mut tal_paths: Vec<PathBuf> = Vec::new();
let mut ta_paths: Vec<PathBuf> = Vec::new(); let mut ta_paths: Vec<PathBuf> = Vec::new();
let mut parallel_phase1: bool = false;
let mut parallel_phase2: bool = false;
let mut parallel_phase1_cfg = ParallelPhase1Config::default(); let mut parallel_phase1_cfg = ParallelPhase1Config::default();
let mut parallel_phase1_cfg_overridden: bool = false;
let mut parallel_phase2_cfg = ParallelPhase2Config::default(); let mut parallel_phase2_cfg = ParallelPhase2Config::default();
let mut parallel_phase2_cfg_overridden: bool = false;
let mut db_path: Option<PathBuf> = None; let mut db_path: Option<PathBuf> = None;
let mut raw_store_db: Option<PathBuf> = None; let mut raw_store_db: Option<PathBuf> = None;
@ -231,12 +217,6 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
let v = argv.get(i).ok_or("--ta-path requires a value")?; let v = argv.get(i).ok_or("--ta-path requires a value")?;
ta_paths.push(PathBuf::from(v)); ta_paths.push(PathBuf::from(v));
} }
"--parallel-phase1" => {
parallel_phase1 = true;
}
"--parallel-phase2" => {
parallel_phase2 = true;
}
"--parallel-max-repo-sync-workers-global" => { "--parallel-max-repo-sync-workers-global" => {
i += 1; i += 1;
let v = argv let v = argv
@ -245,7 +225,6 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
parallel_phase1_cfg.max_repo_sync_workers_global = v parallel_phase1_cfg.max_repo_sync_workers_global = v
.parse::<usize>() .parse::<usize>()
.map_err(|_| format!("invalid --parallel-max-repo-sync-workers-global: {v}"))?; .map_err(|_| format!("invalid --parallel-max-repo-sync-workers-global: {v}"))?;
parallel_phase1_cfg_overridden = true;
} }
"--parallel-max-inflight-snapshot-bytes-global" => { "--parallel-max-inflight-snapshot-bytes-global" => {
i += 1; i += 1;
@ -256,7 +235,6 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
v.parse::<usize>().map_err(|_| { v.parse::<usize>().map_err(|_| {
format!("invalid --parallel-max-inflight-snapshot-bytes-global: {v}") format!("invalid --parallel-max-inflight-snapshot-bytes-global: {v}")
})?; })?;
parallel_phase1_cfg_overridden = true;
} }
"--parallel-max-pending-repo-results" => { "--parallel-max-pending-repo-results" => {
i += 1; i += 1;
@ -266,7 +244,6 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
parallel_phase1_cfg.max_pending_repo_results = v parallel_phase1_cfg.max_pending_repo_results = v
.parse::<usize>() .parse::<usize>()
.map_err(|_| format!("invalid --parallel-max-pending-repo-results: {v}"))?; .map_err(|_| format!("invalid --parallel-max-pending-repo-results: {v}"))?;
parallel_phase1_cfg_overridden = true;
} }
"--parallel-phase2-object-workers" => { "--parallel-phase2-object-workers" => {
i += 1; i += 1;
@ -276,7 +253,6 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
parallel_phase2_cfg.object_workers = v parallel_phase2_cfg.object_workers = v
.parse::<usize>() .parse::<usize>()
.map_err(|_| format!("invalid --parallel-phase2-object-workers: {v}"))?; .map_err(|_| format!("invalid --parallel-phase2-object-workers: {v}"))?;
parallel_phase2_cfg_overridden = true;
} }
"--parallel-phase2-worker-queue-capacity" => { "--parallel-phase2-worker-queue-capacity" => {
i += 1; i += 1;
@ -286,7 +262,6 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
parallel_phase2_cfg.worker_queue_capacity = v parallel_phase2_cfg.worker_queue_capacity = v
.parse::<usize>() .parse::<usize>()
.map_err(|_| format!("invalid --parallel-phase2-worker-queue-capacity: {v}"))?; .map_err(|_| format!("invalid --parallel-phase2-worker-queue-capacity: {v}"))?;
parallel_phase2_cfg_overridden = true;
} }
"--db" => { "--db" => {
i += 1; i += 1;
@ -467,31 +442,13 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
usage() usage()
)); ));
} }
if !parallel_phase1 && parallel_phase1_cfg_overridden { if parallel_phase2_cfg.object_workers == 0 {
return Err(format!(
"--parallel-max-* options require --parallel-phase1\n\n{}",
usage()
));
}
if !parallel_phase2 && parallel_phase2_cfg_overridden {
return Err(format!(
"--parallel-phase2-* options require --parallel-phase2\n\n{}",
usage()
));
}
if parallel_phase2 && !parallel_phase1 {
return Err(format!(
"--parallel-phase2 requires --parallel-phase1\n\n{}",
usage()
));
}
if parallel_phase2 && parallel_phase2_cfg.object_workers == 0 {
return Err(format!( return Err(format!(
"--parallel-phase2-object-workers must be > 0\n\n{}", "--parallel-phase2-object-workers must be > 0\n\n{}",
usage() usage()
)); ));
} }
if parallel_phase2 && parallel_phase2_cfg.worker_queue_capacity == 0 { if parallel_phase2_cfg.worker_queue_capacity == 0 {
return Err(format!( return Err(format!(
"--parallel-phase2-worker-queue-capacity must be > 0\n\n{}", "--parallel-phase2-worker-queue-capacity must be > 0\n\n{}",
usage() usage()
@ -504,8 +461,7 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
)); ));
} }
if !tal_paths.is_empty() { if !tal_paths.is_empty() {
let strict_pairing_required = let strict_pairing_required = tal_paths.len() > 1 || !ta_paths.is_empty();
parallel_phase1 || tal_paths.len() > 1 || !ta_paths.is_empty();
if strict_pairing_required { if strict_pairing_required {
if ta_paths.len() != tal_paths.len() { if ta_paths.len() != tal_paths.len() {
return Err(format!( return Err(format!(
@ -520,12 +476,6 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
)); ));
} }
} }
if !parallel_phase1 && (tal_urls.len() > 1 || tal_paths.len() > 1) {
return Err(format!(
"multi-TAL execution requires --parallel-phase1\n\n{}",
usage()
));
}
let tal_url = tal_urls.first().cloned(); let tal_url = tal_urls.first().cloned();
let tal_path = tal_paths.first().cloned(); let tal_path = tal_paths.first().cloned();
let ta_path = ta_paths.first().cloned(); let ta_path = ta_paths.first().cloned();
@ -674,11 +624,8 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
tal_url, tal_url,
tal_path, tal_path,
ta_path, ta_path,
parallel_phase1, parallel_phase1_config: parallel_phase1_cfg,
parallel_phase2, parallel_phase2_config: parallel_phase2_cfg,
parallel_tal_urls: Vec::new(),
parallel_phase1_config: parallel_phase1.then_some(parallel_phase1_cfg),
parallel_phase2_config: parallel_phase2.then_some(parallel_phase2_cfg),
tal_inputs, tal_inputs,
db_path, db_path,
raw_store_db, raw_store_db,
@ -1063,6 +1010,111 @@ fn build_repo_sync_stats(
stats stats
} }
/// Dispatches one online validation run to the appropriate parallel
/// (phase 1 + phase 2) or serial entry point, based on the TAL inputs
/// carried in `args`.
///
/// Selection order, as written below:
/// 1. More than one TAL input -> multi-TAL parallel phase-2 run.
/// 2. `--tal-url` present -> single-TAL parallel phase-2 run from the URL.
/// 3. `--tal-path` + `--ta-path` -> parallel phase-2 run from local TAL/TA files.
/// 4. `--tal-path` alone -> serial TAL-bytes run; this is the only branch
///    that consumes the optional `timing` handle (the parallel branches
///    ignore it).
/// 5. Any other combination is `unreachable!` — `parse_args` rejects it.
///
/// `parallel_phase1_config` / `parallel_phase2_config` are plain (non-Option)
/// fields on `CliArgs` — parallel execution is the default and the old
/// `--parallel-phase1`/`--parallel-phase2` enable flags were removed — so both
/// configs are cloned unconditionally. Errors from the underlying runners are
/// stringified via `e.to_string()` for the `Result<_, String>` contract.
///
/// Generic over the RRDP HTTP fetcher `H` and the rsync fetcher `R` so the
/// same dispatch serves both the local-dir (offline test) and system fetcher
/// call sites.
fn run_online_validation_with_fetchers<H, R>(
    store: Arc<RocksStore>,
    policy: &Policy,
    args: &CliArgs,
    http: &H,
    rsync: &R,
    validation_time: time::OffsetDateTime,
    config: &TreeRunConfig,
    collect_current_repo_objects: bool,
    timing: Option<&TimingHandle>,
) -> Result<RunTreeFromTalAuditOutput, String>
where
    H: crate::sync::rrdp::Fetcher + Clone + 'static,
    R: crate::fetch::rsync::RsyncFetcher + Clone + 'static,
{
    // Multi-TAL: all TALs go through the shared parallel scheduler; the
    // single-TAL match below is never reached in this case.
    if args.tal_inputs.len() > 1 {
        return run_tree_from_multiple_tals_parallel_phase2_audit(
            store,
            policy,
            args.tal_inputs.clone(),
            http,
            rsync,
            validation_time,
            config,
            args.parallel_phase1_config.clone(),
            args.parallel_phase2_config.clone(),
            collect_current_repo_objects,
        )
        .map_err(|e| e.to_string());
    }
    match (
        args.tal_url.as_ref(),
        args.tal_path.as_ref(),
        args.ta_path.as_ref(),
    ) {
        // URL mode: the TAL (and the TA it points at) are fetched remotely.
        // NOTE(review): `timing` is not plumbed into this parallel path.
        (Some(url), _, _) => run_tree_from_tal_url_parallel_phase2_audit(
            store,
            policy,
            url,
            http,
            rsync,
            validation_time,
            config,
            args.parallel_phase1_config.clone(),
            args.parallel_phase2_config.clone(),
            collect_current_repo_objects,
        )
        .map_err(|e| e.to_string()),
        // File mode with an explicit TA certificate: read both files up front
        // and hand the raw bytes to the parallel runner.
        (None, Some(tal_path), Some(ta_path)) => {
            let tal_bytes = std::fs::read(tal_path)
                .map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?;
            let ta_der = std::fs::read(ta_path)
                .map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?;
            run_tree_from_tal_and_ta_der_parallel_phase2_audit(
                store,
                policy,
                &tal_bytes,
                &ta_der,
                None,
                http,
                rsync,
                validation_time,
                config,
                args.parallel_phase1_config.clone(),
                args.parallel_phase2_config.clone(),
                collect_current_repo_objects,
            )
            .map_err(|e| e.to_string())
        }
        // TAL file without a TA file: serial TAL-bytes path. This branch
        // stays serial (no phase-1/phase-2 configs are passed) and is the
        // only one that honors the `timing` handle.
        (None, Some(tal_path), None) => {
            let tal_bytes = std::fs::read(tal_path)
                .map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?;
            let tal_uri = args.cir_tal_uri.clone();
            if let Some(t) = timing {
                crate::validation::run_tree_from_tal::run_tree_from_tal_bytes_serial_audit_with_timing(
                    store.as_ref(),
                    policy,
                    &tal_bytes,
                    tal_uri,
                    http,
                    rsync,
                    validation_time,
                    config,
                    t,
                )
                .map_err(|e| e.to_string())
            } else {
                crate::validation::run_tree_from_tal::run_tree_from_tal_bytes_serial_audit(
                    store.as_ref(),
                    policy,
                    &tal_bytes,
                    tal_uri,
                    http,
                    rsync,
                    validation_time,
                    config,
                )
                .map_err(|e| e.to_string())
            }
        }
        // Invalid TAL/TA combinations are rejected earlier by parse_args.
        _ => unreachable!("validated by parse_args"),
    }
}
pub fn run(argv: &[String]) -> Result<(), String> { pub fn run(argv: &[String]) -> Result<(), String> {
let args = parse_args(argv)?; let args = parse_args(argv)?;
@ -1276,215 +1328,17 @@ pub fn run(argv: &[String]) -> Result<(), String> {
}) })
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
let rsync = LocalDirRsyncFetcher::new(dir); let rsync = LocalDirRsyncFetcher::new(dir);
if args.parallel_phase1 && args.tal_inputs.len() > 1 { run_online_validation_with_fetchers(
if args.parallel_phase2 { Arc::clone(&store),
run_tree_from_multiple_tals_parallel_phase2_audit( &policy,
Arc::clone(&store), &args,
&policy, &http,
args.tal_inputs.clone(), &rsync,
&http, validation_time,
&rsync, &config,
validation_time, collect_current_repo_objects,
&config, timing.as_ref().map(|(_, t)| t),
args.parallel_phase1_config )?
.clone()
.expect("phase1 config present"),
args.parallel_phase2_config
.clone()
.expect("phase2 config present"),
collect_current_repo_objects,
)
.map_err(|e| e.to_string())?
} else {
run_tree_from_multiple_tals_parallel_phase1_audit(
Arc::clone(&store),
&policy,
args.tal_inputs.clone(),
&http,
&rsync,
validation_time,
&config,
args.parallel_phase1_config
.clone()
.expect("phase1 config present"),
collect_current_repo_objects,
)
.map_err(|e| e.to_string())?
}
} else {
match (
args.tal_url.as_ref(),
args.tal_path.as_ref(),
args.ta_path.as_ref(),
) {
(Some(url), _, _) => {
if args.parallel_phase1 {
if args.parallel_phase2 {
run_tree_from_tal_url_parallel_phase2_audit(
Arc::clone(&store),
&policy,
url,
&http,
&rsync,
validation_time,
&config,
args.parallel_phase1_config
.clone()
.expect("phase1 config present"),
args.parallel_phase2_config
.clone()
.expect("phase2 config present"),
collect_current_repo_objects,
)
.map_err(|e| e.to_string())?
} else {
run_tree_from_tal_url_parallel_phase1_audit(
Arc::clone(&store),
&policy,
url,
&http,
&rsync,
validation_time,
&config,
args.parallel_phase1_config
.clone()
.expect("phase1 config present"),
collect_current_repo_objects,
)
.map_err(|e| e.to_string())?
}
} else if let Some((_, t)) = timing.as_ref() {
run_tree_from_tal_url_serial_audit_with_timing(
store.as_ref(),
&policy,
url,
&http,
&rsync,
validation_time,
&config,
t,
)
.map_err(|e| e.to_string())?
} else {
run_tree_from_tal_url_serial_audit(
store.as_ref(),
&policy,
url,
&http,
&rsync,
validation_time,
&config,
)
.map_err(|e| e.to_string())?
}
}
(None, Some(tal_path), Some(ta_path)) => {
let tal_bytes = std::fs::read(tal_path)
.map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?;
let ta_der = std::fs::read(ta_path)
.map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?;
if args.parallel_phase1 {
if args.parallel_phase2 {
run_tree_from_tal_and_ta_der_parallel_phase2_audit(
Arc::clone(&store),
&policy,
&tal_bytes,
&ta_der,
None,
&http,
&rsync,
validation_time,
&config,
args.parallel_phase1_config
.clone()
.expect("phase1 config present"),
args.parallel_phase2_config
.clone()
.expect("phase2 config present"),
collect_current_repo_objects,
)
.map_err(|e| e.to_string())?
} else {
run_tree_from_tal_and_ta_der_parallel_phase1_audit(
Arc::clone(&store),
&policy,
&tal_bytes,
&ta_der,
None,
&http,
&rsync,
validation_time,
&config,
args.parallel_phase1_config
.clone()
.expect("phase1 config present"),
collect_current_repo_objects,
)
.map_err(|e| e.to_string())?
}
} else if let Some((_, t)) = timing.as_ref() {
run_tree_from_tal_and_ta_der_serial_audit_with_timing(
store.as_ref(),
&policy,
&tal_bytes,
&ta_der,
None,
&http,
&rsync,
validation_time,
&config,
t,
)
.map_err(|e| e.to_string())?
} else {
run_tree_from_tal_and_ta_der_serial_audit(
store.as_ref(),
&policy,
&tal_bytes,
&ta_der,
None,
&http,
&rsync,
validation_time,
&config,
)
.map_err(|e| e.to_string())?
}
}
(None, Some(tal_path), None) => {
let tal_bytes = std::fs::read(tal_path)
.map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?;
let tal_uri = args.cir_tal_uri.clone();
if let Some((_, t)) = timing.as_ref() {
crate::validation::run_tree_from_tal::run_tree_from_tal_bytes_serial_audit_with_timing(
store.as_ref(),
&policy,
&tal_bytes,
tal_uri,
&http,
&rsync,
validation_time,
&config,
t,
)
.map_err(|e| e.to_string())?
} else {
crate::validation::run_tree_from_tal::run_tree_from_tal_bytes_serial_audit(
store.as_ref(),
&policy,
&tal_bytes,
tal_uri,
&http,
&rsync,
validation_time,
&config,
)
.map_err(|e| e.to_string())?
}
}
_ => unreachable!("validated by parse_args"),
}
}
} else { } else {
let http = BlockingHttpFetcher::new(HttpFetcherConfig { let http = BlockingHttpFetcher::new(HttpFetcherConfig {
timeout: std::time::Duration::from_secs(args.http_timeout_secs.max(1)), timeout: std::time::Duration::from_secs(args.http_timeout_secs.max(1)),
@ -1500,215 +1354,17 @@ pub fn run(argv: &[String]) -> Result<(), String> {
mirror_root: args.rsync_mirror_root.clone(), mirror_root: args.rsync_mirror_root.clone(),
..SystemRsyncConfig::default() ..SystemRsyncConfig::default()
}); });
if args.parallel_phase1 && args.tal_inputs.len() > 1 { run_online_validation_with_fetchers(
if args.parallel_phase2 { Arc::clone(&store),
run_tree_from_multiple_tals_parallel_phase2_audit( &policy,
Arc::clone(&store), &args,
&policy, &http,
args.tal_inputs.clone(), &rsync,
&http, validation_time,
&rsync, &config,
validation_time, collect_current_repo_objects,
&config, timing.as_ref().map(|(_, t)| t),
args.parallel_phase1_config )?
.clone()
.expect("phase1 config present"),
args.parallel_phase2_config
.clone()
.expect("phase2 config present"),
collect_current_repo_objects,
)
.map_err(|e| e.to_string())?
} else {
run_tree_from_multiple_tals_parallel_phase1_audit(
Arc::clone(&store),
&policy,
args.tal_inputs.clone(),
&http,
&rsync,
validation_time,
&config,
args.parallel_phase1_config
.clone()
.expect("phase1 config present"),
collect_current_repo_objects,
)
.map_err(|e| e.to_string())?
}
} else {
match (
args.tal_url.as_ref(),
args.tal_path.as_ref(),
args.ta_path.as_ref(),
) {
(Some(url), _, _) => {
if args.parallel_phase1 {
if args.parallel_phase2 {
run_tree_from_tal_url_parallel_phase2_audit(
Arc::clone(&store),
&policy,
url,
&http,
&rsync,
validation_time,
&config,
args.parallel_phase1_config
.clone()
.expect("phase1 config present"),
args.parallel_phase2_config
.clone()
.expect("phase2 config present"),
collect_current_repo_objects,
)
.map_err(|e| e.to_string())?
} else {
run_tree_from_tal_url_parallel_phase1_audit(
Arc::clone(&store),
&policy,
url,
&http,
&rsync,
validation_time,
&config,
args.parallel_phase1_config
.clone()
.expect("phase1 config present"),
collect_current_repo_objects,
)
.map_err(|e| e.to_string())?
}
} else if let Some((_, t)) = timing.as_ref() {
run_tree_from_tal_url_serial_audit_with_timing(
store.as_ref(),
&policy,
url,
&http,
&rsync,
validation_time,
&config,
t,
)
.map_err(|e| e.to_string())?
} else {
run_tree_from_tal_url_serial_audit(
store.as_ref(),
&policy,
url,
&http,
&rsync,
validation_time,
&config,
)
.map_err(|e| e.to_string())?
}
}
(None, Some(tal_path), Some(ta_path)) => {
let tal_bytes = std::fs::read(tal_path)
.map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?;
let ta_der = std::fs::read(ta_path)
.map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?;
if args.parallel_phase1 {
if args.parallel_phase2 {
run_tree_from_tal_and_ta_der_parallel_phase2_audit(
Arc::clone(&store),
&policy,
&tal_bytes,
&ta_der,
None,
&http,
&rsync,
validation_time,
&config,
args.parallel_phase1_config
.clone()
.expect("phase1 config present"),
args.parallel_phase2_config
.clone()
.expect("phase2 config present"),
collect_current_repo_objects,
)
.map_err(|e| e.to_string())?
} else {
run_tree_from_tal_and_ta_der_parallel_phase1_audit(
Arc::clone(&store),
&policy,
&tal_bytes,
&ta_der,
None,
&http,
&rsync,
validation_time,
&config,
args.parallel_phase1_config
.clone()
.expect("phase1 config present"),
collect_current_repo_objects,
)
.map_err(|e| e.to_string())?
}
} else if let Some((_, t)) = timing.as_ref() {
run_tree_from_tal_and_ta_der_serial_audit_with_timing(
store.as_ref(),
&policy,
&tal_bytes,
&ta_der,
None,
&http,
&rsync,
validation_time,
&config,
t,
)
.map_err(|e| e.to_string())?
} else {
run_tree_from_tal_and_ta_der_serial_audit(
store.as_ref(),
&policy,
&tal_bytes,
&ta_der,
None,
&http,
&rsync,
validation_time,
&config,
)
.map_err(|e| e.to_string())?
}
}
(None, Some(tal_path), None) => {
let tal_bytes = std::fs::read(tal_path)
.map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?;
let tal_uri = args.cir_tal_uri.clone();
if let Some((_, t)) = timing.as_ref() {
crate::validation::run_tree_from_tal::run_tree_from_tal_bytes_serial_audit_with_timing(
store.as_ref(),
&policy,
&tal_bytes,
tal_uri,
&http,
&rsync,
validation_time,
&config,
t,
)
.map_err(|e| e.to_string())?
} else {
crate::validation::run_tree_from_tal::run_tree_from_tal_bytes_serial_audit(
store.as_ref(),
&policy,
&tal_bytes,
tal_uri,
&http,
&rsync,
validation_time,
&config,
)
.map_err(|e| e.to_string())?
}
}
_ => unreachable!("validated by parse_args"),
}
}
}; };
let validation_ms = validation_started.elapsed().as_millis() as u64; let validation_ms = validation_started.elapsed().as_millis() as u64;
@ -1935,7 +1591,9 @@ mod tests {
assert!(err.contains("Usage:"), "{err}"); assert!(err.contains("Usage:"), "{err}");
assert!(err.contains("--db"), "{err}"); assert!(err.contains("--db"), "{err}");
assert!(err.contains("--rsync-mirror-root"), "{err}"); assert!(err.contains("--rsync-mirror-root"), "{err}");
assert!(err.contains("--parallel-phase1"), "{err}"); assert!(err.contains("--parallel-phase2-object-workers"), "{err}");
assert!(!err.contains("--parallel-phase1"), "{err}");
assert!(!err.contains("--parallel-phase2 "), "{err}");
} }
#[test] #[test]
@ -2162,7 +1820,26 @@ mod tests {
} }
#[test] #[test]
fn parse_accepts_parallel_phase2_with_config() { fn parse_accepts_default_parallel_config_and_phase2_overrides() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/root.tal".to_string(),
"--parallel-phase2-object-workers".to_string(),
"3".to_string(),
"--parallel-phase2-worker-queue-capacity".to_string(),
"17".to_string(),
];
let args = parse_args(&argv).expect("parse args");
assert_eq!(args.parallel_phase2_config.object_workers, 3);
assert_eq!(args.parallel_phase2_config.worker_queue_capacity, 17);
assert_eq!(args.parallel_phase1_config, ParallelPhase1Config::default());
}
#[test]
fn parse_rejects_removed_parallel_enable_flags() {
let argv = vec![ let argv = vec![
"rpki".to_string(), "rpki".to_string(),
"--db".to_string(), "--db".to_string(),
@ -2170,22 +1847,10 @@ mod tests {
"--tal-url".to_string(), "--tal-url".to_string(),
"https://example.test/root.tal".to_string(), "https://example.test/root.tal".to_string(),
"--parallel-phase1".to_string(), "--parallel-phase1".to_string(),
"--parallel-phase2".to_string(),
"--parallel-phase2-object-workers".to_string(),
"3".to_string(),
"--parallel-phase2-worker-queue-capacity".to_string(),
"17".to_string(),
]; ];
let args = parse_args(&argv).expect("parse args"); let err = parse_args(&argv).expect_err("removed phase flag should fail");
assert!(args.parallel_phase1); assert!(err.contains("unknown argument: --parallel-phase1"), "{err}");
assert!(args.parallel_phase2);
let cfg = args.parallel_phase2_config.expect("phase2 config");
assert_eq!(cfg.object_workers, 3);
assert_eq!(cfg.worker_queue_capacity, 17);
}
#[test]
fn parse_rejects_parallel_phase2_without_phase1() {
let argv = vec![ let argv = vec![
"rpki".to_string(), "rpki".to_string(),
"--db".to_string(), "--db".to_string(),
@ -2194,8 +1859,8 @@ mod tests {
"https://example.test/root.tal".to_string(), "https://example.test/root.tal".to_string(),
"--parallel-phase2".to_string(), "--parallel-phase2".to_string(),
]; ];
let err = parse_args(&argv).expect_err("phase2 without phase1 should fail"); let err = parse_args(&argv).expect_err("removed phase flag should fail");
assert!(err.contains("requires --parallel-phase1"), "{err}"); assert!(err.contains("unknown argument: --parallel-phase2"), "{err}");
} }
#[test] #[test]
@ -2212,7 +1877,6 @@ mod tests {
"arin.tal".to_string(), "arin.tal".to_string(),
"--ta-path".to_string(), "--ta-path".to_string(),
"arin.cer".to_string(), "arin.cer".to_string(),
"--parallel-phase1".to_string(),
"--rsync-local-dir".to_string(), "--rsync-local-dir".to_string(),
"repo".to_string(), "repo".to_string(),
"--cir-enable".to_string(), "--cir-enable".to_string(),
@ -2361,11 +2025,12 @@ mod tests {
assert!(args.ta_path.is_none()); assert!(args.ta_path.is_none());
assert_eq!(args.tal_inputs.len(), 1); assert_eq!(args.tal_inputs.len(), 1);
assert_eq!(args.tal_inputs[0].tal_id, "x"); assert_eq!(args.tal_inputs[0].tal_id, "x");
assert!(!args.parallel_phase1); assert_eq!(args.parallel_phase1_config, ParallelPhase1Config::default());
assert_eq!(args.parallel_phase2_config, ParallelPhase2Config::default());
} }
#[test] #[test]
fn parse_accepts_parallel_phase1_with_multiple_tal_urls() { fn parse_accepts_multi_tal_without_parallel_flags() {
let argv = vec![ let argv = vec![
"rpki".to_string(), "rpki".to_string(),
"--db".to_string(), "--db".to_string(),
@ -2376,27 +2041,20 @@ mod tests {
"https://example.test/apnic.tal".to_string(), "https://example.test/apnic.tal".to_string(),
"--tal-url".to_string(), "--tal-url".to_string(),
"https://example.test/ripe.tal".to_string(), "https://example.test/ripe.tal".to_string(),
"--parallel-phase1".to_string(),
"--parallel-max-repo-sync-workers-global".to_string(), "--parallel-max-repo-sync-workers-global".to_string(),
"8".to_string(), "8".to_string(),
]; ];
let args = parse_args(&argv).expect("parse"); let args = parse_args(&argv).expect("parse");
assert!(args.parallel_phase1);
assert_eq!(args.tal_urls.len(), 3); assert_eq!(args.tal_urls.len(), 3);
assert_eq!(args.tal_inputs.len(), 3); assert_eq!(args.tal_inputs.len(), 3);
assert_eq!(args.tal_inputs[0].tal_id, "arin"); assert_eq!(args.tal_inputs[0].tal_id, "arin");
assert_eq!(args.tal_inputs[1].tal_id, "apnic"); assert_eq!(args.tal_inputs[1].tal_id, "apnic");
assert_eq!(args.tal_inputs[2].tal_id, "ripe"); assert_eq!(args.tal_inputs[2].tal_id, "ripe");
assert_eq!( assert_eq!(args.parallel_phase1_config.max_repo_sync_workers_global, 8);
args.parallel_phase1_config
.as_ref()
.map(|cfg| cfg.max_repo_sync_workers_global),
Some(8)
);
} }
#[test] #[test]
fn parse_rejects_multi_tal_without_parallel_phase1() { fn parse_accepts_multi_tal_urls_by_default() {
let argv = vec![ let argv = vec![
"rpki".to_string(), "rpki".to_string(),
"--db".to_string(), "--db".to_string(),
@ -2406,8 +2064,9 @@ mod tests {
"--tal-url".to_string(), "--tal-url".to_string(),
"https://example.test/apnic.tal".to_string(), "https://example.test/apnic.tal".to_string(),
]; ];
let err = parse_args(&argv).unwrap_err(); let args = parse_args(&argv).expect("parse");
assert!(err.contains("requires --parallel-phase1"), "{err}"); assert_eq!(args.tal_urls.len(), 2);
assert_eq!(args.tal_inputs.len(), 2);
} }
#[test] #[test]
@ -2432,7 +2091,7 @@ mod tests {
} }
#[test] #[test]
fn parse_accepts_parallel_phase1_with_multiple_tal_path_pairs() { fn parse_accepts_multiple_tal_path_pairs_by_default() {
let argv = vec![ let argv = vec![
"rpki".to_string(), "rpki".to_string(),
"--db".to_string(), "--db".to_string(),
@ -2445,7 +2104,6 @@ mod tests {
"arin.tal".to_string(), "arin.tal".to_string(),
"--ta-path".to_string(), "--ta-path".to_string(),
"arin-ta.cer".to_string(), "arin-ta.cer".to_string(),
"--parallel-phase1".to_string(),
]; ];
let args = parse_args(&argv).expect("parse"); let args = parse_args(&argv).expect("parse");
assert_eq!(args.tal_paths.len(), 2); assert_eq!(args.tal_paths.len(), 2);
@ -2465,7 +2123,6 @@ mod tests {
"apnic.tal".to_string(), "apnic.tal".to_string(),
"--ta-path".to_string(), "--ta-path".to_string(),
"apnic-ta.cer".to_string(), "apnic-ta.cer".to_string(),
"--parallel-phase1".to_string(),
]; ];
let err = parse_args(&argv).unwrap_err(); let err = parse_args(&argv).unwrap_err();
assert!(err.contains("must specify either one-or-more --tal-url or one-or-more --tal-path/--ta-path pairs"), "{err}"); assert!(err.contains("must specify either one-or-more --tal-url or one-or-more --tal-path/--ta-path pairs"), "{err}");
@ -2483,7 +2140,6 @@ mod tests {
"arin.tal".to_string(), "arin.tal".to_string(),
"--ta-path".to_string(), "--ta-path".to_string(),
"apnic-ta.cer".to_string(), "apnic-ta.cer".to_string(),
"--parallel-phase1".to_string(),
]; ];
let err = parse_args(&argv).unwrap_err(); let err = parse_args(&argv).unwrap_err();
assert!( assert!(

View File

@ -14,9 +14,7 @@ pub struct ParallelPhase2Config {
impl Default for ParallelPhase2Config { impl Default for ParallelPhase2Config {
fn default() -> Self { fn default() -> Self {
Self { Self {
object_workers: std::thread::available_parallelism() object_workers: 8,
.map(|n| n.get().max(1))
.unwrap_or(4),
worker_queue_capacity: 256, worker_queue_capacity: 256,
} }
} }

View File

@ -25,7 +25,6 @@ fn run_case(
"rpki".to_string(), "rpki".to_string(),
"--db".to_string(), "--db".to_string(),
db_dir.path().to_string_lossy().to_string(), db_dir.path().to_string_lossy().to_string(),
"--parallel-phase1".to_string(),
"--disable-rrdp".to_string(), "--disable-rrdp".to_string(),
"--rsync-local-dir".to_string(), "--rsync-local-dir".to_string(),
fixture("tests/fixtures/repository") fixture("tests/fixtures/repository")

View File

@ -4,7 +4,7 @@ fn fixture_path(rel: &str) -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(rel) PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(rel)
} }
fn run_offline_case(parallel_phase1: bool, parallel_phase2: bool) -> (serde_json::Value, Vec<u8>) { fn run_offline_case(phase2_workers: Option<usize>) -> (serde_json::Value, Vec<u8>) {
let db_dir = tempfile::tempdir().expect("db tempdir"); let db_dir = tempfile::tempdir().expect("db tempdir");
let out_dir = tempfile::tempdir().expect("out tempdir"); let out_dir = tempfile::tempdir().expect("out tempdir");
let report_path = out_dir.path().join("report.json"); let report_path = out_dir.path().join("report.json");
@ -38,13 +38,9 @@ fn run_offline_case(parallel_phase1: bool, parallel_phase2: bool) -> (serde_json
"--ccr-out".to_string(), "--ccr-out".to_string(),
ccr_path.to_string_lossy().to_string(), ccr_path.to_string_lossy().to_string(),
]; ];
if parallel_phase1 { if let Some(workers) = phase2_workers {
argv.push("--parallel-phase1".to_string());
}
if parallel_phase2 {
argv.push("--parallel-phase2".to_string());
argv.push("--parallel-phase2-object-workers".to_string()); argv.push("--parallel-phase2-object-workers".to_string());
argv.push("4".to_string()); argv.push(workers.to_string());
argv.push("--parallel-phase2-worker-queue-capacity".to_string()); argv.push("--parallel-phase2-worker-queue-capacity".to_string());
argv.push("64".to_string()); argv.push("64".to_string());
} }
@ -58,61 +54,56 @@ fn run_offline_case(parallel_phase1: bool, parallel_phase2: bool) -> (serde_json
} }
#[test] #[test]
fn offline_serial_and_parallel_phase1_match_compare_views() { fn offline_default_parallel_and_configured_phase2_match_compare_views() {
let (serial_report, serial_ccr_bytes) = run_offline_case(false, false); let (default_report, default_ccr_bytes) = run_offline_case(None);
let (parallel_report, parallel_ccr_bytes) = run_offline_case(true, false); let (configured_report, configured_ccr_bytes) = run_offline_case(Some(4));
let serial_ccr = rpki::ccr::decode_content_info(&serial_ccr_bytes).expect("decode serial ccr"); let default_ccr =
let parallel_ccr = rpki::ccr::decode_content_info(&default_ccr_bytes).expect("decode default ccr");
rpki::ccr::decode_content_info(&parallel_ccr_bytes).expect("decode parallel ccr"); let configured_ccr =
rpki::ccr::decode_content_info(&configured_ccr_bytes).expect("decode configured ccr");
let (serial_vrps, serial_vaps) = let (default_vrps, default_vaps) =
rpki::bundle::decode_ccr_compare_views(&serial_ccr, "apnic").expect("serial compare view"); rpki::bundle::decode_ccr_compare_views(&default_ccr, "apnic")
let (parallel_vrps, parallel_vaps) = .expect("default compare view");
rpki::bundle::decode_ccr_compare_views(&parallel_ccr, "apnic") let (configured_vrps, configured_vaps) =
.expect("parallel compare view"); rpki::bundle::decode_ccr_compare_views(&configured_ccr, "apnic")
.expect("configured compare view");
assert_eq!(serial_vrps, parallel_vrps, "VRP compare views must match");
assert_eq!(serial_vaps, parallel_vaps, "VAP compare views must match");
let serial_points = serial_report["publication_points"]
.as_array()
.expect("serial publication_points");
let parallel_points = parallel_report["publication_points"]
.as_array()
.expect("parallel publication_points");
assert_eq!( assert_eq!(
serial_points.len(), default_vrps, configured_vrps,
parallel_points.len(), "VRP compare views must match"
);
assert_eq!(
default_vaps, configured_vaps,
"VAP compare views must match"
);
let default_points = default_report["publication_points"]
.as_array()
.expect("default publication_points");
let configured_points = configured_report["publication_points"]
.as_array()
.expect("configured publication_points");
assert_eq!(
default_points.len(),
configured_points.len(),
"publication point counts must match" "publication point counts must match"
); );
} }
#[test] #[test]
fn offline_serial_and_parallel_phase2_match_compare_views() { fn offline_default_parallel_emits_online_ccr_accumulator_output() {
let (serial_report, serial_ccr_bytes) = run_offline_case(false, false); let (report, ccr_bytes) = run_offline_case(None);
let (phase2_report, phase2_ccr_bytes) = run_offline_case(true, true); let ccr = rpki::ccr::decode_content_info(&ccr_bytes).expect("decode ccr");
let (_vrps, _vaps) =
let serial_ccr = rpki::ccr::decode_content_info(&serial_ccr_bytes).expect("decode serial ccr"); rpki::bundle::decode_ccr_compare_views(&ccr, "apnic").expect("compare view");
let phase2_ccr = rpki::ccr::decode_content_info(&phase2_ccr_bytes).expect("decode phase2 ccr"); assert!(
report["publication_points"]
let (serial_vrps, serial_vaps) = .as_array()
rpki::bundle::decode_ccr_compare_views(&serial_ccr, "apnic").expect("serial compare view"); .expect("publication_points")
let (phase2_vrps, phase2_vaps) = .len()
rpki::bundle::decode_ccr_compare_views(&phase2_ccr, "apnic").expect("phase2 compare view"); > 0,
"default parallel replay must process publication points"
assert_eq!(serial_vrps, phase2_vrps, "VRP compare views must match");
assert_eq!(serial_vaps, phase2_vaps, "VAP compare views must match");
let serial_points = serial_report["publication_points"]
.as_array()
.expect("serial publication_points");
let phase2_points = phase2_report["publication_points"]
.as_array()
.expect("phase2 publication_points");
assert_eq!(
serial_points.len(),
phase2_points.len(),
"publication point counts must match"
); );
} }