From 38421b1ae71d579be9ce50ef5b52a138e03a7994 Mon Sep 17 00:00:00 2001
From: yuyr
Date: Thu, 16 Apr 2026 11:33:52 +0800
Subject: [PATCH] 20260415_2 Support parallel mixed execution across multiple
 RIRs
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

APNIC+ARIN now run together as 1 snapshot + 1 delta, compared against
rpki-client. The snapshot run is about 4 minutes slower than rpki-client
(398 s vs 137 s) and its output has not yet converged; the delta run's
output does converge (348 s vs 181 s). Assessment: correctness looks
fine; the next step is further performance optimization.

---
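
Note for reviewers (below the fold, not part of the commit message): file
mode now pairs each --tal-path with the --ta-path given at the same
position. A minimal multi-TAL invocation using the new repeatable flags
might look like this; the TAL/TA file names are placeholders:

  rpki --db ./db --parallel-phase1 \
    --tal-path apnic.tal --ta-path apnic-ta.cer \
    --tal-path arin.tal --ta-path arin-ta.cer
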
 src/cli.rs                          | 248 +++++++++++++++++++++-------
 src/parallel/types.rs               |  40 ++++-
 src/validation/run_tree_from_tal.rs | 240 +++++++++++++++++++++++++--
 src/validation/tree.rs              |  44 ++++-
 tests/test_multi_tal_parallel_m2.rs | 104 ++++++++++++
 5 files changed, 594 insertions(+), 82 deletions(-)
 create mode 100644 tests/test_multi_tal_parallel_m2.rs

diff --git a/src/cli.rs b/src/cli.rs
index b8391a0..c6c2e29 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -19,6 +19,7 @@ use crate::validation::run_tree_from_tal::{
     run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit_with_timing,
     run_tree_from_tal_and_ta_der_payload_replay_serial_audit,
     run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing,
+    run_tree_from_multiple_tals_parallel_phase1_audit,
     run_tree_from_tal_and_ta_der_parallel_phase1_audit,
     run_tree_from_tal_and_ta_der_serial_audit,
     run_tree_from_tal_and_ta_der_serial_audit_with_timing, run_tree_from_tal_url_serial_audit,
@@ -51,6 +52,9 @@ struct RunStageTiming {
 #[derive(Clone, Debug, PartialEq, Eq)]
 pub struct CliArgs {
+    pub tal_urls: Vec<String>,
+    pub tal_paths: Vec<PathBuf>,
+    pub ta_paths: Vec<PathBuf>,
     pub tal_url: Option<String>,
     pub tal_path: Option<PathBuf>,
     pub ta_path: Option<PathBuf>,
@@ -97,8 +101,8 @@ fn usage() -> String {
     format!(
         "\
 Usage:
-  {bin} --db <path> --tal-url <url> [options]
-  {bin} --db <path> --tal-path <path> --ta-path <path> [options]
+  {bin} --db <path> --tal-url <url> [--tal-url <url> ...] [options]
+  {bin} --db <path> --tal-path <path> --ta-path <path> [--tal-path <path> --ta-path <path> ...] [options]
 
 Options:
   --db <path>              RocksDB directory path (required)
@@ -118,11 +122,10 @@ Options:
   --payload-delta-archive <dir>  Use local delta payload archive root (offline delta replay)
   --payload-delta-locks <path>   Use local locks-delta.json (offline delta replay)
-  --tal-url <url>          TAL URL (downloads TAL + TA over HTTPS)
-  --tal-path <path>        TAL file path (offline-friendly; requires --ta-path)
-  --ta-path <path>         TA certificate DER file path (offline-friendly)
+  --tal-url <url>          TAL URL (repeatable; URL mode)
+  --tal-path <path>        TAL file path (repeatable; file mode)
+  --ta-path <path>         TA certificate DER file path (repeatable in file mode; pairs with --tal-path by position)
   --parallel-phase1        Enable Phase 1 parallel scheduler skeleton
-  --parallel-tal-url <url> Additional TAL URL for future multi-TAL runs (repeatable)
   --parallel-max-repo-sync-workers-global <n>
                            Phase 1 global repo sync worker budget
   --parallel-max-inflight-snapshot-bytes-global <n>
@@ -148,11 +151,10 @@ Options:
 }
 
 pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
-    let mut tal_url: Option<String> = None;
-    let mut tal_path: Option<PathBuf> = None;
-    let mut ta_path: Option<PathBuf> = None;
+    let mut tal_urls: Vec<String> = Vec::new();
+    let mut tal_paths: Vec<PathBuf> = Vec::new();
+    let mut ta_paths: Vec<PathBuf> = Vec::new();
     let mut parallel_phase1: bool = false;
-    let mut parallel_tal_urls: Vec<String> = Vec::new();
     let mut parallel_phase1_cfg = ParallelPhase1Config::default();
     let mut parallel_phase1_cfg_overridden: bool = false;
@@ -193,26 +195,21 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
             "--tal-url" => {
                 i += 1;
                 let v = argv.get(i).ok_or("--tal-url requires a value")?;
-                tal_url = Some(v.clone());
+                tal_urls.push(v.clone());
             }
             "--tal-path" => {
                 i += 1;
                 let v = argv.get(i).ok_or("--tal-path requires a value")?;
-                tal_path = Some(PathBuf::from(v));
+                tal_paths.push(PathBuf::from(v));
             }
             "--ta-path" => {
                 i += 1;
                 let v = argv.get(i).ok_or("--ta-path requires a value")?;
-                ta_path = Some(PathBuf::from(v));
+                ta_paths.push(PathBuf::from(v));
             }
             "--parallel-phase1" => {
                 parallel_phase1 = true;
             }
-            "--parallel-tal-url" => {
-                i += 1;
-                let v = argv.get(i).ok_or("--parallel-tal-url requires a value")?;
-                parallel_tal_urls.push(v.clone());
-            }
             "--parallel-max-repo-sync-workers-global" => {
                 i += 1;
                 let v = argv
@@ -407,25 +404,50 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
     let db_path = db_path.ok_or_else(|| format!("--db is required\n\n{}", usage()))?;
 
-    let tal_mode_count = tal_url.is_some() as u8 + tal_path.is_some() as u8;
+    let tal_mode_count = (!tal_urls.is_empty()) as u8 + (!tal_paths.is_empty()) as u8;
     if tal_mode_count != 1 {
         return Err(format!(
-            "must specify exactly one of --tal-url or --tal-path\n\n{}",
+            "must specify either one-or-more --tal-url or one-or-more --tal-path/--ta-path pairs\n\n{}",
             usage()
         ));
     }
-    if !parallel_phase1 && (!parallel_tal_urls.is_empty() || parallel_phase1_cfg_overridden) {
+    if !parallel_phase1 && parallel_phase1_cfg_overridden {
         return Err(format!(
-            "--parallel-tal-url and --parallel-max-* options require --parallel-phase1\n\n{}",
+            "--parallel-max-* options require --parallel-phase1\n\n{}",
             usage()
         ));
     }
-    if tal_path.is_some() && ta_path.is_none() && !disable_rrdp {
+    if !tal_urls.is_empty() && !ta_paths.is_empty() {
         return Err(format!(
-            "--tal-path requires --ta-path unless --disable-rrdp is set\n\n{}",
+            "--ta-path cannot be used with --tal-url mode\n\n{}",
             usage()
         ));
     }
+    if !tal_paths.is_empty() {
+        let strict_pairing_required = parallel_phase1 || tal_paths.len() > 1 || !ta_paths.is_empty();
+        if strict_pairing_required {
+            if ta_paths.len() != tal_paths.len() {
+                return Err(format!(
+                    "--tal-path and --ta-path counts must match in file mode\n\n{}",
+                    usage()
+                ));
+            }
+        } else if ta_paths.is_empty() && !disable_rrdp {
+            return Err(format!(
+                "--tal-path requires --ta-path unless --disable-rrdp is set\n\n{}",
+                usage()
+            ));
+        }
+    }
+    if !parallel_phase1 && (tal_urls.len() > 1 || tal_paths.len() > 1) {
+        return Err(format!(
+            "multi-TAL execution requires --parallel-phase1\n\n{}",
+            usage()
+        ));
+    }
+    let tal_url = tal_urls.first().cloned();
+    let tal_path = tal_paths.first().cloned();
+    let ta_path = ta_paths.first().cloned();
     let cir_backend_count = cir_static_root.is_some() as u8 + raw_store_db.is_some() as u8;
     if cir_enabled && (cir_out_path.is_none() || cir_backend_count != 1) {
         return Err(format!(
@@ -533,26 +555,31 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
     }
 
     let mut tal_inputs = Vec::new();
-    if let Some(url) = tal_url.as_ref() {
-        tal_inputs.push(TalInputSpec::from_url(url.clone()));
-    } else if let Some(path) = tal_path.as_ref() {
-        tal_inputs.push(TalInputSpec::from_file_path(path.clone()));
-    }
-    if parallel_phase1 {
-        tal_inputs.extend(
-            parallel_tal_urls
-                .iter()
-                .cloned()
-                .map(TalInputSpec::from_url),
-        );
+    if !tal_urls.is_empty() {
+        tal_inputs.extend(tal_urls.iter().cloned().map(TalInputSpec::from_url));
+    } else if !tal_paths.is_empty() {
+        if ta_paths.len() == tal_paths.len() {
+            tal_inputs.extend(
+                tal_paths
+                    .iter()
+                    .cloned()
+                    .zip(ta_paths.iter().cloned())
+                    .map(|(tal_path, ta_path)| TalInputSpec::from_file_path_with_ta(tal_path, ta_path)),
+            );
+        } else {
+            tal_inputs.extend(tal_paths.iter().cloned().map(TalInputSpec::from_file_path));
+        }
     }
 
     Ok(CliArgs {
+        tal_urls,
+        tal_paths,
+        ta_paths,
         tal_url,
         tal_path,
         ta_path,
         parallel_phase1,
-        parallel_tal_urls,
+        parallel_tal_urls: Vec::new(),
         parallel_phase1_config: parallel_phase1.then_some(parallel_phase1_cfg),
         tal_inputs,
         db_path,
@@ -932,11 +959,26 @@ pub fn run(argv: &[String]) -> Result<(), String> {
             })
             .map_err(|e| e.to_string())?;
         let rsync = LocalDirRsyncFetcher::new(dir);
-        match (
-            args.tal_url.as_ref(),
-            args.tal_path.as_ref(),
-            args.ta_path.as_ref(),
-        ) {
+        if args.parallel_phase1 && args.tal_inputs.len() > 1 {
+            run_tree_from_multiple_tals_parallel_phase1_audit(
+                Arc::clone(&store),
+                &policy,
+                args.tal_inputs.clone(),
+                &http,
+                &rsync,
+                validation_time,
+                &config,
+                args.parallel_phase1_config
+                    .clone()
+                    .expect("phase1 config present"),
+            )
+            .map_err(|e| e.to_string())?
+        } else {
+            match (
+                args.tal_url.as_ref(),
+                args.tal_path.as_ref(),
+                args.ta_path.as_ref(),
+            ) {
             (Some(url), _, _) => {
                 if args.parallel_phase1 {
                     run_tree_from_tal_url_parallel_phase1_audit(
@@ -1059,6 +1101,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
                 }
             }
             _ => unreachable!("validated by parse_args"),
+            }
         }
     } else {
         let http = BlockingHttpFetcher::new(HttpFetcherConfig {
@@ -1075,11 +1118,26 @@ pub fn run(argv: &[String]) -> Result<(), String> {
             mirror_root: args.rsync_mirror_root.clone(),
             ..SystemRsyncConfig::default()
         });
-        match (
-            args.tal_url.as_ref(),
-            args.tal_path.as_ref(),
-            args.ta_path.as_ref(),
-        ) {
+        if args.parallel_phase1 && args.tal_inputs.len() > 1 {
+            run_tree_from_multiple_tals_parallel_phase1_audit(
+                Arc::clone(&store),
+                &policy,
+                args.tal_inputs.clone(),
+                &http,
+                &rsync,
+                validation_time,
+                &config,
+                args.parallel_phase1_config
+                    .clone()
+                    .expect("phase1 config present"),
+            )
+            .map_err(|e| e.to_string())?
+        } else {
+            match (
+                args.tal_url.as_ref(),
+                args.tal_path.as_ref(),
+                args.ta_path.as_ref(),
+            ) {
             (Some(url), _, _) => {
                 if args.parallel_phase1 {
                     run_tree_from_tal_url_parallel_phase1_audit(
@@ -1202,6 +1260,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
             }
             _ => unreachable!("validated by parse_args"),
+            }
         }
     };
@@ -1258,9 +1317,17 @@ pub fn run(argv: &[String]) -> Result<(), String> {
     let mut ccr_write_ms = None;
     if let Some(path) = args.ccr_out_path.as_deref() {
         let started = std::time::Instant::now();
+        let trust_anchors = if out.discoveries.is_empty() {
+            vec![out.discovery.trust_anchor.clone()]
+        } else {
+            out.discoveries
+                .iter()
+                .map(|item| item.trust_anchor.clone())
+                .collect::<Vec<_>>()
+        };
         let ccr = build_ccr_from_run(
             store.as_ref(),
-            &[out.discovery.trust_anchor.clone()],
+            &trust_anchors,
             &out.tree.vrps,
             &out.tree.aspas,
             &out.tree.router_keys,
@@ -1279,6 +1346,9 @@ pub fn run(argv: &[String]) -> Result<(), String> {
     let mut cir_write_cir_ms = None;
     let mut cir_total_ms = None;
     if args.cir_enabled {
+        if out.discoveries.len() > 1 {
+            return Err("CIR export is not yet supported for multi-TAL runs".to_string());
+        }
         let cir_tal_uri = args
             .tal_url
             .clone()
@@ -1460,7 +1530,7 @@ mod tests {
         ];
         let err = parse_args(&argv).unwrap_err();
         assert!(
-            err.contains("exactly one of --tal-url or --tal-path"),
+            err.contains("one-or-more --tal-url or one-or-more --tal-path/--ta-path pairs"),
             "{err}"
         );
     }
@@ -1712,6 +1782,7 @@ mod tests {
         ];
         let args = parse_args(&argv).expect("parse");
         assert_eq!(args.tal_url.as_deref(), Some("https://example.test/x.tal"));
+        assert_eq!(args.tal_urls, vec!["https://example.test/x.tal".to_string()]);
         assert!(args.tal_path.is_none());
         assert!(args.ta_path.is_none());
         assert_eq!(args.tal_inputs.len(), 1);
@@ -1720,24 +1791,24 @@ mod tests {
     }
 
     #[test]
-    fn parse_accepts_parallel_phase1_with_extra_tal_urls() {
+    fn parse_accepts_parallel_phase1_with_multiple_tal_urls() {
         let argv = vec![
             "rpki".to_string(),
             "--db".to_string(),
             "db".to_string(),
             "--tal-url".to_string(),
             "https://example.test/arin.tal".to_string(),
-            "--parallel-phase1".to_string(),
-            "--parallel-tal-url".to_string(),
+            "--tal-url".to_string(),
             "https://example.test/apnic.tal".to_string(),
-            "--parallel-tal-url".to_string(),
+            "--tal-url".to_string(),
             "https://example.test/ripe.tal".to_string(),
+            "--parallel-phase1".to_string(),
             "--parallel-max-repo-sync-workers-global".to_string(),
             "8".to_string(),
         ];
         let args = parse_args(&argv).expect("parse");
         assert!(args.parallel_phase1);
-        assert_eq!(args.parallel_tal_urls.len(), 2);
+        assert_eq!(args.tal_urls.len(), 3);
         assert_eq!(args.tal_inputs.len(), 3);
         assert_eq!(args.tal_inputs[0].tal_id, "arin");
         assert_eq!(args.tal_inputs[1].tal_id, "apnic");
         assert_eq!(args.tal_inputs[2].tal_id, "ripe");
@@ -1751,18 +1822,18 @@ mod tests {
     }
 
     #[test]
-    fn parse_rejects_parallel_tal_flags_without_parallel_phase1() {
+    fn parse_rejects_multi_tal_without_parallel_phase1() {
         let argv = vec![
             "rpki".to_string(),
             "--db".to_string(),
             "db".to_string(),
             "--tal-url".to_string(),
             "https://example.test/arin.tal".to_string(),
-            "--parallel-tal-url".to_string(),
+            "--tal-url".to_string(),
             "https://example.test/apnic.tal".to_string(),
         ];
         let err = parse_args(&argv).unwrap_err();
-        assert!(err.contains("require --parallel-phase1"), "{err}");
+        assert!(err.contains("requires --parallel-phase1"), "{err}");
     }
 
     #[test]
@@ -1779,11 +1850,71 @@ mod tests {
             "0".to_string(),
         ];
         let args = parse_args(&argv).expect("parse");
+        assert_eq!(args.tal_paths, vec![PathBuf::from("a.tal")]);
+        assert_eq!(args.ta_paths, vec![PathBuf::from("ta.cer")]);
         assert_eq!(args.tal_path.as_deref(), Some(Path::new("a.tal")));
         assert_eq!(args.ta_path.as_deref(), Some(Path::new("ta.cer")));
         assert_eq!(args.max_depth, Some(0));
     }
 
+    #[test]
+    fn parse_accepts_parallel_phase1_with_multiple_tal_path_pairs() {
+        let argv = vec![
+            "rpki".to_string(),
+            "--db".to_string(),
+            "db".to_string(),
+            "--tal-path".to_string(),
+            "apnic.tal".to_string(),
+            "--ta-path".to_string(),
+            "apnic-ta.cer".to_string(),
+            "--tal-path".to_string(),
+            "arin.tal".to_string(),
+            "--ta-path".to_string(),
+            "arin-ta.cer".to_string(),
+            "--parallel-phase1".to_string(),
+        ];
+        let args = parse_args(&argv).expect("parse");
+        assert_eq!(args.tal_paths.len(), 2);
+        assert_eq!(args.ta_paths.len(), 2);
+        assert_eq!(args.tal_inputs.len(), 2);
+    }
+
+    #[test]
+    fn parse_rejects_mixed_tal_url_and_tal_path_modes() {
+        let argv = vec![
+            "rpki".to_string(),
+            "--db".to_string(),
+            "db".to_string(),
+            "--tal-url".to_string(),
+            "https://example.test/arin.tal".to_string(),
+            "--tal-path".to_string(),
+            "apnic.tal".to_string(),
+            "--ta-path".to_string(),
+            "apnic-ta.cer".to_string(),
+            "--parallel-phase1".to_string(),
+        ];
+        let err = parse_args(&argv).unwrap_err();
+        assert!(err.contains("must specify either one-or-more --tal-url or one-or-more --tal-path/--ta-path pairs"), "{err}");
+    }
+
+    #[test]
+    fn parse_rejects_mismatched_tal_path_and_ta_path_counts() {
+        let argv = vec![
+            "rpki".to_string(),
+            "--db".to_string(),
+            "db".to_string(),
+            "--tal-path".to_string(),
+            "apnic.tal".to_string(),
+            "--tal-path".to_string(),
+            "arin.tal".to_string(),
+            "--ta-path".to_string(),
+            "apnic-ta.cer".to_string(),
+            "--parallel-phase1".to_string(),
+        ];
+        let err = parse_args(&argv).unwrap_err();
+        assert!(err.contains("--tal-path and --ta-path counts must match"), "{err}");
+    }
+
     #[test]
     fn parse_accepts_tal_path_without_ta_when_disable_rrdp_is_set() {
         let argv = vec![
@@ -2114,7 +2245,8 @@ mod tests {
         pp3.rrdp_notification_uri = Some("https://example.test/n2.xml".to_string());
 
         let out = crate::validation::run_tree_from_tal::RunTreeFromTalAuditOutput {
-            discovery,
+            discovery: discovery.clone(),
+            discoveries: vec![discovery],
             tree,
             publication_points: vec![pp1, pp2, pp3],
             downloads: Vec::new(),
diff --git a/src/parallel/types.rs b/src/parallel/types.rs
index 04f7ae0..d38ce77 100644
--- a/src/parallel/types.rs
+++ b/src/parallel/types.rs
@@ -6,8 +6,13 @@ use crate::report::Warning;
 #[derive(Clone, Debug, PartialEq, Eq)]
 pub enum TalSource {
     Url(String),
-    DerBytes { tal_url: String, ta_der: Vec<u8> },
+    DerBytes {
+        tal_url: String,
+        tal_bytes: Vec<u8>,
+        ta_der: Vec<u8>,
+    },
     FilePath(PathBuf),
+    FilePathWithTa { tal_path: PathBuf, ta_path: PathBuf },
 }
 
 #[derive(Clone, Debug, PartialEq, Eq)]
@@ -38,13 +43,35 @@ impl TalInputSpec {
         }
     }
 
-    pub fn from_ta_der(tal_url: impl Into<String>, ta_der: Vec<u8>) -> Self {
+    pub fn from_file_path_with_ta(
+        tal_path: impl Into<PathBuf>,
+        ta_path: impl Into<PathBuf>,
+    ) -> Self {
+        let tal_path = tal_path.into();
+        let ta_path = ta_path.into();
+        let tal_id = derive_tal_id_from_path(&tal_path);
+        Self {
+            rir_id: tal_id.clone(),
+            tal_id,
+            source: TalSource::FilePathWithTa { tal_path, ta_path },
+        }
+    }
+
+    pub fn from_ta_der(
+        tal_url: impl Into<String>,
+        tal_bytes: Vec<u8>,
+        ta_der: Vec<u8>,
+    ) -> Self {
         let tal_url = tal_url.into();
         let tal_id = derive_tal_id_from_url_like(&tal_url);
         Self {
             rir_id: tal_id.clone(),
             tal_id,
-            source: TalSource::DerBytes { tal_url, ta_der },
+            source: TalSource::DerBytes {
+                tal_url,
+                tal_bytes,
+                ta_der,
+            },
         }
     }
 }
@@ -316,13 +343,18 @@ mod tests {
     #[test]
     fn tal_input_spec_from_ta_der_preserves_payload() {
-        let spec = TalInputSpec::from_ta_der("https://example.test/ripe.tal", vec![1, 2, 3]);
+        let spec = TalInputSpec::from_ta_der(
+            "https://example.test/ripe.tal",
+            vec![4, 5, 6],
+            vec![1, 2, 3],
+        );
         assert_eq!(spec.tal_id, "ripe");
         assert_eq!(spec.rir_id, "ripe");
         assert_eq!(
             spec.source,
             TalSource::DerBytes {
                 tal_url: "https://example.test/ripe.tal".to_string(),
+                tal_bytes: vec![4, 5, 6],
                 ta_der: vec![1, 2, 3],
             }
         );
diff --git a/src/validation/run_tree_from_tal.rs b/src/validation/run_tree_from_tal.rs
index b35b06a..111f893 100644
--- a/src/validation/run_tree_from_tal.rs
+++ b/src/validation/run_tree_from_tal.rs
@@ -26,7 +26,7 @@ use crate::validation::from_tal::{
 };
 use crate::validation::tree::{
     CaInstanceHandle, TreeRunAuditOutput, TreeRunConfig, TreeRunError, TreeRunOutput,
-    run_tree_serial, run_tree_serial_audit,
+    run_tree_serial, run_tree_serial_audit, run_tree_serial_audit_multi_root,
 };
 use crate::validation::tree_runner::Rpkiv1PublicationPointRunner;
 use std::collections::HashMap;
@@ -79,12 +79,20 @@ pub struct RunTreeFromTalOutput {
 #[derive(Clone, Debug, PartialEq, Eq)]
 pub struct RunTreeFromTalAuditOutput {
     pub discovery: DiscoveredRootCaInstance,
+    pub discoveries: Vec<DiscoveredRootCaInstance>,
     pub tree: TreeRunOutput,
     pub publication_points: Vec<AuditPublicationPoint>,
     pub downloads: Vec<AuditDownloadEvent>,
     pub download_stats: crate::audit::AuditDownloadStats,
 }
 
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct TalRootDiscovery {
+    pub tal_input: TalInputSpec,
+    pub discovery: DiscoveredRootCaInstance,
+    pub root_handle: CaInstanceHandle,
+}
+
 fn make_live_runner<'a>(
     store: &'a crate::storage::RocksStore,
     policy: &'a crate::policy::Policy,
@@ -149,6 +157,64 @@ where
     )))
 }
 
+fn root_discovery_from_tal_input(
+    tal_input: &TalInputSpec,
+    http_fetcher: &dyn Fetcher,
+    rsync_fetcher: &dyn crate::fetch::rsync::RsyncFetcher,
+) -> Result<DiscoveredRootCaInstance, FromTalError> {
+    match &tal_input.source {
+        TalSource::Url(url) => discover_root_ca_instance_from_tal_url(http_fetcher, url),
+        TalSource::DerBytes {
+            tal_bytes, ta_der, ..
+        } => discover_root_ca_instance_from_tal_and_ta_der(tal_bytes, ta_der, None),
+        TalSource::FilePath(path) => {
+            let tal_bytes = std::fs::read(path).map_err(|e| {
+                FromTalError::TalFetch(format!("read TAL file failed: {}: {e}", path.display()))
+            })?;
+            let tal = crate::data_model::tal::Tal::decode_bytes(&tal_bytes)
+                .map_err(FromTalError::from)?;
+            discover_root_ca_instance_from_tal_with_fetchers(http_fetcher, rsync_fetcher, tal, None)
+        }
+        TalSource::FilePathWithTa { tal_path, ta_path } => {
+            let tal_bytes = std::fs::read(tal_path).map_err(|e| {
+                FromTalError::TalFetch(format!(
+                    "read TAL file failed: {}: {e}",
+                    tal_path.display()
+                ))
+            })?;
+            let ta_der = std::fs::read(ta_path).map_err(|e| {
+                FromTalError::TaFetch(format!(
+                    "read TA file failed: {}: {e}",
+                    ta_path.display()
+                ))
+            })?;
+            discover_root_ca_instance_from_tal_and_ta_der(&tal_bytes, &ta_der, None)
+        }
+    }
+}
+
+fn discover_multiple_roots_from_tal_inputs(
+    tal_inputs: &[TalInputSpec],
+    http_fetcher: &dyn Fetcher,
+    rsync_fetcher: &dyn crate::fetch::rsync::RsyncFetcher,
+) -> Result<Vec<TalRootDiscovery>, RunTreeFromTalError> {
+    let mut roots = Vec::with_capacity(tal_inputs.len());
+    for tal_input in tal_inputs {
+        let discovery = root_discovery_from_tal_input(tal_input, http_fetcher, rsync_fetcher)?;
+        let root_handle = root_handle_from_trust_anchor(
+            &discovery.trust_anchor,
+            tal_input.tal_id.clone(),
+            None,
+            &discovery.ca_instance,
+        );
+        roots.push(TalRootDiscovery {
+            tal_input: tal_input.clone(),
+            discovery,
+            root_handle,
+        });
+    }
+    Ok(roots)
+}
+
 #[derive(Debug, thiserror::Error)]
 pub enum RunTreeFromTalError {
     #[error("{0}")]
@@ -253,7 +319,8 @@ pub fn run_tree_from_tal_url_serial_audit(
     let downloads = download_log.snapshot_events();
     let download_stats = DownloadLogHandle::stats_from_events(&downloads);
     Ok(RunTreeFromTalAuditOutput {
-        discovery,
+        discovery: discovery.clone(),
+        discoveries: vec![discovery],
         tree,
         publication_points,
         downloads,
@@ -302,7 +369,8 @@ pub fn run_tree_from_tal_url_serial_audit_with_timing(
     let downloads = download_log.snapshot_events();
     let download_stats = DownloadLogHandle::stats_from_events(&downloads);
     Ok(RunTreeFromTalAuditOutput {
-        discovery,
+        discovery: discovery.clone(),
+        discoveries: vec![discovery],
         tree,
         publication_points,
         downloads,
@@ -360,7 +428,8 @@ where
     let downloads = download_log.snapshot_events();
     let download_stats = DownloadLogHandle::stats_from_events(&downloads);
     Ok(RunTreeFromTalAuditOutput {
-        discovery,
+        discovery: discovery.clone(),
+        discoveries: vec![discovery],
         tree,
         publication_points,
         downloads,
@@ -395,6 +464,7 @@ where
             .tal_url
             .clone()
             .unwrap_or_else(|| "embedded-tal".to_string()),
+            tal_bytes: tal_bytes.to_vec(),
             ta_der: ta_der.to_vec(),
         },
     }];
@@ -432,7 +502,74 @@ where
     let downloads = download_log.snapshot_events();
     let download_stats = DownloadLogHandle::stats_from_events(&downloads);
     Ok(RunTreeFromTalAuditOutput {
-        discovery,
+        discovery: discovery.clone(),
+        discoveries: vec![discovery],
         tree,
         publication_points,
         downloads,
         download_stats,
     })
 }
+
+pub fn run_tree_from_multiple_tals_parallel_phase1_audit<H, R>(
+    store: Arc<crate::storage::RocksStore>,
+    policy: &crate::policy::Policy,
+    tal_inputs: Vec<TalInputSpec>,
+    http_fetcher: &H,
+    rsync_fetcher: &R,
+    validation_time: time::OffsetDateTime,
+    config: &TreeRunConfig,
+    parallel_config: ParallelPhase1Config,
+) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError>
+where
+    H: Fetcher + Clone + 'static,
+    R: crate::fetch::rsync::RsyncFetcher + Clone + 'static,
+{
+    if tal_inputs.is_empty() {
+        return Err(RunTreeFromTalError::Replay(
+            "multi-TAL run requires at least one TAL input".to_string(),
+        ));
+    }
TAL input".to_string())); + } + let roots = discover_multiple_roots_from_tal_inputs(&tal_inputs, http_fetcher, rsync_fetcher)?; + let primary = roots + .first() + .cloned() + .ok_or_else(|| RunTreeFromTalError::Replay("multi-TAL root discovery returned no roots".to_string()))?; + let discoveries = roots.iter().map(|item| item.discovery.clone()).collect::>(); + let root_handles = roots + .into_iter() + .map(|item| item.root_handle) + .collect::>(); + + let download_log = DownloadLogHandle::new(); + let runtime = build_phase1_repo_sync_runtime( + Arc::clone(&store), + policy, + http_fetcher, + rsync_fetcher, + parallel_config, + None, + Some(download_log.clone()), + tal_inputs, + )?; + let runner = make_live_runner( + store.as_ref(), + policy, + http_fetcher, + rsync_fetcher, + validation_time, + None, + Some(download_log.clone()), + Some(runtime), + ); + + let TreeRunAuditOutput { + tree, + publication_points, + } = run_tree_serial_audit_multi_root(root_handles, &runner, config)?; + let downloads = download_log.snapshot_events(); + let download_stats = DownloadLogHandle::stats_from_events(&downloads); + Ok(RunTreeFromTalAuditOutput { + discovery: primary.discovery.clone(), + discoveries, tree, publication_points, downloads, @@ -532,7 +669,8 @@ pub fn run_tree_from_tal_bytes_serial_audit( let downloads = download_log.snapshot_events(); let download_stats = DownloadLogHandle::stats_from_events(&downloads); Ok(RunTreeFromTalAuditOutput { - discovery, + discovery: discovery.clone(), + discoveries: vec![discovery], tree, publication_points, downloads, @@ -595,7 +733,8 @@ pub fn run_tree_from_tal_bytes_serial_audit_with_timing( let downloads = download_log.snapshot_events(); let download_stats = DownloadLogHandle::stats_from_events(&downloads); Ok(RunTreeFromTalAuditOutput { - discovery, + discovery: discovery.clone(), + discoveries: vec![discovery], tree, publication_points, downloads, @@ -649,7 +788,8 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit( let downloads = download_log.snapshot_events(); let download_stats = DownloadLogHandle::stats_from_events(&downloads); Ok(RunTreeFromTalAuditOutput { - discovery, + discovery: discovery.clone(), + discoveries: vec![discovery], tree, publication_points, downloads, @@ -707,7 +847,8 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit_with_timing( let downloads = download_log.snapshot_events(); let download_stats = DownloadLogHandle::stats_from_events(&downloads); Ok(RunTreeFromTalAuditOutput { - discovery, + discovery: discovery.clone(), + discoveries: vec![discovery], tree, publication_points, downloads, @@ -823,7 +964,8 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit( let downloads = download_log.snapshot_events(); let download_stats = DownloadLogHandle::stats_from_events(&downloads); Ok(RunTreeFromTalAuditOutput { - discovery, + discovery: discovery.clone(), + discoveries: vec![discovery], tree, publication_points, downloads, @@ -891,7 +1033,8 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing( let downloads = download_log.snapshot_events(); let download_stats = DownloadLogHandle::stats_from_events(&downloads); Ok(RunTreeFromTalAuditOutput { - discovery, + discovery: discovery.clone(), + discoveries: vec![discovery], tree, publication_points, downloads, @@ -1089,7 +1232,8 @@ fn run_payload_delta_replay_audit_inner( let downloads = download_log.snapshot_events(); let download_stats = DownloadLogHandle::stats_from_events(&downloads); Ok(RunTreeFromTalAuditOutput { - discovery, + discovery: 
+        discoveries: vec![discovery],
         tree,
         publication_points,
         downloads,
@@ -1231,7 +1375,8 @@ fn run_payload_delta_replay_step_audit_inner(
     let downloads = download_log.snapshot_events();
     let download_stats = DownloadLogHandle::stats_from_events(&downloads);
     Ok(RunTreeFromTalAuditOutput {
-        discovery,
+        discovery: discovery.clone(),
+        discoveries: vec![discovery],
         tree,
         publication_points,
         downloads,
@@ -1266,6 +1411,75 @@ pub fn run_tree_from_tal_and_ta_der_payload_delta_replay_step_serial_audit(
     )
 }
 
+#[cfg(test)]
+mod multi_tal_tests {
+    use super::*;
+
+    struct RejectingHttpFetcher;
+
+    impl Fetcher for RejectingHttpFetcher {
+        fn fetch(&self, uri: &str) -> Result<Vec<u8>, String> {
+            Err(format!("unexpected http fetch: {uri}"))
+        }
+    }
+
+    struct RejectingRsyncFetcher;
+
+    impl crate::fetch::rsync::RsyncFetcher for RejectingRsyncFetcher {
+        fn fetch_objects(
+            &self,
+            _rsync_base_uri: &str,
+        ) -> crate::fetch::rsync::RsyncFetchResult<Vec<(String, Vec<u8>)>> {
+            Err(crate::fetch::rsync::RsyncFetchError::Fetch(
+                "unexpected rsync fetch".to_string(),
+            ))
+        }
+
+        fn dedup_key(&self, base_uri: &str) -> String {
+            base_uri.to_string()
+        }
+    }
+
+    #[test]
+    fn discover_multiple_roots_from_tal_inputs_builds_multiple_root_handles() {
+        let apnic_tal = std::fs::read("tests/fixtures/tal/apnic-rfc7730-https.tal")
+            .expect("read apnic tal");
+        let apnic_ta = std::fs::read("tests/fixtures/ta/apnic-ta.cer").expect("read apnic ta");
+        let arin_tal = std::fs::read("tests/fixtures/tal/arin.tal").expect("read arin tal");
+        let arin_ta = std::fs::read("tests/fixtures/ta/arin-ta.cer").expect("read arin ta");
+
+        let tal_inputs = vec![
+            TalInputSpec::from_ta_der(
+                "https://example.test/apnic.tal",
+                apnic_tal,
+                apnic_ta,
+            ),
+            TalInputSpec::from_ta_der(
+                "https://example.test/arin.tal",
+                arin_tal,
+                arin_ta,
+            ),
+        ];
+
+        let roots = discover_multiple_roots_from_tal_inputs(
+            &tal_inputs,
+            &RejectingHttpFetcher,
+            &RejectingRsyncFetcher,
+        )
+        .expect("discover roots");
+
+        assert_eq!(roots.len(), 2);
+        assert_eq!(roots[0].tal_input.tal_id, "apnic");
+        assert_eq!(roots[1].tal_input.tal_id, "arin");
+        assert_eq!(roots[0].root_handle.tal_id, "apnic");
+        assert_eq!(roots[1].root_handle.tal_id, "arin");
+        assert_ne!(
+            roots[0].root_handle.manifest_rsync_uri,
+            roots[1].root_handle.manifest_rsync_uri
+        );
+    }
+}
+
 #[cfg(test)]
 mod replay_api_tests {
     use super::*;
diff --git a/src/validation/tree.rs b/src/validation/tree.rs
index d87df83..98cacab 100644
--- a/src/validation/tree.rs
+++ b/src/validation/tree.rs
@@ -122,6 +122,14 @@ pub fn run_tree_serial_audit(
     root: CaInstanceHandle,
     runner: &dyn PublicationPointRunner,
     config: &TreeRunConfig,
+) -> Result<TreeRunAuditOutput, TreeRunError> {
+    run_tree_serial_audit_multi_root(vec![root], runner, config)
+}
+
+pub fn run_tree_serial_audit_multi_root(
+    roots: Vec<CaInstanceHandle>,
+    runner: &dyn PublicationPointRunner,
+    config: &TreeRunConfig,
 ) -> Result<TreeRunAuditOutput, TreeRunError> {
     #[derive(Clone, Debug)]
     struct QueuedCaInstance {
@@ -133,13 +141,15 @@ pub fn run_tree_serial_audit(
     let mut next_id: u64 = 0;
     let mut queue: std::collections::VecDeque<QueuedCaInstance> =
         std::collections::VecDeque::new();
-    queue.push_back(QueuedCaInstance {
-        id: next_id,
-        handle: root,
-        parent_id: None,
-        discovered_from: None,
-    });
-    next_id += 1;
+    for root in roots {
+        queue.push_back(QueuedCaInstance {
+            id: next_id,
+            handle: root,
+            parent_id: None,
+            discovered_from: None,
+        });
+        next_id += 1;
+    }
 
     let mut visited_manifest_uris: std::collections::HashSet<String> =
         std::collections::HashSet::new();
@@ -245,6 +255,7 @@ mod tests {
     use super::{
         CaInstanceHandle, DiscoveredChildCaInstance, PublicationPointRunResult,
         PublicationPointRunner, TreeRunConfig, run_tree_serial_audit,
+        run_tree_serial_audit_multi_root,
     };
 
     fn sample_handle(manifest: &str) -> CaInstanceHandle {
@@ -339,4 +350,23 @@ mod tests {
             ["rsync://example.test/repo/child.mft"]
         );
     }
+
+    #[test]
+    fn run_tree_serial_audit_multi_root_processes_all_roots() {
+        let seen = Arc::new(Mutex::new(Vec::new()));
+        let runner = PrefetchRecordingRunner {
+            seen_prefetch_children: Arc::clone(&seen),
+        };
+        let out = run_tree_serial_audit_multi_root(
+            vec![
+                sample_handle("rsync://example.test/repo/root-a.mft"),
+                sample_handle("rsync://example.test/repo/root-b.mft"),
+            ],
+            &runner,
+            &TreeRunConfig::default(),
+        )
+        .expect("tree run");
+        assert_eq!(out.tree.instances_processed, 2);
+        assert_eq!(out.publication_points.len(), 2);
+    }
 }
diff --git a/tests/test_multi_tal_parallel_m2.rs b/tests/test_multi_tal_parallel_m2.rs
new file mode 100644
index 0000000..82f23a4
--- /dev/null
+++ b/tests/test_multi_tal_parallel_m2.rs
@@ -0,0 +1,104 @@
+use std::collections::BTreeSet;
+use std::path::{Path, PathBuf};
+
+use rpki::bundle::{VapCompareRow, VrpCompareRow, decode_ccr_compare_views};
+
+fn fixture(rel: &str) -> PathBuf {
+    PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(rel)
+}
+
+fn run_case(
+    name: &str,
+    tal_ta_pairs: &[(&Path, &Path)],
+) -> (
+    serde_json::Value,
+    BTreeSet<VrpCompareRow>,
+    BTreeSet<VapCompareRow>,
+    rpki::ccr::CcrContentInfo,
+) {
+    let db_dir = tempfile::tempdir().expect("db tempdir");
+    let out_dir = tempfile::tempdir().expect("out tempdir");
+    let report_path = out_dir.path().join(format!("{name}.report.json"));
+    let ccr_path = out_dir.path().join(format!("{name}.ccr"));
+
+    let mut argv = vec![
+        "rpki".to_string(),
+        "--db".to_string(),
+        db_dir.path().to_string_lossy().to_string(),
+        "--parallel-phase1".to_string(),
+        "--disable-rrdp".to_string(),
+        "--rsync-local-dir".to_string(),
+        fixture("tests/fixtures/repository").to_string_lossy().to_string(),
+        "--validation-time".to_string(),
+        "2026-04-07T00:00:00Z".to_string(),
+        "--max-depth".to_string(),
+        "0".to_string(),
+        "--max-instances".to_string(),
+        tal_ta_pairs.len().to_string(),
+    ];
+    for (tal_path, ta_path) in tal_ta_pairs {
+        argv.push("--tal-path".to_string());
+        argv.push(tal_path.to_string_lossy().to_string());
+        argv.push("--ta-path".to_string());
+        argv.push(ta_path.to_string_lossy().to_string());
+    }
+    argv.extend([
+        "--report-json".to_string(),
+        report_path.to_string_lossy().to_string(),
+        "--ccr-out".to_string(),
+        ccr_path.to_string_lossy().to_string(),
+    ]);
+
+    rpki::cli::run(&argv).expect("cli run");
+
+    let report: serde_json::Value =
+        serde_json::from_slice(&std::fs::read(&report_path).expect("read report"))
+            .expect("parse report");
+    let ccr = rpki::ccr::decode_content_info(&std::fs::read(&ccr_path).expect("read ccr"))
+        .expect("decode ccr");
+    let (vrps, vaps) = decode_ccr_compare_views(&ccr, "multi").expect("compare views");
+    (report, vrps, vaps, ccr)
+}
+
+#[test]
+fn multi_tal_parallel_output_matches_union_of_single_tal_outputs() {
+    let apnic_tal = fixture("tests/fixtures/tal/apnic-rfc7730-https.tal");
+    let apnic_ta = fixture("tests/fixtures/ta/apnic-ta.cer");
+    let arin_tal = fixture("tests/fixtures/tal/arin.tal");
+    let arin_ta = fixture("tests/fixtures/ta/arin-ta.cer");
+
+    let (apnic_report, apnic_vrps, apnic_vaps, _) =
+        run_case("apnic", &[(&apnic_tal, &apnic_ta)]);
+    let (arin_report, arin_vrps, arin_vaps, _) = run_case("arin", &[(&arin_tal, &arin_ta)]);
+    let (multi_report, multi_vrps, multi_vaps, multi_ccr) =
+        run_case("multi", &[(&apnic_tal, &apnic_ta), (&arin_tal, &arin_ta)]);
+
+    let expected_vrps = apnic_vrps.union(&arin_vrps).cloned().collect::<BTreeSet<_>>();
+    let expected_vaps = apnic_vaps.union(&arin_vaps).cloned().collect::<BTreeSet<_>>();
+
+    assert_eq!(multi_vrps, expected_vrps);
+    assert_eq!(multi_vaps, expected_vaps);
+
+    let single_point_count = apnic_report["publication_points"]
+        .as_array()
+        .expect("apnic points")
+        .len()
+        + arin_report["publication_points"]
+            .as_array()
+            .expect("arin points")
+            .len();
+    assert_eq!(
+        multi_report["publication_points"]
+            .as_array()
+            .expect("multi points")
+            .len(),
+        single_point_count
+    );
+
+    let tas = multi_ccr
+        .content
+        .tas
+        .as_ref()
+        .expect("multi-TAL CCR must include trust anchors");
+    assert_eq!(tas.skis.len(), 2);
+}
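
-- 
Reviewer note: the union-of-single-runs property asserted above can be
exercised locally with the new integration test, assuming the TAL/TA
fixtures under tests/fixtures/ exist in the working tree:

  cargo test --test test_multi_tal_parallel_m2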