20260415_2 支持多个RIR并行混合执行,APNIC+ARIN运行1 snapshot + 1 delta,对比rpki-client,snapshot比rpki-client慢4分钟(398vs137),输出未收敛,delta时输出收敛(348vs181),评估应该是正确性没有问题,下一步进一步优化性能

This commit is contained in:
yuyr 2026-04-16 11:33:52 +08:00
parent 585c41b83b
commit 38421b1ae7
5 changed files with 594 additions and 82 deletions

View File

@ -19,6 +19,7 @@ use crate::validation::run_tree_from_tal::{
run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit_with_timing, run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit_with_timing,
run_tree_from_tal_and_ta_der_payload_replay_serial_audit, run_tree_from_tal_and_ta_der_payload_replay_serial_audit,
run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing, run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing,
run_tree_from_multiple_tals_parallel_phase1_audit,
run_tree_from_tal_and_ta_der_parallel_phase1_audit, run_tree_from_tal_and_ta_der_parallel_phase1_audit,
run_tree_from_tal_and_ta_der_serial_audit, run_tree_from_tal_and_ta_der_serial_audit,
run_tree_from_tal_and_ta_der_serial_audit_with_timing, run_tree_from_tal_url_serial_audit, run_tree_from_tal_and_ta_der_serial_audit_with_timing, run_tree_from_tal_url_serial_audit,
@ -51,6 +52,9 @@ struct RunStageTiming {
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
pub struct CliArgs { pub struct CliArgs {
pub tal_urls: Vec<String>,
pub tal_paths: Vec<PathBuf>,
pub ta_paths: Vec<PathBuf>,
pub tal_url: Option<String>, pub tal_url: Option<String>,
pub tal_path: Option<PathBuf>, pub tal_path: Option<PathBuf>,
pub ta_path: Option<PathBuf>, pub ta_path: Option<PathBuf>,
@ -97,8 +101,8 @@ fn usage() -> String {
format!( format!(
"\ "\
Usage: Usage:
{bin} --db <path> --tal-url <url> [options] {bin} --db <path> --tal-url <url> [--tal-url <url> ...] [options]
{bin} --db <path> --tal-path <path> --ta-path <path> [options] {bin} --db <path> --tal-path <path> --ta-path <path> [--tal-path <path> --ta-path <path> ...] [options]
Options: Options:
--db <path> RocksDB directory path (required) --db <path> RocksDB directory path (required)
@ -118,11 +122,10 @@ Options:
--payload-delta-archive <path> Use local delta payload archive root (offline delta replay) --payload-delta-archive <path> Use local delta payload archive root (offline delta replay)
--payload-delta-locks <path> Use local locks-delta.json (offline delta replay) --payload-delta-locks <path> Use local locks-delta.json (offline delta replay)
--tal-url <url> TAL URL (downloads TAL + TA over HTTPS) --tal-url <url> TAL URL (repeatable; URL mode)
--tal-path <path> TAL file path (offline-friendly; requires --ta-path) --tal-path <path> TAL file path (repeatable; file mode)
--ta-path <path> TA certificate DER file path (offline-friendly) --ta-path <path> TA certificate DER file path (repeatable in file mode; pairs with --tal-path by position)
--parallel-phase1 Enable Phase 1 parallel scheduler skeleton --parallel-phase1 Enable Phase 1 parallel scheduler skeleton
--parallel-tal-url <url> Additional TAL URL for future multi-TAL runs (repeatable)
--parallel-max-repo-sync-workers-global <n> --parallel-max-repo-sync-workers-global <n>
Phase 1 global repo sync worker budget Phase 1 global repo sync worker budget
--parallel-max-inflight-snapshot-bytes-global <n> --parallel-max-inflight-snapshot-bytes-global <n>
@ -148,11 +151,10 @@ Options:
} }
pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> { pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
let mut tal_url: Option<String> = None; let mut tal_urls: Vec<String> = Vec::new();
let mut tal_path: Option<PathBuf> = None; let mut tal_paths: Vec<PathBuf> = Vec::new();
let mut ta_path: Option<PathBuf> = None; let mut ta_paths: Vec<PathBuf> = Vec::new();
let mut parallel_phase1: bool = false; let mut parallel_phase1: bool = false;
let mut parallel_tal_urls: Vec<String> = Vec::new();
let mut parallel_phase1_cfg = ParallelPhase1Config::default(); let mut parallel_phase1_cfg = ParallelPhase1Config::default();
let mut parallel_phase1_cfg_overridden: bool = false; let mut parallel_phase1_cfg_overridden: bool = false;
@ -193,26 +195,21 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
"--tal-url" => { "--tal-url" => {
i += 1; i += 1;
let v = argv.get(i).ok_or("--tal-url requires a value")?; let v = argv.get(i).ok_or("--tal-url requires a value")?;
tal_url = Some(v.clone()); tal_urls.push(v.clone());
} }
"--tal-path" => { "--tal-path" => {
i += 1; i += 1;
let v = argv.get(i).ok_or("--tal-path requires a value")?; let v = argv.get(i).ok_or("--tal-path requires a value")?;
tal_path = Some(PathBuf::from(v)); tal_paths.push(PathBuf::from(v));
} }
"--ta-path" => { "--ta-path" => {
i += 1; i += 1;
let v = argv.get(i).ok_or("--ta-path requires a value")?; let v = argv.get(i).ok_or("--ta-path requires a value")?;
ta_path = Some(PathBuf::from(v)); ta_paths.push(PathBuf::from(v));
} }
"--parallel-phase1" => { "--parallel-phase1" => {
parallel_phase1 = true; parallel_phase1 = true;
} }
"--parallel-tal-url" => {
i += 1;
let v = argv.get(i).ok_or("--parallel-tal-url requires a value")?;
parallel_tal_urls.push(v.clone());
}
"--parallel-max-repo-sync-workers-global" => { "--parallel-max-repo-sync-workers-global" => {
i += 1; i += 1;
let v = argv let v = argv
@ -407,25 +404,50 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
let db_path = db_path.ok_or_else(|| format!("--db is required\n\n{}", usage()))?; let db_path = db_path.ok_or_else(|| format!("--db is required\n\n{}", usage()))?;
let tal_mode_count = tal_url.is_some() as u8 + tal_path.is_some() as u8; let tal_mode_count = (!tal_urls.is_empty()) as u8 + (!tal_paths.is_empty()) as u8;
if tal_mode_count != 1 { if tal_mode_count != 1 {
return Err(format!( return Err(format!(
"must specify exactly one of --tal-url or --tal-path\n\n{}", "must specify either one-or-more --tal-url or one-or-more --tal-path/--ta-path pairs\n\n{}",
usage() usage()
)); ));
} }
if !parallel_phase1 && (!parallel_tal_urls.is_empty() || parallel_phase1_cfg_overridden) { if !parallel_phase1 && parallel_phase1_cfg_overridden {
return Err(format!( return Err(format!(
"--parallel-tal-url and --parallel-max-* options require --parallel-phase1\n\n{}", "--parallel-max-* options require --parallel-phase1\n\n{}",
usage() usage()
)); ));
} }
if tal_path.is_some() && ta_path.is_none() && !disable_rrdp { if !tal_urls.is_empty() && !ta_paths.is_empty() {
return Err(format!( return Err(format!(
"--tal-path requires --ta-path unless --disable-rrdp is set\n\n{}", "--ta-path cannot be used with --tal-url mode\n\n{}",
usage() usage()
)); ));
} }
if !tal_paths.is_empty() {
let strict_pairing_required = parallel_phase1 || tal_paths.len() > 1 || !ta_paths.is_empty();
if strict_pairing_required {
if ta_paths.len() != tal_paths.len() {
return Err(format!(
"--tal-path and --ta-path counts must match in file mode\n\n{}",
usage()
));
}
} else if ta_paths.is_empty() && !disable_rrdp {
return Err(format!(
"--tal-path requires --ta-path unless --disable-rrdp is set\n\n{}",
usage()
));
}
}
if !parallel_phase1 && (tal_urls.len() > 1 || tal_paths.len() > 1) {
return Err(format!(
"multi-TAL execution requires --parallel-phase1\n\n{}",
usage()
));
}
let tal_url = tal_urls.first().cloned();
let tal_path = tal_paths.first().cloned();
let ta_path = ta_paths.first().cloned();
let cir_backend_count = cir_static_root.is_some() as u8 + raw_store_db.is_some() as u8; let cir_backend_count = cir_static_root.is_some() as u8 + raw_store_db.is_some() as u8;
if cir_enabled && (cir_out_path.is_none() || cir_backend_count != 1) { if cir_enabled && (cir_out_path.is_none() || cir_backend_count != 1) {
return Err(format!( return Err(format!(
@ -533,26 +555,31 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
} }
let mut tal_inputs = Vec::new(); let mut tal_inputs = Vec::new();
if let Some(url) = tal_url.as_ref() { if !tal_urls.is_empty() {
tal_inputs.push(TalInputSpec::from_url(url.clone())); tal_inputs.extend(tal_urls.iter().cloned().map(TalInputSpec::from_url));
} else if let Some(path) = tal_path.as_ref() { } else if !tal_paths.is_empty() {
tal_inputs.push(TalInputSpec::from_file_path(path.clone())); if ta_paths.len() == tal_paths.len() {
} tal_inputs.extend(
if parallel_phase1 { tal_paths
tal_inputs.extend( .iter()
parallel_tal_urls .cloned()
.iter() .zip(ta_paths.iter().cloned())
.cloned() .map(|(tal_path, ta_path)| TalInputSpec::from_file_path_with_ta(tal_path, ta_path)),
.map(TalInputSpec::from_url), );
); } else {
tal_inputs.extend(tal_paths.iter().cloned().map(TalInputSpec::from_file_path));
}
} }
Ok(CliArgs { Ok(CliArgs {
tal_urls,
tal_paths,
ta_paths,
tal_url, tal_url,
tal_path, tal_path,
ta_path, ta_path,
parallel_phase1, parallel_phase1,
parallel_tal_urls, parallel_tal_urls: Vec::new(),
parallel_phase1_config: parallel_phase1.then_some(parallel_phase1_cfg), parallel_phase1_config: parallel_phase1.then_some(parallel_phase1_cfg),
tal_inputs, tal_inputs,
db_path, db_path,
@ -932,11 +959,26 @@ pub fn run(argv: &[String]) -> Result<(), String> {
}) })
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
let rsync = LocalDirRsyncFetcher::new(dir); let rsync = LocalDirRsyncFetcher::new(dir);
match ( if args.parallel_phase1 && args.tal_inputs.len() > 1 {
args.tal_url.as_ref(), run_tree_from_multiple_tals_parallel_phase1_audit(
args.tal_path.as_ref(), Arc::clone(&store),
args.ta_path.as_ref(), &policy,
) { args.tal_inputs.clone(),
&http,
&rsync,
validation_time,
&config,
args.parallel_phase1_config
.clone()
.expect("phase1 config present"),
)
.map_err(|e| e.to_string())?
} else {
match (
args.tal_url.as_ref(),
args.tal_path.as_ref(),
args.ta_path.as_ref(),
) {
(Some(url), _, _) => { (Some(url), _, _) => {
if args.parallel_phase1 { if args.parallel_phase1 {
run_tree_from_tal_url_parallel_phase1_audit( run_tree_from_tal_url_parallel_phase1_audit(
@ -1059,6 +1101,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
} }
} }
_ => unreachable!("validated by parse_args"), _ => unreachable!("validated by parse_args"),
}
} }
} else { } else {
let http = BlockingHttpFetcher::new(HttpFetcherConfig { let http = BlockingHttpFetcher::new(HttpFetcherConfig {
@ -1075,11 +1118,26 @@ pub fn run(argv: &[String]) -> Result<(), String> {
mirror_root: args.rsync_mirror_root.clone(), mirror_root: args.rsync_mirror_root.clone(),
..SystemRsyncConfig::default() ..SystemRsyncConfig::default()
}); });
match ( if args.parallel_phase1 && args.tal_inputs.len() > 1 {
args.tal_url.as_ref(), run_tree_from_multiple_tals_parallel_phase1_audit(
args.tal_path.as_ref(), Arc::clone(&store),
args.ta_path.as_ref(), &policy,
) { args.tal_inputs.clone(),
&http,
&rsync,
validation_time,
&config,
args.parallel_phase1_config
.clone()
.expect("phase1 config present"),
)
.map_err(|e| e.to_string())?
} else {
match (
args.tal_url.as_ref(),
args.tal_path.as_ref(),
args.ta_path.as_ref(),
) {
(Some(url), _, _) => { (Some(url), _, _) => {
if args.parallel_phase1 { if args.parallel_phase1 {
run_tree_from_tal_url_parallel_phase1_audit( run_tree_from_tal_url_parallel_phase1_audit(
@ -1202,6 +1260,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
} }
} }
_ => unreachable!("validated by parse_args"), _ => unreachable!("validated by parse_args"),
}
} }
}; };
@ -1258,9 +1317,17 @@ pub fn run(argv: &[String]) -> Result<(), String> {
let mut ccr_write_ms = None; let mut ccr_write_ms = None;
if let Some(path) = args.ccr_out_path.as_deref() { if let Some(path) = args.ccr_out_path.as_deref() {
let started = std::time::Instant::now(); let started = std::time::Instant::now();
let trust_anchors = if out.discoveries.is_empty() {
vec![out.discovery.trust_anchor.clone()]
} else {
out.discoveries
.iter()
.map(|item| item.trust_anchor.clone())
.collect::<Vec<_>>()
};
let ccr = build_ccr_from_run( let ccr = build_ccr_from_run(
store.as_ref(), store.as_ref(),
&[out.discovery.trust_anchor.clone()], &trust_anchors,
&out.tree.vrps, &out.tree.vrps,
&out.tree.aspas, &out.tree.aspas,
&out.tree.router_keys, &out.tree.router_keys,
@ -1279,6 +1346,9 @@ pub fn run(argv: &[String]) -> Result<(), String> {
let mut cir_write_cir_ms = None; let mut cir_write_cir_ms = None;
let mut cir_total_ms = None; let mut cir_total_ms = None;
if args.cir_enabled { if args.cir_enabled {
if out.discoveries.len() > 1 {
return Err("CIR export is not yet supported for multi-TAL runs".to_string());
}
let cir_tal_uri = args let cir_tal_uri = args
.tal_url .tal_url
.clone() .clone()
@ -1460,7 +1530,7 @@ mod tests {
]; ];
let err = parse_args(&argv).unwrap_err(); let err = parse_args(&argv).unwrap_err();
assert!( assert!(
err.contains("exactly one of --tal-url or --tal-path"), err.contains("one-or-more --tal-url or one-or-more --tal-path/--ta-path pairs"),
"{err}" "{err}"
); );
} }
@ -1712,6 +1782,7 @@ mod tests {
]; ];
let args = parse_args(&argv).expect("parse"); let args = parse_args(&argv).expect("parse");
assert_eq!(args.tal_url.as_deref(), Some("https://example.test/x.tal")); assert_eq!(args.tal_url.as_deref(), Some("https://example.test/x.tal"));
assert_eq!(args.tal_urls, vec!["https://example.test/x.tal".to_string()]);
assert!(args.tal_path.is_none()); assert!(args.tal_path.is_none());
assert!(args.ta_path.is_none()); assert!(args.ta_path.is_none());
assert_eq!(args.tal_inputs.len(), 1); assert_eq!(args.tal_inputs.len(), 1);
@ -1720,24 +1791,24 @@ mod tests {
} }
#[test] #[test]
fn parse_accepts_parallel_phase1_with_extra_tal_urls() { fn parse_accepts_parallel_phase1_with_multiple_tal_urls() {
let argv = vec![ let argv = vec![
"rpki".to_string(), "rpki".to_string(),
"--db".to_string(), "--db".to_string(),
"db".to_string(), "db".to_string(),
"--tal-url".to_string(), "--tal-url".to_string(),
"https://example.test/arin.tal".to_string(), "https://example.test/arin.tal".to_string(),
"--parallel-phase1".to_string(), "--tal-url".to_string(),
"--parallel-tal-url".to_string(),
"https://example.test/apnic.tal".to_string(), "https://example.test/apnic.tal".to_string(),
"--parallel-tal-url".to_string(), "--tal-url".to_string(),
"https://example.test/ripe.tal".to_string(), "https://example.test/ripe.tal".to_string(),
"--parallel-phase1".to_string(),
"--parallel-max-repo-sync-workers-global".to_string(), "--parallel-max-repo-sync-workers-global".to_string(),
"8".to_string(), "8".to_string(),
]; ];
let args = parse_args(&argv).expect("parse"); let args = parse_args(&argv).expect("parse");
assert!(args.parallel_phase1); assert!(args.parallel_phase1);
assert_eq!(args.parallel_tal_urls.len(), 2); assert_eq!(args.tal_urls.len(), 3);
assert_eq!(args.tal_inputs.len(), 3); assert_eq!(args.tal_inputs.len(), 3);
assert_eq!(args.tal_inputs[0].tal_id, "arin"); assert_eq!(args.tal_inputs[0].tal_id, "arin");
assert_eq!(args.tal_inputs[1].tal_id, "apnic"); assert_eq!(args.tal_inputs[1].tal_id, "apnic");
@ -1751,18 +1822,18 @@ mod tests {
} }
#[test] #[test]
fn parse_rejects_parallel_tal_flags_without_parallel_phase1() { fn parse_rejects_multi_tal_without_parallel_phase1() {
let argv = vec![ let argv = vec![
"rpki".to_string(), "rpki".to_string(),
"--db".to_string(), "--db".to_string(),
"db".to_string(), "db".to_string(),
"--tal-url".to_string(), "--tal-url".to_string(),
"https://example.test/arin.tal".to_string(), "https://example.test/arin.tal".to_string(),
"--parallel-tal-url".to_string(), "--tal-url".to_string(),
"https://example.test/apnic.tal".to_string(), "https://example.test/apnic.tal".to_string(),
]; ];
let err = parse_args(&argv).unwrap_err(); let err = parse_args(&argv).unwrap_err();
assert!(err.contains("require --parallel-phase1"), "{err}"); assert!(err.contains("requires --parallel-phase1"), "{err}");
} }
#[test] #[test]
@ -1779,11 +1850,71 @@ mod tests {
"0".to_string(), "0".to_string(),
]; ];
let args = parse_args(&argv).expect("parse"); let args = parse_args(&argv).expect("parse");
assert_eq!(args.tal_paths, vec![PathBuf::from("a.tal")]);
assert_eq!(args.ta_paths, vec![PathBuf::from("ta.cer")]);
assert_eq!(args.tal_path.as_deref(), Some(Path::new("a.tal"))); assert_eq!(args.tal_path.as_deref(), Some(Path::new("a.tal")));
assert_eq!(args.ta_path.as_deref(), Some(Path::new("ta.cer"))); assert_eq!(args.ta_path.as_deref(), Some(Path::new("ta.cer")));
assert_eq!(args.max_depth, Some(0)); assert_eq!(args.max_depth, Some(0));
} }
#[test]
fn parse_accepts_parallel_phase1_with_multiple_tal_path_pairs() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-path".to_string(),
"apnic.tal".to_string(),
"--ta-path".to_string(),
"apnic-ta.cer".to_string(),
"--tal-path".to_string(),
"arin.tal".to_string(),
"--ta-path".to_string(),
"arin-ta.cer".to_string(),
"--parallel-phase1".to_string(),
];
let args = parse_args(&argv).expect("parse");
assert_eq!(args.tal_paths.len(), 2);
assert_eq!(args.ta_paths.len(), 2);
assert_eq!(args.tal_inputs.len(), 2);
}
#[test]
fn parse_rejects_mixed_tal_url_and_tal_path_modes() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/arin.tal".to_string(),
"--tal-path".to_string(),
"apnic.tal".to_string(),
"--ta-path".to_string(),
"apnic-ta.cer".to_string(),
"--parallel-phase1".to_string(),
];
let err = parse_args(&argv).unwrap_err();
assert!(err.contains("must specify either one-or-more --tal-url or one-or-more --tal-path/--ta-path pairs"), "{err}");
}
#[test]
fn parse_rejects_mismatched_tal_path_and_ta_path_counts() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-path".to_string(),
"apnic.tal".to_string(),
"--tal-path".to_string(),
"arin.tal".to_string(),
"--ta-path".to_string(),
"apnic-ta.cer".to_string(),
"--parallel-phase1".to_string(),
];
let err = parse_args(&argv).unwrap_err();
assert!(err.contains("--tal-path and --ta-path counts must match"), "{err}");
}
#[test] #[test]
fn parse_accepts_tal_path_without_ta_when_disable_rrdp_is_set() { fn parse_accepts_tal_path_without_ta_when_disable_rrdp_is_set() {
let argv = vec![ let argv = vec![
@ -2114,7 +2245,8 @@ mod tests {
pp3.rrdp_notification_uri = Some("https://example.test/n2.xml".to_string()); pp3.rrdp_notification_uri = Some("https://example.test/n2.xml".to_string());
let out = crate::validation::run_tree_from_tal::RunTreeFromTalAuditOutput { let out = crate::validation::run_tree_from_tal::RunTreeFromTalAuditOutput {
discovery, discovery: discovery.clone(),
discoveries: vec![discovery],
tree, tree,
publication_points: vec![pp1, pp2, pp3], publication_points: vec![pp1, pp2, pp3],
downloads: Vec::new(), downloads: Vec::new(),

View File

@ -6,8 +6,13 @@ use crate::report::Warning;
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
pub enum TalSource { pub enum TalSource {
Url(String), Url(String),
DerBytes { tal_url: String, ta_der: Vec<u8> }, DerBytes {
tal_url: String,
tal_bytes: Vec<u8>,
ta_der: Vec<u8>,
},
FilePath(PathBuf), FilePath(PathBuf),
FilePathWithTa { tal_path: PathBuf, ta_path: PathBuf },
} }
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
@ -38,13 +43,35 @@ impl TalInputSpec {
} }
} }
pub fn from_ta_der(tal_url: impl Into<String>, ta_der: Vec<u8>) -> Self { pub fn from_file_path_with_ta(
tal_path: impl Into<PathBuf>,
ta_path: impl Into<PathBuf>,
) -> Self {
let tal_path = tal_path.into();
let ta_path = ta_path.into();
let tal_id = derive_tal_id_from_path(&tal_path);
Self {
rir_id: tal_id.clone(),
tal_id,
source: TalSource::FilePathWithTa { tal_path, ta_path },
}
}
pub fn from_ta_der(
tal_url: impl Into<String>,
tal_bytes: Vec<u8>,
ta_der: Vec<u8>,
) -> Self {
let tal_url = tal_url.into(); let tal_url = tal_url.into();
let tal_id = derive_tal_id_from_url_like(&tal_url); let tal_id = derive_tal_id_from_url_like(&tal_url);
Self { Self {
rir_id: tal_id.clone(), rir_id: tal_id.clone(),
tal_id, tal_id,
source: TalSource::DerBytes { tal_url, ta_der }, source: TalSource::DerBytes {
tal_url,
tal_bytes,
ta_der,
},
} }
} }
} }
@ -316,13 +343,18 @@ mod tests {
#[test] #[test]
fn tal_input_spec_from_ta_der_preserves_payload() { fn tal_input_spec_from_ta_der_preserves_payload() {
let spec = TalInputSpec::from_ta_der("https://example.test/ripe.tal", vec![1, 2, 3]); let spec = TalInputSpec::from_ta_der(
"https://example.test/ripe.tal",
vec![4, 5, 6],
vec![1, 2, 3],
);
assert_eq!(spec.tal_id, "ripe"); assert_eq!(spec.tal_id, "ripe");
assert_eq!(spec.rir_id, "ripe"); assert_eq!(spec.rir_id, "ripe");
assert_eq!( assert_eq!(
spec.source, spec.source,
TalSource::DerBytes { TalSource::DerBytes {
tal_url: "https://example.test/ripe.tal".to_string(), tal_url: "https://example.test/ripe.tal".to_string(),
tal_bytes: vec![4, 5, 6],
ta_der: vec![1, 2, 3], ta_der: vec![1, 2, 3],
} }
); );

View File

@ -26,7 +26,7 @@ use crate::validation::from_tal::{
}; };
use crate::validation::tree::{ use crate::validation::tree::{
CaInstanceHandle, TreeRunAuditOutput, TreeRunConfig, TreeRunError, TreeRunOutput, CaInstanceHandle, TreeRunAuditOutput, TreeRunConfig, TreeRunError, TreeRunOutput,
run_tree_serial, run_tree_serial_audit, run_tree_serial, run_tree_serial_audit, run_tree_serial_audit_multi_root,
}; };
use crate::validation::tree_runner::Rpkiv1PublicationPointRunner; use crate::validation::tree_runner::Rpkiv1PublicationPointRunner;
use std::collections::HashMap; use std::collections::HashMap;
@ -79,12 +79,20 @@ pub struct RunTreeFromTalOutput {
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunTreeFromTalAuditOutput { pub struct RunTreeFromTalAuditOutput {
pub discovery: DiscoveredRootCaInstance, pub discovery: DiscoveredRootCaInstance,
pub discoveries: Vec<DiscoveredRootCaInstance>,
pub tree: TreeRunOutput, pub tree: TreeRunOutput,
pub publication_points: Vec<PublicationPointAudit>, pub publication_points: Vec<PublicationPointAudit>,
pub downloads: Vec<crate::audit::AuditDownloadEvent>, pub downloads: Vec<crate::audit::AuditDownloadEvent>,
pub download_stats: crate::audit::AuditDownloadStats, pub download_stats: crate::audit::AuditDownloadStats,
} }
/// One TAL input resolved to its discovered trust-anchor root, used when
/// several TALs are validated in a single multi-root run.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TalRootDiscovery {
    /// The TAL specification (URL, embedded bytes, or file path) this root came from.
    pub tal_input: TalInputSpec,
    /// Discovery result carrying the trust anchor and the root CA instance.
    pub discovery: DiscoveredRootCaInstance,
    /// Handle identifying the discovered root CA instance for the tree run.
    pub root_handle: CaInstanceHandle,
}
fn make_live_runner<'a>( fn make_live_runner<'a>(
store: &'a crate::storage::RocksStore, store: &'a crate::storage::RocksStore,
policy: &'a crate::policy::Policy, policy: &'a crate::policy::Policy,
@ -149,6 +157,64 @@ where
))) )))
} }
/// Discover the root CA instance for a single TAL input.
///
/// Dispatches on the input's [`TalSource`] variant: URLs are fetched over
/// HTTP, embedded DER bytes are used directly, and file-based inputs are read
/// from disk first. I/O failures while reading local files are reported as
/// `TalFetch`/`TaFetch` errors carrying the offending path.
fn root_discovery_from_tal_input(
    tal_input: &TalInputSpec,
    http_fetcher: &dyn Fetcher,
    rsync_fetcher: &dyn crate::fetch::rsync::RsyncFetcher,
) -> Result<DiscoveredRootCaInstance, FromTalError> {
    match &tal_input.source {
        TalSource::Url(tal_url) => {
            discover_root_ca_instance_from_tal_url(http_fetcher, tal_url)
        }
        TalSource::DerBytes {
            tal_bytes, ta_der, ..
        } => discover_root_ca_instance_from_tal_and_ta_der(tal_bytes, ta_der, None),
        TalSource::FilePath(tal_file) => {
            // TAL only: parse it, then let the fetchers retrieve the TA.
            let raw_tal = std::fs::read(tal_file).map_err(|e| {
                FromTalError::TalFetch(format!("read TAL file failed: {}: {e}", tal_file.display()))
            })?;
            let parsed = crate::data_model::tal::Tal::decode_bytes(&raw_tal)?;
            discover_root_ca_instance_from_tal_with_fetchers(http_fetcher, rsync_fetcher, parsed, None)
        }
        TalSource::FilePathWithTa { tal_path, ta_path } => {
            // Offline-friendly pair: both TAL and TA certificate come from disk.
            let raw_tal = std::fs::read(tal_path).map_err(|e| {
                FromTalError::TalFetch(format!(
                    "read TAL file failed: {}: {e}",
                    tal_path.display()
                ))
            })?;
            let raw_ta = std::fs::read(ta_path).map_err(|e| {
                FromTalError::TaFetch(format!(
                    "read TA file failed: {}: {e}",
                    ta_path.display()
                ))
            })?;
            discover_root_ca_instance_from_tal_and_ta_der(&raw_tal, &raw_ta, None)
        }
    }
}
/// Resolve every TAL input into a [`TalRootDiscovery`], preserving input order.
///
/// Each input is discovered independently; the first failure aborts the whole
/// batch and is propagated as a [`RunTreeFromTalError`].
fn discover_multiple_roots_from_tal_inputs(
    tal_inputs: &[TalInputSpec],
    http_fetcher: &dyn Fetcher,
    rsync_fetcher: &dyn crate::fetch::rsync::RsyncFetcher,
) -> Result<Vec<TalRootDiscovery>, RunTreeFromTalError> {
    tal_inputs
        .iter()
        .map(|spec| {
            let discovery = root_discovery_from_tal_input(spec, http_fetcher, rsync_fetcher)?;
            // The root handle is derived from the freshly discovered trust
            // anchor so each TAL seeds its own validation root.
            let root_handle = root_handle_from_trust_anchor(
                &discovery.trust_anchor,
                spec.tal_id.clone(),
                None,
                &discovery.ca_instance,
            );
            Ok(TalRootDiscovery {
                tal_input: spec.clone(),
                discovery,
                root_handle,
            })
        })
        .collect::<Result<Vec<_>, RunTreeFromTalError>>()
}
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum RunTreeFromTalError { pub enum RunTreeFromTalError {
#[error("{0}")] #[error("{0}")]
@ -253,7 +319,8 @@ pub fn run_tree_from_tal_url_serial_audit(
let downloads = download_log.snapshot_events(); let downloads = download_log.snapshot_events();
let download_stats = DownloadLogHandle::stats_from_events(&downloads); let download_stats = DownloadLogHandle::stats_from_events(&downloads);
Ok(RunTreeFromTalAuditOutput { Ok(RunTreeFromTalAuditOutput {
discovery, discovery: discovery.clone(),
discoveries: vec![discovery],
tree, tree,
publication_points, publication_points,
downloads, downloads,
@ -302,7 +369,8 @@ pub fn run_tree_from_tal_url_serial_audit_with_timing(
let downloads = download_log.snapshot_events(); let downloads = download_log.snapshot_events();
let download_stats = DownloadLogHandle::stats_from_events(&downloads); let download_stats = DownloadLogHandle::stats_from_events(&downloads);
Ok(RunTreeFromTalAuditOutput { Ok(RunTreeFromTalAuditOutput {
discovery, discovery: discovery.clone(),
discoveries: vec![discovery],
tree, tree,
publication_points, publication_points,
downloads, downloads,
@ -360,7 +428,8 @@ where
let downloads = download_log.snapshot_events(); let downloads = download_log.snapshot_events();
let download_stats = DownloadLogHandle::stats_from_events(&downloads); let download_stats = DownloadLogHandle::stats_from_events(&downloads);
Ok(RunTreeFromTalAuditOutput { Ok(RunTreeFromTalAuditOutput {
discovery, discovery: discovery.clone(),
discoveries: vec![discovery],
tree, tree,
publication_points, publication_points,
downloads, downloads,
@ -395,6 +464,7 @@ where
.tal_url .tal_url
.clone() .clone()
.unwrap_or_else(|| "embedded-tal".to_string()), .unwrap_or_else(|| "embedded-tal".to_string()),
tal_bytes: tal_bytes.to_vec(),
ta_der: ta_der.to_vec(), ta_der: ta_der.to_vec(),
}, },
}]; }];
@ -432,7 +502,74 @@ where
let downloads = download_log.snapshot_events(); let downloads = download_log.snapshot_events();
let download_stats = DownloadLogHandle::stats_from_events(&downloads); let download_stats = DownloadLogHandle::stats_from_events(&downloads);
Ok(RunTreeFromTalAuditOutput { Ok(RunTreeFromTalAuditOutput {
discovery, discovery: discovery.clone(),
discoveries: vec![discovery],
tree,
publication_points,
downloads,
download_stats,
})
}
pub fn run_tree_from_multiple_tals_parallel_phase1_audit<H, R>(
store: Arc<crate::storage::RocksStore>,
policy: &crate::policy::Policy,
tal_inputs: Vec<TalInputSpec>,
http_fetcher: &H,
rsync_fetcher: &R,
validation_time: time::OffsetDateTime,
config: &TreeRunConfig,
parallel_config: ParallelPhase1Config,
) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError>
where
H: Fetcher + Clone + 'static,
R: crate::fetch::rsync::RsyncFetcher + Clone + 'static,
{
if tal_inputs.is_empty() {
return Err(RunTreeFromTalError::Replay("multi-TAL run requires at least one TAL input".to_string()));
}
let roots = discover_multiple_roots_from_tal_inputs(&tal_inputs, http_fetcher, rsync_fetcher)?;
let primary = roots
.first()
.cloned()
.ok_or_else(|| RunTreeFromTalError::Replay("multi-TAL root discovery returned no roots".to_string()))?;
let discoveries = roots.iter().map(|item| item.discovery.clone()).collect::<Vec<_>>();
let root_handles = roots
.into_iter()
.map(|item| item.root_handle)
.collect::<Vec<_>>();
let download_log = DownloadLogHandle::new();
let runtime = build_phase1_repo_sync_runtime(
Arc::clone(&store),
policy,
http_fetcher,
rsync_fetcher,
parallel_config,
None,
Some(download_log.clone()),
tal_inputs,
)?;
let runner = make_live_runner(
store.as_ref(),
policy,
http_fetcher,
rsync_fetcher,
validation_time,
None,
Some(download_log.clone()),
Some(runtime),
);
let TreeRunAuditOutput {
tree,
publication_points,
} = run_tree_serial_audit_multi_root(root_handles, &runner, config)?;
let downloads = download_log.snapshot_events();
let download_stats = DownloadLogHandle::stats_from_events(&downloads);
Ok(RunTreeFromTalAuditOutput {
discovery: primary.discovery.clone(),
discoveries,
tree, tree,
publication_points, publication_points,
downloads, downloads,
@ -532,7 +669,8 @@ pub fn run_tree_from_tal_bytes_serial_audit(
let downloads = download_log.snapshot_events(); let downloads = download_log.snapshot_events();
let download_stats = DownloadLogHandle::stats_from_events(&downloads); let download_stats = DownloadLogHandle::stats_from_events(&downloads);
Ok(RunTreeFromTalAuditOutput { Ok(RunTreeFromTalAuditOutput {
discovery, discovery: discovery.clone(),
discoveries: vec![discovery],
tree, tree,
publication_points, publication_points,
downloads, downloads,
@ -595,7 +733,8 @@ pub fn run_tree_from_tal_bytes_serial_audit_with_timing(
let downloads = download_log.snapshot_events(); let downloads = download_log.snapshot_events();
let download_stats = DownloadLogHandle::stats_from_events(&downloads); let download_stats = DownloadLogHandle::stats_from_events(&downloads);
Ok(RunTreeFromTalAuditOutput { Ok(RunTreeFromTalAuditOutput {
discovery, discovery: discovery.clone(),
discoveries: vec![discovery],
tree, tree,
publication_points, publication_points,
downloads, downloads,
@ -649,7 +788,8 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit(
let downloads = download_log.snapshot_events(); let downloads = download_log.snapshot_events();
let download_stats = DownloadLogHandle::stats_from_events(&downloads); let download_stats = DownloadLogHandle::stats_from_events(&downloads);
Ok(RunTreeFromTalAuditOutput { Ok(RunTreeFromTalAuditOutput {
discovery, discovery: discovery.clone(),
discoveries: vec![discovery],
tree, tree,
publication_points, publication_points,
downloads, downloads,
@ -707,7 +847,8 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit_with_timing(
let downloads = download_log.snapshot_events(); let downloads = download_log.snapshot_events();
let download_stats = DownloadLogHandle::stats_from_events(&downloads); let download_stats = DownloadLogHandle::stats_from_events(&downloads);
Ok(RunTreeFromTalAuditOutput { Ok(RunTreeFromTalAuditOutput {
discovery, discovery: discovery.clone(),
discoveries: vec![discovery],
tree, tree,
publication_points, publication_points,
downloads, downloads,
@ -823,7 +964,8 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit(
let downloads = download_log.snapshot_events(); let downloads = download_log.snapshot_events();
let download_stats = DownloadLogHandle::stats_from_events(&downloads); let download_stats = DownloadLogHandle::stats_from_events(&downloads);
Ok(RunTreeFromTalAuditOutput { Ok(RunTreeFromTalAuditOutput {
discovery, discovery: discovery.clone(),
discoveries: vec![discovery],
tree, tree,
publication_points, publication_points,
downloads, downloads,
@ -891,7 +1033,8 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing(
let downloads = download_log.snapshot_events(); let downloads = download_log.snapshot_events();
let download_stats = DownloadLogHandle::stats_from_events(&downloads); let download_stats = DownloadLogHandle::stats_from_events(&downloads);
Ok(RunTreeFromTalAuditOutput { Ok(RunTreeFromTalAuditOutput {
discovery, discovery: discovery.clone(),
discoveries: vec![discovery],
tree, tree,
publication_points, publication_points,
downloads, downloads,
@ -1089,7 +1232,8 @@ fn run_payload_delta_replay_audit_inner(
let downloads = download_log.snapshot_events(); let downloads = download_log.snapshot_events();
let download_stats = DownloadLogHandle::stats_from_events(&downloads); let download_stats = DownloadLogHandle::stats_from_events(&downloads);
Ok(RunTreeFromTalAuditOutput { Ok(RunTreeFromTalAuditOutput {
discovery, discovery: discovery.clone(),
discoveries: vec![discovery],
tree, tree,
publication_points, publication_points,
downloads, downloads,
@ -1231,7 +1375,8 @@ fn run_payload_delta_replay_step_audit_inner(
let downloads = download_log.snapshot_events(); let downloads = download_log.snapshot_events();
let download_stats = DownloadLogHandle::stats_from_events(&downloads); let download_stats = DownloadLogHandle::stats_from_events(&downloads);
Ok(RunTreeFromTalAuditOutput { Ok(RunTreeFromTalAuditOutput {
discovery, discovery: discovery.clone(),
discoveries: vec![discovery],
tree, tree,
publication_points, publication_points,
downloads, downloads,
@ -1266,6 +1411,75 @@ pub fn run_tree_from_tal_and_ta_der_payload_delta_replay_step_serial_audit(
) )
} }
#[cfg(test)]
mod multi_tal_tests {
    use super::*;

    /// HTTP fetcher stub: discovery from embedded TAL/TA bytes must never
    /// touch the network, so any fetch attempt fails with the offending URI.
    struct RejectingHttpFetcher;

    impl Fetcher for RejectingHttpFetcher {
        fn fetch(&self, uri: &str) -> Result<Vec<u8>, String> {
            Err(format!("unexpected http fetch: {uri}"))
        }
    }

    /// Rsync fetcher stub with the same contract: any rsync attempt during
    /// discovery is a test failure.
    struct RejectingRsyncFetcher;

    impl crate::fetch::rsync::RsyncFetcher for RejectingRsyncFetcher {
        fn fetch_objects(
            &self,
            _rsync_base_uri: &str,
        ) -> crate::fetch::rsync::RsyncFetchResult<Vec<(String, Vec<u8>)>> {
            Err(crate::fetch::rsync::RsyncFetchError::Fetch(
                "unexpected rsync fetch".to_string(),
            ))
        }

        fn dedup_key(&self, base_uri: &str) -> String {
            base_uri.to_string()
        }
    }

    #[test]
    fn discover_multiple_roots_from_tal_inputs_builds_multiple_root_handles() {
        // Two independent TAL/TA fixture pairs (APNIC and ARIN), loaded from
        // paths relative to the crate root.
        let apnic_tal =
            std::fs::read("tests/fixtures/tal/apnic-rfc7730-https.tal").expect("read apnic tal");
        let apnic_ta = std::fs::read("tests/fixtures/ta/apnic-ta.cer").expect("read apnic ta");
        let arin_tal = std::fs::read("tests/fixtures/tal/arin.tal").expect("read arin tal");
        let arin_ta = std::fs::read("tests/fixtures/ta/arin-ta.cer").expect("read arin ta");

        let tal_inputs = vec![
            TalInputSpec::from_ta_der("https://example.test/apnic.tal", apnic_tal, apnic_ta),
            TalInputSpec::from_ta_der("https://example.test/arin.tal", arin_tal, arin_ta),
        ];

        let roots = discover_multiple_roots_from_tal_inputs(
            &tal_inputs,
            &RejectingHttpFetcher,
            &RejectingRsyncFetcher,
        )
        .expect("discover roots");

        // One discovered root per TAL input, in input order, with the per-TA
        // id carried through to both the input spec and the root handle.
        assert_eq!(roots.len(), 2);
        for (root, expected_id) in roots.iter().zip(["apnic", "arin"]) {
            assert_eq!(root.tal_input.tal_id, expected_id);
            assert_eq!(root.root_handle.tal_id, expected_id);
        }
        // Distinct trust anchors must not collapse onto one manifest URI.
        assert_ne!(
            roots[0].root_handle.manifest_rsync_uri,
            roots[1].root_handle.manifest_rsync_uri
        );
    }
}
#[cfg(test)] #[cfg(test)]
mod replay_api_tests { mod replay_api_tests {
use super::*; use super::*;

View File

@ -122,6 +122,14 @@ pub fn run_tree_serial_audit(
root: CaInstanceHandle, root: CaInstanceHandle,
runner: &dyn PublicationPointRunner, runner: &dyn PublicationPointRunner,
config: &TreeRunConfig, config: &TreeRunConfig,
) -> Result<TreeRunAuditOutput, TreeRunError> {
run_tree_serial_audit_multi_root(vec![root], runner, config)
}
pub fn run_tree_serial_audit_multi_root(
roots: Vec<CaInstanceHandle>,
runner: &dyn PublicationPointRunner,
config: &TreeRunConfig,
) -> Result<TreeRunAuditOutput, TreeRunError> { ) -> Result<TreeRunAuditOutput, TreeRunError> {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct QueuedCaInstance { struct QueuedCaInstance {
@ -133,13 +141,15 @@ pub fn run_tree_serial_audit(
let mut next_id: u64 = 0; let mut next_id: u64 = 0;
let mut queue: std::collections::VecDeque<QueuedCaInstance> = std::collections::VecDeque::new(); let mut queue: std::collections::VecDeque<QueuedCaInstance> = std::collections::VecDeque::new();
queue.push_back(QueuedCaInstance { for root in roots {
id: next_id, queue.push_back(QueuedCaInstance {
handle: root, id: next_id,
parent_id: None, handle: root,
discovered_from: None, parent_id: None,
}); discovered_from: None,
next_id += 1; });
next_id += 1;
}
let mut visited_manifest_uris: std::collections::HashSet<String> = let mut visited_manifest_uris: std::collections::HashSet<String> =
std::collections::HashSet::new(); std::collections::HashSet::new();
@ -245,6 +255,7 @@ mod tests {
use super::{ use super::{
CaInstanceHandle, DiscoveredChildCaInstance, PublicationPointRunResult, CaInstanceHandle, DiscoveredChildCaInstance, PublicationPointRunResult,
PublicationPointRunner, TreeRunConfig, run_tree_serial_audit, PublicationPointRunner, TreeRunConfig, run_tree_serial_audit,
run_tree_serial_audit_multi_root,
}; };
fn sample_handle(manifest: &str) -> CaInstanceHandle { fn sample_handle(manifest: &str) -> CaInstanceHandle {
@ -339,4 +350,23 @@ mod tests {
["rsync://example.test/repo/child.mft"] ["rsync://example.test/repo/child.mft"]
); );
} }
#[test]
fn run_tree_serial_audit_multi_root_processes_all_roots() {
let seen = Arc::new(Mutex::new(Vec::new()));
let runner = PrefetchRecordingRunner {
seen_prefetch_children: Arc::clone(&seen),
};
let out = run_tree_serial_audit_multi_root(
vec![
sample_handle("rsync://example.test/repo/root-a.mft"),
sample_handle("rsync://example.test/repo/root-b.mft"),
],
&runner,
&TreeRunConfig::default(),
)
.expect("tree run");
assert_eq!(out.tree.instances_processed, 2);
assert_eq!(out.publication_points.len(), 2);
}
} }

View File

@ -0,0 +1,104 @@
use std::collections::BTreeSet;
use std::path::{Path, PathBuf};
use rpki::bundle::{VapCompareRow, VrpCompareRow, decode_ccr_compare_views};
/// Resolve `rel` against the crate root so fixture paths work no matter
/// which working directory the test harness uses.
fn fixture(rel: &str) -> PathBuf {
    let crate_root = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    crate_root.join(rel)
}
/// Run the CLI once against the local fixture repository and collect the
/// artifacts needed for comparison: the JSON report, the VRP/VAP compare
/// views, and the decoded CCR itself.
///
/// `name` only namespaces the per-case output files; `tal_ta_pairs`
/// supplies one (TAL path, TA certificate path) pair per trust anchor.
fn run_case(
    name: &str,
    tal_ta_pairs: &[(&Path, &Path)],
) -> (
    serde_json::Value,
    BTreeSet<VrpCompareRow>,
    BTreeSet<VapCompareRow>,
    rpki::ccr::CcrContentInfo,
) {
    let db_tmp = tempfile::tempdir().expect("db tempdir");
    let out_tmp = tempfile::tempdir().expect("out tempdir");
    let report_path = out_tmp.path().join(format!("{name}.report.json"));
    let ccr_path = out_tmp.path().join(format!("{name}.ccr"));

    let as_arg = |p: &Path| p.to_string_lossy().into_owned();

    // Fixed flags first: offline (rsync from the local fixture dir, RRDP
    // disabled), a pinned validation time for determinism, depth 0, and one
    // instance per configured trust anchor.
    let mut args = vec![
        "rpki".to_string(),
        "--db".to_string(),
        as_arg(db_tmp.path()),
        "--parallel-phase1".to_string(),
        "--disable-rrdp".to_string(),
        "--rsync-local-dir".to_string(),
        as_arg(&fixture("tests/fixtures/repository")),
        "--validation-time".to_string(),
        "2026-04-07T00:00:00Z".to_string(),
        "--max-depth".to_string(),
        "0".to_string(),
        "--max-instances".to_string(),
        tal_ta_pairs.len().to_string(),
    ];
    // One --tal-path/--ta-path pair per requested trust anchor.
    for (tal_path, ta_path) in tal_ta_pairs {
        args.push("--tal-path".to_string());
        args.push(as_arg(tal_path));
        args.push("--ta-path".to_string());
        args.push(as_arg(ta_path));
    }
    // Output locations come last.
    args.push("--report-json".to_string());
    args.push(as_arg(&report_path));
    args.push("--ccr-out".to_string());
    args.push(as_arg(&ccr_path));

    rpki::cli::run(&args).expect("cli run");

    let report_bytes = std::fs::read(&report_path).expect("read report");
    let report: serde_json::Value = serde_json::from_slice(&report_bytes).expect("parse report");
    let ccr_bytes = std::fs::read(&ccr_path).expect("read ccr");
    let ccr = rpki::ccr::decode_content_info(&ccr_bytes).expect("decode ccr");
    // NOTE(review): the compare-view label is always "multi", even for
    // single-TA cases — confirm that the label is cosmetic only.
    let (vrps, vaps) = decode_ccr_compare_views(&ccr, "multi").expect("compare views");
    (report, vrps, vaps, ccr)
}
/// End-to-end check that a multi-TAL parallel run equals the union of the
/// corresponding single-TAL runs: same VRP and VAP sets, the combined number
/// of publication points, and a CCR that lists both trust anchors.
#[test]
fn multi_tal_parallel_output_matches_union_of_single_tal_outputs() {
    let apnic_tal = fixture("tests/fixtures/tal/apnic-rfc7730-https.tal");
    let apnic_ta = fixture("tests/fixtures/ta/apnic-ta.cer");
    let arin_tal = fixture("tests/fixtures/tal/arin.tal");
    let arin_ta = fixture("tests/fixtures/ta/arin-ta.cer");

    let (apnic_report, apnic_vrps, apnic_vaps, _) = run_case("apnic", &[(&apnic_tal, &apnic_ta)]);
    let (arin_report, arin_vrps, arin_vaps, _) = run_case("arin", &[(&arin_tal, &arin_ta)]);
    let (multi_report, multi_vrps, multi_vaps, multi_ccr) =
        run_case("multi", &[(&apnic_tal, &apnic_ta), (&arin_tal, &arin_ta)]);

    // Payload equality: the multi-run output is exactly the set union of the
    // two single runs — nothing lost, nothing invented.
    let expected_vrps: BTreeSet<_> = apnic_vrps.union(&arin_vrps).cloned().collect();
    let expected_vaps: BTreeSet<_> = apnic_vaps.union(&arin_vaps).cloned().collect();
    assert_eq!(multi_vrps, expected_vrps);
    assert_eq!(multi_vaps, expected_vaps);

    // Publication-point accounting: no points dropped or double-counted.
    let points = |report: &serde_json::Value, what: &str| {
        report["publication_points"].as_array().expect(what).len()
    };
    let single_point_total =
        points(&apnic_report, "apnic points") + points(&arin_report, "arin points");
    assert_eq!(points(&multi_report, "multi points"), single_point_total);

    // The combined CCR must carry both trust anchors.
    let tas = multi_ccr
        .content
        .tas
        .as_ref()
        .expect("multi-TAL CCR must include trust anchors");
    assert_eq!(tas.skis.len(), 2);
}