rpki/src/bin/replay_bundle_capture.rs

414 lines
15 KiB
Rust

use rpki::bundle::{
RecordingHttpFetcher, RecordingRsyncFetcher, RirBundleMetadata,
build_single_rir_bundle_manifest, build_vap_compare_rows, build_vrp_compare_rows, sha256_hex,
write_json, write_live_base_replay_bundle_inputs, write_live_bundle_rir_readme,
write_live_bundle_top_readme, write_timing_json, write_vap_csv, write_vrp_csv,
};
use rpki::ccr::{build_ccr_from_run, verify_content_info, write_ccr_file};
use rpki::fetch::http::{BlockingHttpFetcher, HttpFetcherConfig};
use rpki::fetch::rsync_system::{SystemRsyncConfig, SystemRsyncFetcher};
use rpki::policy::Policy;
use rpki::storage::RocksStore;
use rpki::validation::run_tree_from_tal::{
run_tree_from_tal_and_ta_der_payload_replay_serial_audit,
run_tree_from_tal_and_ta_der_serial_audit,
};
use rpki::validation::tree::TreeRunConfig;
use std::fs;
use std::path::PathBuf;
use std::time::Instant;
use time::format_description::well_known::Rfc3339;
/// Command-line options for `replay_bundle_capture`.
///
/// The derived `Default` leaves both timeouts at `0`; `parse_args` starts from
/// 20 s (HTTP) and 60 s (rsync) before applying any flags.
#[derive(Debug, Default, PartialEq, Eq)]
struct Args {
    /// `--rir <name>` (required). Lower-cased to name the per-RIR output directory.
    rir: Option<String>,
    /// `--out-dir <path>` (required). Root directory the bundle is written under.
    out_dir: Option<PathBuf>,
    /// `--tal-path <path>` (required). File whose bytes are used as the TAL.
    tal_path: Option<PathBuf>,
    /// `--ta-path <path>` (required). File whose bytes are used as the TA certificate.
    ta_path: Option<PathBuf>,
    /// `--validation-time <rfc3339>`. When absent, `run` uses the current UTC time.
    validation_time: Option<time::OffsetDateTime>,
    /// `--http-timeout-secs <n>`. Timeout handed to the HTTP fetcher, in seconds.
    http_timeout_secs: u64,
    /// `--rsync-timeout-secs <n>`. Timeout handed to the rsync fetcher, in seconds.
    rsync_timeout_secs: u64,
    /// `--rsync-mirror-root <path>`. Forwarded as the rsync fetcher's `mirror_root`.
    rsync_mirror_root: Option<PathBuf>,
    /// `--max-depth <n>`. Forwarded to the tree run configuration.
    max_depth: Option<usize>,
    /// `--max-instances <n>`. Forwarded to the tree run configuration.
    max_instances: Option<usize>,
    /// `--trust-anchor <name>`. Label used for the compare rows; when absent,
    /// `run` falls back to the lower-cased RIR name.
    trust_anchor: Option<String>,
}
/// Returns the one-line usage synopsis.
///
/// The text lists the four required flags first, followed by every optional
/// flag in square brackets.
fn usage() -> &'static str {
    const SYNOPSIS: &str = "Usage: replay_bundle_capture --rir <name> --out-dir <path> --tal-path <path> --ta-path <path> [--validation-time <rfc3339>] [--http-timeout-secs <n>] [--rsync-timeout-secs <n>] [--rsync-mirror-root <path>] [--max-depth <n>] [--max-instances <n>] [--trust-anchor <name>]";
    SYNOPSIS
}
/// Parses the process arguments into [`Args`].
///
/// `argv[0]` (the program name) is skipped. Every flag takes exactly one
/// following value; when a flag is repeated the last occurrence wins. The HTTP
/// and rsync timeouts start at 20 and 60 seconds respectively.
///
/// # Errors
///
/// Returns a message when:
/// - `--help` / `-h` is given (the message is the usage text itself),
/// - a flag is unknown, is missing its value, or its value fails to parse,
/// - any of `--rir`, `--out-dir`, `--tal-path`, `--ta-path` is absent.
fn parse_args(argv: &[String]) -> Result<Args, String> {
    /// Takes the value that must follow a flag, or fails with `missing`.
    fn value_for<'a>(
        rest: &mut impl Iterator<Item = &'a String>,
        missing: &'static str,
    ) -> Result<&'a String, String> {
        rest.next().ok_or_else(|| missing.to_string())
    }

    let mut parsed = Args {
        http_timeout_secs: 20,
        rsync_timeout_secs: 60,
        ..Args::default()
    };

    // Walk the arguments after the program name; each arm pulls its own value
    // from the same iterator, so flag and value are consumed together.
    let mut rest = argv.iter().skip(1);
    while let Some(flag) = rest.next() {
        match flag.as_str() {
            "--help" | "-h" => return Err(usage().to_string()),
            "--rir" => {
                let raw = value_for(&mut rest, "--rir requires a value")?;
                parsed.rir = Some(raw.clone());
            }
            "--out-dir" => {
                let raw = value_for(&mut rest, "--out-dir requires a value")?;
                parsed.out_dir = Some(PathBuf::from(raw));
            }
            "--tal-path" => {
                let raw = value_for(&mut rest, "--tal-path requires a value")?;
                parsed.tal_path = Some(PathBuf::from(raw));
            }
            "--ta-path" => {
                let raw = value_for(&mut rest, "--ta-path requires a value")?;
                parsed.ta_path = Some(PathBuf::from(raw));
            }
            "--validation-time" => {
                let raw = value_for(&mut rest, "--validation-time requires a value")?;
                let when = time::OffsetDateTime::parse(raw, &Rfc3339)
                    .map_err(|e| format!("invalid --validation-time: {e}"))?;
                parsed.validation_time = Some(when);
            }
            "--http-timeout-secs" => {
                let raw = value_for(&mut rest, "--http-timeout-secs requires a value")?;
                parsed.http_timeout_secs = raw
                    .parse::<u64>()
                    .map_err(|e| format!("invalid --http-timeout-secs: {e}"))?;
            }
            "--rsync-timeout-secs" => {
                let raw = value_for(&mut rest, "--rsync-timeout-secs requires a value")?;
                parsed.rsync_timeout_secs = raw
                    .parse::<u64>()
                    .map_err(|e| format!("invalid --rsync-timeout-secs: {e}"))?;
            }
            "--rsync-mirror-root" => {
                let raw = value_for(&mut rest, "--rsync-mirror-root requires a value")?;
                parsed.rsync_mirror_root = Some(PathBuf::from(raw));
            }
            "--max-depth" => {
                let raw = value_for(&mut rest, "--max-depth requires a value")?;
                let depth = raw
                    .parse::<usize>()
                    .map_err(|e| format!("invalid --max-depth: {e}"))?;
                parsed.max_depth = Some(depth);
            }
            "--max-instances" => {
                let raw = value_for(&mut rest, "--max-instances requires a value")?;
                let instances = raw
                    .parse::<usize>()
                    .map_err(|e| format!("invalid --max-instances: {e}"))?;
                parsed.max_instances = Some(instances);
            }
            "--trust-anchor" => {
                let raw = value_for(&mut rest, "--trust-anchor requires a value")?;
                parsed.trust_anchor = Some(raw.clone());
            }
            other => return Err(format!("unknown argument: {other}\n{}", usage())),
        }
    }

    // Required flags are reported in a fixed order: the first missing one wins.
    if parsed.rir.is_none() {
        return Err(format!("--rir is required\n{}", usage()));
    }
    if parsed.out_dir.is_none() {
        return Err(format!("--out-dir is required\n{}", usage()));
    }
    if parsed.tal_path.is_none() {
        return Err(format!("--tal-path is required\n{}", usage()));
    }
    if parsed.ta_path.is_none() {
        return Err(format!("--ta-path is required\n{}", usage()));
    }
    Ok(parsed)
}
/// Captures a live "base" run for one RIR and writes it out as a replay bundle.
///
/// Steps, in order:
/// 1. Run the tree validation from the TAL / TA certificate bytes through the
///    recording HTTP and rsync fetchers, into a scratch store under
///    `<out-dir>/.tmp/`.
/// 2. Build `base.ccr`, read it back from disk, decode and verify it, and
///    require that the VRP / VAP compare views built from the run equal the
///    ones decoded from the CCR.
/// 3. Write the CSV views, `tal.tal`, `ta.cer` and the captured replay inputs
///    under `<out-dir>/<lower-cased rir>/`.
/// 4. Replay the captured inputs into a second scratch store and require the
///    resulting compare views to be identical to the live ones.
/// 5. Write the timing file, `bundle.json`, `verification.json`, both READMEs
///    and the top-level `bundle-manifest.json`.
///
/// Returns the output root directory (`--out-dir`) on success.
///
/// # Errors
///
/// Returns a message if a required option is missing from `args`, if any
/// filesystem, fetcher, store, validation, CCR or bundle-writing step fails,
/// or if one of the compare-view consistency checks does not match.
fn run(args: Args) -> Result<PathBuf, String> {
    // `parse_args` guarantees these are set, but `Args` can also be built by
    // hand; report a missing option as an error rather than panicking.
    let rir = args.rir.as_ref().ok_or("--rir is required")?;
    let out_root = args.out_dir.as_ref().ok_or("--out-dir is required")?;
    let tal_path = args.tal_path.as_ref().ok_or("--tal-path is required")?;
    let ta_path = args.ta_path.as_ref().ok_or("--ta-path is required")?;

    let rir_normalized = rir.to_ascii_lowercase();
    let trust_anchor = args
        .trust_anchor
        .clone()
        .unwrap_or_else(|| rir_normalized.clone());
    let rir_dir = out_root.join(&rir_normalized);
    fs::create_dir_all(&rir_dir)
        .map_err(|e| format!("create rir dir failed: {}: {e}", rir_dir.display()))?;

    let tal_bytes = fs::read(tal_path).map_err(|e| format!("read tal failed: {e}"))?;
    let ta_bytes = fs::read(ta_path).map_err(|e| format!("read ta failed: {e}"))?;
    let validation_time = args
        .validation_time
        .unwrap_or_else(time::OffsetDateTime::now_utc);

    // Scratch databases live under `<out-dir>/.tmp/`. Leftovers from an earlier
    // run are removed best-effort: the directories usually do not exist yet.
    let db_dir = out_root.join(".tmp").join(format!("{rir}-live-base-db"));
    let replay_db_dir = out_root.join(".tmp").join(format!("{rir}-self-replay-db"));
    let _ = fs::remove_dir_all(&db_dir);
    let _ = fs::remove_dir_all(&replay_db_dir);
    if let Some(parent) = db_dir.parent() {
        fs::create_dir_all(parent)
            .map_err(|e| format!("create tmp dir failed: {}: {e}", parent.display()))?;
    }

    let store = RocksStore::open(&db_dir).map_err(|e| format!("open rocksdb failed: {e}"))?;
    let http = RecordingHttpFetcher::new(
        BlockingHttpFetcher::new(HttpFetcherConfig {
            timeout: std::time::Duration::from_secs(args.http_timeout_secs),
            ..HttpFetcherConfig::default()
        })
        .map_err(|e| format!("create http fetcher failed: {e}"))?,
    );
    let rsync = RecordingRsyncFetcher::new(SystemRsyncFetcher::new(SystemRsyncConfig {
        timeout: std::time::Duration::from_secs(args.rsync_timeout_secs),
        mirror_root: args.rsync_mirror_root.clone(),
        ..SystemRsyncConfig::default()
    }));

    // The live run and the self replay use the same policy and limits.
    let policy = Policy::default();
    let tree_config = TreeRunConfig {
        max_depth: args.max_depth,
        max_instances: args.max_instances,
    };

    // --- Live base run (timed) ---
    let started = Instant::now();
    let out = run_tree_from_tal_and_ta_der_serial_audit(
        &store,
        &policy,
        &tal_bytes,
        &ta_bytes,
        None,
        &http,
        &rsync,
        validation_time,
        &tree_config,
    )
    .map_err(|e| format!("live base run failed: {e}"))?;
    let duration = started.elapsed();

    // --- CCR: build, write, then re-read from disk so the verified bytes and
    // the recorded hash are exactly what ended up in the file ---
    let ccr = build_ccr_from_run(
        &store,
        &[out.discovery.trust_anchor.clone()],
        &out.tree.vrps,
        &out.tree.aspas,
        &out.tree.router_keys,
        validation_time,
    )
    .map_err(|e| format!("build ccr failed: {e}"))?;
    let base_ccr_path = rir_dir.join("base.ccr");
    write_ccr_file(&base_ccr_path, &ccr).map_err(|e| format!("write ccr failed: {e}"))?;
    let ccr_bytes = fs::read(&base_ccr_path)
        .map_err(|e| format!("read written ccr failed: {}: {e}", base_ccr_path.display()))?;
    let decoded = rpki::ccr::decode_content_info(&ccr_bytes)
        .map_err(|e| format!("decode written ccr failed: {e}"))?;
    let verify = verify_content_info(&decoded).map_err(|e| format!("verify ccr failed: {e}"))?;

    // --- Compare views from the run must equal the ones decoded from the CCR ---
    let vrp_rows = build_vrp_compare_rows(&out.tree.vrps, &trust_anchor);
    let vap_rows = build_vap_compare_rows(&out.tree.aspas, &trust_anchor);
    let (ccr_vrps, ccr_vaps) = rpki::bundle::decode_ccr_compare_views(&decoded, &trust_anchor)?;
    if vrp_rows != ccr_vrps {
        return Err("base-vrps compare view does not match base.ccr".to_string());
    }
    if vap_rows != ccr_vaps {
        return Err("base-vaps compare view does not match base.ccr".to_string());
    }
    write_vrp_csv(&rir_dir.join("base-vrps.csv"), &vrp_rows)?;
    write_vap_csv(&rir_dir.join("base-vaps.csv"), &vap_rows)?;
    fs::write(rir_dir.join("tal.tal"), &tal_bytes).map_err(|e| format!("write tal failed: {e}"))?;
    fs::write(rir_dir.join("ta.cer"), &ta_bytes).map_err(|e| format!("write ta failed: {e}"))?;

    // --- Persist what the recording fetchers captured ---
    let capture = write_live_base_replay_bundle_inputs(
        &rir_dir,
        &rir_normalized,
        validation_time,
        &out.publication_points,
        &store,
        &http.snapshot_responses(),
        &rsync.snapshot_fetches(),
    )?;

    // --- Self replay: the captured inputs must reproduce the live views ---
    let replay_store = RocksStore::open(&replay_db_dir)
        .map_err(|e| format!("open self replay rocksdb failed: {e}"))?;
    let replay_out = run_tree_from_tal_and_ta_der_payload_replay_serial_audit(
        &replay_store,
        &policy,
        &tal_bytes,
        &ta_bytes,
        None,
        &rir_dir.join("base-payload-archive"),
        &rir_dir.join("base-locks.json"),
        validation_time,
        &tree_config,
    )
    .map_err(|e| format!("self replay failed: {e}"))?;
    let replay_vrps = build_vrp_compare_rows(&replay_out.tree.vrps, &trust_anchor);
    let replay_vaps = build_vap_compare_rows(&replay_out.tree.aspas, &trust_anchor);
    if replay_vrps != vrp_rows {
        return Err("self replay VRP compare view mismatch".to_string());
    }
    if replay_vaps != vap_rows {
        return Err("self replay VAP compare view mismatch".to_string());
    }

    // --- Timing and metadata ---
    fs::create_dir_all(rir_dir.join("timings"))
        .map_err(|e| format!("create timings dir failed: {e}"))?;
    write_timing_json(
        &rir_dir.join("timings").join("base-produce.json"),
        "base",
        &validation_time,
        duration,
    )?;
    let metadata = RirBundleMetadata {
        schema_version: "20260330-v1".to_string(),
        bundle_producer: "ours".to_string(),
        rir: rir_normalized.clone(),
        base_validation_time: validation_time
            .format(&Rfc3339)
            .map_err(|e| format!("format validation time failed: {e}"))?,
        delta_validation_time: None,
        tal_sha256: sha256_hex(&tal_bytes),
        ta_cert_sha256: sha256_hex(&ta_bytes),
        base_ccr_sha256: sha256_hex(&ccr_bytes),
        delta_ccr_sha256: None,
        has_aspa: !vap_rows.is_empty(),
        has_router_key: verify.router_key_count > 0,
        base_vrp_count: vrp_rows.len(),
        base_vap_count: vap_rows.len(),
        delta_vrp_count: None,
        delta_vap_count: None,
    };
    write_json(&rir_dir.join("bundle.json"), &metadata)?;
    // The `true` flags below are safe to hard-code: every corresponding check
    // above returns early with an error when it does not hold.
    write_json(
        &rir_dir.join("verification.json"),
        &serde_json::json!({
            "base": {
                "validationTime": metadata.base_validation_time,
                "ccr": {
                    "path": "base.ccr",
                    "sha256": metadata.base_ccr_sha256,
                    "stateHashesOk": verify.state_hashes_ok,
                    "manifestInstances": verify.manifest_instances,
                    "roaVrpCount": verify.roa_vrp_count,
                    "aspaPayloadSets": verify.aspa_payload_sets,
                    "routerKeyCount": verify.router_key_count,
                },
                "compareViews": {
                    "vrpsSelfMatch": true,
                    "vapsSelfMatch": true,
                    "baseVrpCount": metadata.base_vrp_count,
                    "baseVapCount": metadata.base_vap_count,
                },
                "capture": {
                    "captureId": capture.capture_id,
                    "rrdpRepoCount": capture.rrdp_repo_count,
                    "rsyncModuleCount": capture.rsync_module_count,
                    "selfReplayOk": true,
                }
            }
        }),
    )?;
    write_live_bundle_top_readme(&out_root.join("README.md"), &rir_normalized)?;
    write_live_bundle_rir_readme(
        &rir_dir.join("README.md"),
        &rir_normalized,
        &metadata.base_validation_time,
    )?;
    write_json(
        &out_root.join("bundle-manifest.json"),
        &build_single_rir_bundle_manifest(
            "20260330-v1",
            "ours",
            &rir_normalized,
            &validation_time,
            None,
            metadata.has_aspa,
        )?,
    )?;

    // Release both store handles before deleting their directories so nothing
    // in this process still has the scratch databases open during removal.
    // NOTE(review): assumes `RocksStore` closes its database on drop — confirm.
    drop(replay_store);
    drop(store);
    // Cleanup stays best-effort; the bundle itself is already complete.
    let _ = fs::remove_dir_all(&db_dir);
    let _ = fs::remove_dir_all(&replay_db_dir);
    Ok(out_root.clone())
}
fn main() -> Result<(), String> {
let args = parse_args(&std::env::args().collect::<Vec<_>>())?;
let out = run(args)?;
println!("{}", out.display());
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Turns string literals into the owned argv vector `parse_args` expects.
    fn argv_of(parts: &[&str]) -> Vec<String> {
        parts.iter().map(|part| (*part).to_owned()).collect()
    }

    /// With only the four required flags given, parsing succeeds and the
    /// timeouts keep their starting values.
    #[test]
    fn parse_args_requires_required_flags() {
        let argv = argv_of(&[
            "replay_bundle_capture",
            "--rir",
            "apnic",
            "--out-dir",
            "out",
            "--tal-path",
            "tal",
            "--ta-path",
            "ta",
        ]);

        let parsed = parse_args(&argv).expect("parse");

        assert_eq!(parsed.rir.as_deref(), Some("apnic"));
        assert_eq!(parsed.out_dir.as_deref(), Some(std::path::Path::new("out")));
        assert_eq!(parsed.http_timeout_secs, 20);
        assert_eq!(parsed.rsync_timeout_secs, 60);
    }

    /// A bare program name is rejected, and `--rir` is the first flag reported.
    #[test]
    fn parse_args_rejects_missing_requireds() {
        let argv = argv_of(&["replay_bundle_capture"]);

        let err = parse_args(&argv).unwrap_err();

        assert!(err.contains("--rir is required"), "{err}");
    }

    /// The timing file records the mode and the duration in seconds.
    #[test]
    fn write_timing_json_writes_duration_and_mode() {
        let td = tempdir().expect("tempdir");
        let path = td.path().join("timings/base-produce.json");
        let validation_time =
            time::OffsetDateTime::parse("2026-03-30T00:00:00Z", &Rfc3339).expect("time");
        let elapsed = std::time::Duration::from_millis(1500);

        write_timing_json(&path, "base", &validation_time, elapsed).expect("write timing");

        let raw = std::fs::read(&path).expect("read timing");
        let json: serde_json::Value = serde_json::from_slice(&raw).expect("parse");
        assert_eq!(json["mode"], "base");
        assert_eq!(json["durationSeconds"], 1.5);
    }
}