// rpki/src/cli.rs — 2728 lines, 97 KiB, Rust (file-listing header preserved as a comment)

use crate::ccr::{
CcrAccumulator, CcrBuildBreakdown, build_ccr_from_run_with_breakdown, write_ccr_file,
};
use crate::cir::{CirTalBinding, export_cir_from_run_multi};
use std::io::BufWriter;
use std::path::{Path, PathBuf};
use crate::analysis::timing::{TimingHandle, TimingMeta, TimingMetaUpdate};
use crate::audit::{
AspaOutput, AuditRepoSyncStats, AuditReportV2, AuditRunMeta, AuditWarning, TreeSummary,
VrpOutput, format_roa_ip_prefix,
};
use crate::fetch::http::{BlockingHttpFetcher, HttpFetcherConfig};
use crate::fetch::rsync::LocalDirRsyncFetcher;
use crate::fetch::rsync_system::{SystemRsyncConfig, SystemRsyncFetcher};
use crate::parallel::config::{ParallelPhase1Config, ParallelPhase2Config};
use crate::parallel::types::TalInputSpec;
use crate::policy::Policy;
use crate::storage::RocksStore;
use crate::validation::run_tree_from_tal::{
RunTreeFromTalAuditOutput, run_tree_from_multiple_tals_parallel_phase2_audit,
run_tree_from_tal_and_ta_der_parallel_phase2_audit,
run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit,
run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit_with_timing,
run_tree_from_tal_and_ta_der_payload_replay_serial_audit,
run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing,
run_tree_from_tal_url_parallel_phase2_audit,
};
use crate::validation::tree::TreeRunConfig;
use serde::Serialize;
use std::sync::Arc;
/// Wall-clock timings (milliseconds) and download counters collected for one
/// run. Serialized as pretty JSON to `stage-timing.json` next to the report by
/// `write_stage_timing`. `Option` fields are `None` when the corresponding
/// output stage did not run (no `--report-json`, `--ccr-out`, or CIR export).
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
struct RunStageTiming {
    // Time spent in the validation phase itself.
    validation_ms: u64,
    // Time to assemble the AuditReportV2 in memory.
    report_build_ms: u64,
    // Time to write the report JSON; None when --report-json was not given.
    report_write_ms: Option<u64>,
    ccr_build_ms: Option<u64>,
    ccr_build_breakdown: Option<CcrBuildBreakdown>,
    ccr_write_ms: Option<u64>,
    cir_build_cir_ms: Option<u64>,
    cir_write_cir_ms: Option<u64>,
    cir_total_ms: Option<u64>,
    // End-to-end run time.
    total_ms: u64,
    publication_points: usize,
    repo_sync_ms_total: u64,
    publication_point_repo_sync_ms_total: u64,
    download_event_count: u64,
    rrdp_download_ms_total: u64,
    rsync_download_ms_total: u64,
    download_bytes_total: u64,
}
/// Fully parsed and validated command-line configuration.
///
/// Produced only by [`parse_args`]. The singular `tal_url` / `tal_path` /
/// `ta_path` fields are the first elements of their plural counterparts
/// (convenience for single-TAL code paths); `tal_inputs` is the combined
/// TAL specification list derived from the same flags.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CliArgs {
    // Repeatable --tal-url values (URL mode; mutually exclusive with file mode).
    pub tal_urls: Vec<String>,
    // Repeatable --tal-path values (file mode).
    pub tal_paths: Vec<PathBuf>,
    // Repeatable --ta-path values; pairs with tal_paths by position.
    pub ta_paths: Vec<PathBuf>,
    pub tal_url: Option<String>,
    pub tal_path: Option<PathBuf>,
    pub ta_path: Option<PathBuf>,
    pub parallel_phase1_config: ParallelPhase1Config,
    pub parallel_phase2_config: ParallelPhase2Config,
    pub tal_inputs: Vec<TalInputSpec>,
    // --db (required).
    pub db_path: PathBuf,
    pub raw_store_db: Option<PathBuf>,
    pub repo_bytes_db: Option<PathBuf>,
    pub policy_path: Option<PathBuf>,
    pub report_json_path: Option<PathBuf>,
    pub report_json_compact: bool,
    pub ccr_out_path: Option<PathBuf>,
    // --cir-enable; parse_args requires cir_out_path when set.
    pub cir_enabled: bool,
    pub cir_out_path: Option<PathBuf>,
    // Deprecated flag; parse_args rejects it when present.
    pub cir_static_root: Option<PathBuf>,
    pub cir_tal_uris: Vec<String>,
    // First element of cir_tal_uris, when any were given.
    pub cir_tal_uri: Option<String>,
    // Offline snapshot replay mode: both fields must be set together.
    pub payload_replay_archive: Option<PathBuf>,
    pub payload_replay_locks: Option<PathBuf>,
    // Offline delta replay mode: all four base/delta fields must be set together.
    pub payload_base_archive: Option<PathBuf>,
    pub payload_base_locks: Option<PathBuf>,
    pub payload_base_validation_time: Option<time::OffsetDateTime>,
    pub payload_delta_archive: Option<PathBuf>,
    pub payload_delta_locks: Option<PathBuf>,
    pub rsync_local_dir: Option<PathBuf>,
    pub disable_rrdp: bool,
    pub rsync_command: Option<PathBuf>,
    // Defaults to 30 in parse_args.
    pub http_timeout_secs: u64,
    // Defaults to 30 in parse_args.
    pub rsync_timeout_secs: u64,
    pub rsync_mirror_root: Option<PathBuf>,
    pub max_depth: Option<usize>,
    pub max_instances: Option<usize>,
    // Defaults to "now UTC" in run() when absent.
    pub validation_time: Option<time::OffsetDateTime>,
    pub analyze: bool,
    // Requires --analyze and the 'profile' build feature (checked in run()).
    pub profile_cpu: bool,
}
/// Returns the CLI usage/help text.
///
/// The defaults quoted here mirror the initial values in `parse_args`
/// (both timeout flags start at 30 seconds there; the help previously
/// claimed 20 and 60, which was out of date).
fn usage() -> String {
    let bin = "rpki";
    format!(
        "\
Usage:
  {bin} --db <path> --tal-url <url> [--tal-url <url> ...] [options]
  {bin} --db <path> --tal-path <path> --ta-path <path> [--tal-path <path> --ta-path <path> ...] [options]
Options:
  --db <path>                       RocksDB directory path (required)
  --raw-store-db <path>             External raw-by-hash store DB path (optional)
  --repo-bytes-db <path>            External repo object bytes DB path (optional)
  --policy <path>                   Policy TOML path (optional)
  --report-json <path>              Write full audit report as JSON (optional)
  --report-json-compact             Write report JSON without pretty-printing (requires --report-json)
  --ccr-out <path>                  Write CCR DER ContentInfo to this path (optional)
  --cir-enable                      Export CIR after the run completes
  --cir-out <path>                  Write CIR DER to this path (requires --cir-enable)
  --cir-static-root <path>          Deprecated; CIR export no longer exports object pools
  --cir-tal-uri <url>               Override TAL URI for CIR export (repeatable in multi-TAL mode)
  --payload-replay-archive <path>   Use local payload replay archive root (offline replay mode)
  --payload-replay-locks <path>     Use local payload replay locks.json (offline replay mode)
  --payload-base-archive <path>     Use local base payload archive root (offline delta replay)
  --payload-base-locks <path>       Use local base locks.json (offline delta replay)
  --payload-base-validation-time <rfc3339>
                                    Validation time for the base bootstrap inside offline delta replay
  --payload-delta-archive <path>    Use local delta payload archive root (offline delta replay)
  --payload-delta-locks <path>      Use local locks-delta.json (offline delta replay)
  --tal-url <url>                   TAL URL (repeatable; URL mode)
  --tal-path <path>                 TAL file path (repeatable; file mode)
  --ta-path <path>                  TA certificate DER file path (repeatable in file mode; pairs with --tal-path by position)
  --parallel-max-repo-sync-workers-global <n>
                                    Phase 1 global repo sync worker budget (default: 4)
  --parallel-max-inflight-snapshot-bytes-global <n>
                                    Phase 1 inflight snapshot byte budget (default: 512MiB)
  --parallel-max-pending-repo-results <n>
                                    Phase 1 pending repo result budget (default: 1024)
  --parallel-phase2-object-workers <n>
                                    Phase 2 object worker count (default: 8)
  --parallel-phase2-worker-queue-capacity <n>
                                    Phase 2 per-worker object queue capacity (default: 256)
  --rsync-local-dir <path>          Use LocalDirRsyncFetcher rooted at this directory (offline tests)
  --disable-rrdp                    Disable RRDP and synchronize only via rsync
  --rsync-command <path>            Use this rsync command instead of the default rsync binary
  --http-timeout-secs <n>           HTTP fetch timeout seconds (default: 30)
  --rsync-timeout-secs <n>          rsync I/O timeout seconds (default: 30)
  --rsync-mirror-root <path>        Persist rsync mirrors under this directory (default: disabled)
  --max-depth <n>                   Max CA instance depth (0 = root only)
  --max-instances <n>               Max number of CA instances to process
  --validation-time <rfc3339>       Validation time in RFC3339 (default: now UTC)
  --analyze                         Write timing analysis JSON under target/live/analyze/<timestamp>/
  --profile-cpu                     (Requires build feature 'profile') Write CPU flamegraph under analyze dir
  --help                            Show this help
"
    )
}
/// Parses `argv` (index 0, the program name, is skipped) into a validated
/// [`CliArgs`].
///
/// Returns `Err(message)` — with the usage text appended where helpful — for
/// `--help`/`-h`, unknown flags, missing or unparsable values, and invalid
/// flag combinations. Validation order is significant for error reporting:
/// `--db` first, then TAL-mode checks, then CIR checks, then replay-mode
/// checks.
///
/// Changes vs. the previous revision: the redundant `ta_paths.is_empty()`
/// test in the `--tal-path` else-branch was dropped (it was always true
/// there), the repeated value/number/timestamp parsing was factored into
/// local helpers (error strings unchanged), and the multi-line error
/// messages now use the same `\n\n` separator as every other error here.
pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
    // Advances `*i` past the flag to its value and returns it, or an error
    // naming the flag when the value is missing.
    fn next_value<'a>(argv: &'a [String], i: &mut usize, flag: &str) -> Result<&'a str, String> {
        *i += 1;
        argv.get(*i)
            .map(String::as_str)
            .ok_or_else(|| format!("{flag} requires a value"))
    }
    // Parses a numeric flag value, reporting the flag name on failure.
    fn parse_num<T: std::str::FromStr>(v: &str, flag: &str) -> Result<T, String> {
        v.parse::<T>().map_err(|_| format!("invalid {flag}: {v}"))
    }
    // Parses an RFC3339 timestamp flag value.
    fn parse_rfc3339(v: &str, flag: &str) -> Result<time::OffsetDateTime, String> {
        use time::format_description::well_known::Rfc3339;
        time::OffsetDateTime::parse(v, &Rfc3339)
            .map_err(|e| format!("invalid {flag} (RFC3339 expected): {e}"))
    }
    let mut tal_urls: Vec<String> = Vec::new();
    let mut tal_paths: Vec<PathBuf> = Vec::new();
    let mut ta_paths: Vec<PathBuf> = Vec::new();
    let mut parallel_phase1_cfg = ParallelPhase1Config::default();
    let mut parallel_phase2_cfg = ParallelPhase2Config::default();
    let mut db_path: Option<PathBuf> = None;
    let mut raw_store_db: Option<PathBuf> = None;
    let mut repo_bytes_db: Option<PathBuf> = None;
    let mut policy_path: Option<PathBuf> = None;
    let mut report_json_path: Option<PathBuf> = None;
    let mut report_json_compact: bool = false;
    let mut ccr_out_path: Option<PathBuf> = None;
    let mut cir_enabled: bool = false;
    let mut cir_out_path: Option<PathBuf> = None;
    let mut cir_static_root: Option<PathBuf> = None;
    let mut cir_tal_uris: Vec<String> = Vec::new();
    let mut cir_tal_uri: Option<String> = None;
    let mut payload_replay_archive: Option<PathBuf> = None;
    let mut payload_replay_locks: Option<PathBuf> = None;
    let mut payload_base_archive: Option<PathBuf> = None;
    let mut payload_base_locks: Option<PathBuf> = None;
    let mut payload_base_validation_time: Option<time::OffsetDateTime> = None;
    let mut payload_delta_archive: Option<PathBuf> = None;
    let mut payload_delta_locks: Option<PathBuf> = None;
    let mut rsync_local_dir: Option<PathBuf> = None;
    let mut disable_rrdp: bool = false;
    let mut rsync_command: Option<PathBuf> = None;
    let mut http_timeout_secs: u64 = 30;
    let mut rsync_timeout_secs: u64 = 30;
    let mut rsync_mirror_root: Option<PathBuf> = None;
    let mut max_depth: Option<usize> = None;
    let mut max_instances: Option<usize> = None;
    let mut validation_time: Option<time::OffsetDateTime> = None;
    let mut analyze: bool = false;
    let mut profile_cpu: bool = false;
    let mut i = 1usize;
    while i < argv.len() {
        let arg = argv[i].as_str();
        match arg {
            // --help is reported through Err so main prints the text and exits.
            "--help" | "-h" => return Err(usage()),
            "--tal-url" => {
                let v = next_value(argv, &mut i, "--tal-url")?;
                tal_urls.push(v.to_string());
            }
            "--tal-path" => {
                let v = next_value(argv, &mut i, "--tal-path")?;
                tal_paths.push(PathBuf::from(v));
            }
            "--ta-path" => {
                let v = next_value(argv, &mut i, "--ta-path")?;
                ta_paths.push(PathBuf::from(v));
            }
            "--parallel-max-repo-sync-workers-global" => {
                let v = next_value(argv, &mut i, "--parallel-max-repo-sync-workers-global")?;
                parallel_phase1_cfg.max_repo_sync_workers_global =
                    parse_num::<usize>(v, "--parallel-max-repo-sync-workers-global")?;
            }
            "--parallel-max-inflight-snapshot-bytes-global" => {
                let v = next_value(argv, &mut i, "--parallel-max-inflight-snapshot-bytes-global")?;
                parallel_phase1_cfg.max_inflight_snapshot_bytes_global =
                    parse_num::<usize>(v, "--parallel-max-inflight-snapshot-bytes-global")?;
            }
            "--parallel-max-pending-repo-results" => {
                let v = next_value(argv, &mut i, "--parallel-max-pending-repo-results")?;
                parallel_phase1_cfg.max_pending_repo_results =
                    parse_num::<usize>(v, "--parallel-max-pending-repo-results")?;
            }
            "--parallel-phase2-object-workers" => {
                let v = next_value(argv, &mut i, "--parallel-phase2-object-workers")?;
                parallel_phase2_cfg.object_workers =
                    parse_num::<usize>(v, "--parallel-phase2-object-workers")?;
            }
            "--parallel-phase2-worker-queue-capacity" => {
                let v = next_value(argv, &mut i, "--parallel-phase2-worker-queue-capacity")?;
                parallel_phase2_cfg.worker_queue_capacity =
                    parse_num::<usize>(v, "--parallel-phase2-worker-queue-capacity")?;
            }
            "--db" => {
                let v = next_value(argv, &mut i, "--db")?;
                db_path = Some(PathBuf::from(v));
            }
            "--raw-store-db" => {
                let v = next_value(argv, &mut i, "--raw-store-db")?;
                raw_store_db = Some(PathBuf::from(v));
            }
            "--repo-bytes-db" => {
                let v = next_value(argv, &mut i, "--repo-bytes-db")?;
                repo_bytes_db = Some(PathBuf::from(v));
            }
            "--policy" => {
                let v = next_value(argv, &mut i, "--policy")?;
                policy_path = Some(PathBuf::from(v));
            }
            "--report-json" => {
                let v = next_value(argv, &mut i, "--report-json")?;
                report_json_path = Some(PathBuf::from(v));
            }
            "--report-json-compact" => {
                report_json_compact = true;
            }
            "--ccr-out" => {
                let v = next_value(argv, &mut i, "--ccr-out")?;
                ccr_out_path = Some(PathBuf::from(v));
            }
            "--cir-enable" => {
                cir_enabled = true;
            }
            "--cir-out" => {
                let v = next_value(argv, &mut i, "--cir-out")?;
                cir_out_path = Some(PathBuf::from(v));
            }
            "--cir-static-root" => {
                // Deprecated: accepted here so we can emit a targeted error below.
                let v = next_value(argv, &mut i, "--cir-static-root")?;
                cir_static_root = Some(PathBuf::from(v));
            }
            "--cir-tal-uri" => {
                let v = next_value(argv, &mut i, "--cir-tal-uri")?;
                cir_tal_uris.push(v.to_string());
                // Keep the singular convenience field pointing at the first URI.
                cir_tal_uri = cir_tal_uris.first().cloned();
            }
            "--payload-replay-archive" => {
                let v = next_value(argv, &mut i, "--payload-replay-archive")?;
                payload_replay_archive = Some(PathBuf::from(v));
            }
            "--payload-replay-locks" => {
                let v = next_value(argv, &mut i, "--payload-replay-locks")?;
                payload_replay_locks = Some(PathBuf::from(v));
            }
            "--payload-base-archive" => {
                let v = next_value(argv, &mut i, "--payload-base-archive")?;
                payload_base_archive = Some(PathBuf::from(v));
            }
            "--payload-base-locks" => {
                let v = next_value(argv, &mut i, "--payload-base-locks")?;
                payload_base_locks = Some(PathBuf::from(v));
            }
            "--payload-base-validation-time" => {
                let v = next_value(argv, &mut i, "--payload-base-validation-time")?;
                payload_base_validation_time =
                    Some(parse_rfc3339(v, "--payload-base-validation-time")?);
            }
            "--payload-delta-archive" => {
                let v = next_value(argv, &mut i, "--payload-delta-archive")?;
                payload_delta_archive = Some(PathBuf::from(v));
            }
            "--payload-delta-locks" => {
                let v = next_value(argv, &mut i, "--payload-delta-locks")?;
                payload_delta_locks = Some(PathBuf::from(v));
            }
            "--rsync-local-dir" => {
                let v = next_value(argv, &mut i, "--rsync-local-dir")?;
                rsync_local_dir = Some(PathBuf::from(v));
            }
            "--disable-rrdp" => {
                disable_rrdp = true;
            }
            "--rsync-command" => {
                let v = next_value(argv, &mut i, "--rsync-command")?;
                rsync_command = Some(PathBuf::from(v));
            }
            "--http-timeout-secs" => {
                let v = next_value(argv, &mut i, "--http-timeout-secs")?;
                http_timeout_secs = parse_num::<u64>(v, "--http-timeout-secs")?;
            }
            "--rsync-timeout-secs" => {
                let v = next_value(argv, &mut i, "--rsync-timeout-secs")?;
                rsync_timeout_secs = parse_num::<u64>(v, "--rsync-timeout-secs")?;
            }
            "--rsync-mirror-root" => {
                let v = next_value(argv, &mut i, "--rsync-mirror-root")?;
                rsync_mirror_root = Some(PathBuf::from(v));
            }
            "--max-depth" => {
                let v = next_value(argv, &mut i, "--max-depth")?;
                max_depth = Some(parse_num::<usize>(v, "--max-depth")?);
            }
            "--max-instances" => {
                let v = next_value(argv, &mut i, "--max-instances")?;
                max_instances = Some(parse_num::<usize>(v, "--max-instances")?);
            }
            "--validation-time" => {
                let v = next_value(argv, &mut i, "--validation-time")?;
                validation_time = Some(parse_rfc3339(v, "--validation-time")?);
            }
            "--analyze" => {
                analyze = true;
            }
            "--profile-cpu" => {
                profile_cpu = true;
            }
            _ => return Err(format!("unknown argument: {arg}\n\n{}", usage())),
        }
        i += 1;
    }
    let db_path = db_path.ok_or_else(|| format!("--db is required\n\n{}", usage()))?;
    // Exactly one TAL input mode must be active: URL mode or file mode.
    let tal_mode_count = (!tal_urls.is_empty()) as u8 + (!tal_paths.is_empty()) as u8;
    if tal_mode_count != 1 {
        return Err(format!(
            "must specify either one-or-more --tal-url or one-or-more --tal-path/--ta-path pairs\n\n{}",
            usage()
        ));
    }
    if parallel_phase2_cfg.object_workers == 0 {
        return Err(format!(
            "--parallel-phase2-object-workers must be > 0\n\n{}",
            usage()
        ));
    }
    if parallel_phase2_cfg.worker_queue_capacity == 0 {
        return Err(format!(
            "--parallel-phase2-worker-queue-capacity must be > 0\n\n{}",
            usage()
        ));
    }
    if !tal_urls.is_empty() && !ta_paths.is_empty() {
        return Err(format!(
            "--ta-path cannot be used with --tal-url mode\n\n{}",
            usage()
        ));
    }
    if !tal_paths.is_empty() {
        if !ta_paths.is_empty() {
            if ta_paths.len() != tal_paths.len() {
                return Err(format!(
                    "--tal-path and --ta-path counts must match in file mode\n\n{}",
                    usage()
                ));
            }
        } else if !disable_rrdp {
            // (ta_paths is necessarily empty in this branch; the old
            // `ta_paths.is_empty() &&` guard was redundant.)
            return Err(format!(
                "--tal-path requires --ta-path unless --disable-rrdp is set\n\n{}",
                usage()
            ));
        }
    }
    // Singular convenience fields for the single-TAL code paths.
    let tal_url = tal_urls.first().cloned();
    let tal_path = tal_paths.first().cloned();
    let ta_path = ta_paths.first().cloned();
    if cir_enabled && cir_out_path.is_none() {
        return Err(format!("--cir-enable requires --cir-out\n\n{}", usage()));
    }
    if report_json_compact && report_json_path.is_none() {
        return Err(format!(
            "--report-json-compact requires --report-json\n\n{}",
            usage()
        ));
    }
    if cir_static_root.is_some() {
        return Err(format!(
            "--cir-static-root is no longer supported; CIR export now writes only .cir files\n\n{}",
            usage()
        ));
    }
    if !cir_enabled && (cir_out_path.is_some() || !cir_tal_uris.is_empty()) {
        return Err(format!(
            "--cir-out/--cir-tal-uri require --cir-enable\n\n{}",
            usage()
        ));
    }
    if cir_enabled && !cir_tal_uris.is_empty() {
        // When URIs are supplied explicitly they must pair 1:1 with TAL inputs.
        let expected = if !tal_paths.is_empty() {
            tal_paths.len()
        } else {
            tal_urls.len()
        };
        if cir_tal_uris.len() != expected {
            return Err(format!(
                "--cir-tal-uri count must match TAL input count when provided\n\n{}",
                usage()
            ));
        }
    }
    if cir_enabled && !tal_paths.is_empty() && cir_tal_uris.is_empty() {
        return Err(format!(
            "CIR export in --tal-path mode requires --cir-tal-uri for each TAL\n\n{}",
            usage()
        ));
    }
    // Snapshot replay mode needs both of its flags; one alone is an error.
    let replay_mode_count =
        payload_replay_archive.is_some() as u8 + payload_replay_locks.is_some() as u8;
    if replay_mode_count == 1 {
        return Err(format!(
            "--payload-replay-archive and --payload-replay-locks must be provided together\n\n{}",
            usage()
        ));
    }
    // Delta replay mode needs all four of its flags.
    let delta_mode_count = payload_base_archive.is_some() as u8
        + payload_base_locks.is_some() as u8
        + payload_delta_archive.is_some() as u8
        + payload_delta_locks.is_some() as u8;
    if delta_mode_count > 0 && delta_mode_count < 4 {
        return Err(format!(
            "--payload-base-archive, --payload-base-locks, --payload-delta-archive and --payload-delta-locks must be provided together\n\n{}",
            usage()
        ));
    }
    if replay_mode_count == 2 && delta_mode_count == 4 {
        return Err(format!(
            "snapshot replay mode and delta replay mode are mutually exclusive\n\n{}",
            usage()
        ));
    }
    if replay_mode_count == 2 {
        if tal_url.is_some() {
            return Err(format!(
                "payload replay mode requires --tal-path and --ta-path; --tal-url is not supported\n\n{}",
                usage()
            ));
        }
        if tal_path.is_none() || ta_path.is_none() {
            return Err(format!(
                "payload replay mode requires --tal-path and --ta-path\n\n{}",
                usage()
            ));
        }
        if rsync_local_dir.is_some() {
            return Err(format!(
                "payload replay mode cannot be combined with --rsync-local-dir\n\n{}",
                usage()
            ));
        }
    }
    if delta_mode_count == 4 {
        if tal_url.is_some() {
            return Err(format!(
                "payload delta replay mode requires --tal-path and --ta-path; --tal-url is not supported\n\n{}",
                usage()
            ));
        }
        if tal_path.is_none() || ta_path.is_none() {
            return Err(format!(
                "payload delta replay mode requires --tal-path and --ta-path\n\n{}",
                usage()
            ));
        }
        if rsync_local_dir.is_some() {
            return Err(format!(
                "payload delta replay mode cannot be combined with --rsync-local-dir\n\n{}",
                usage()
            ));
        }
    }
    // Build the unified TAL input list: URLs, path+TA pairs, or bare paths.
    let mut tal_inputs = Vec::new();
    if !tal_urls.is_empty() {
        tal_inputs.extend(tal_urls.iter().cloned().map(TalInputSpec::from_url));
    } else if !tal_paths.is_empty() {
        if ta_paths.len() == tal_paths.len() {
            tal_inputs.extend(tal_paths.iter().cloned().zip(ta_paths.iter().cloned()).map(
                |(tal_path, ta_path)| TalInputSpec::from_file_path_with_ta(tal_path, ta_path),
            ));
        } else {
            tal_inputs.extend(tal_paths.iter().cloned().map(TalInputSpec::from_file_path));
        }
    }
    Ok(CliArgs {
        tal_urls,
        tal_paths,
        ta_paths,
        tal_url,
        tal_path,
        ta_path,
        parallel_phase1_config: parallel_phase1_cfg,
        parallel_phase2_config: parallel_phase2_cfg,
        tal_inputs,
        db_path,
        raw_store_db,
        repo_bytes_db,
        policy_path,
        report_json_path,
        report_json_compact,
        ccr_out_path,
        cir_enabled,
        cir_out_path,
        cir_static_root,
        cir_tal_uris,
        cir_tal_uri,
        payload_replay_archive,
        payload_replay_locks,
        payload_base_archive,
        payload_base_locks,
        payload_base_validation_time,
        payload_delta_archive,
        payload_delta_locks,
        rsync_local_dir,
        disable_rrdp,
        rsync_command,
        http_timeout_secs,
        rsync_timeout_secs,
        rsync_mirror_root,
        max_depth,
        max_instances,
        validation_time,
        analyze,
        profile_cpu,
    })
}
/// Loads the run [`Policy`]: the built-in default when `path` is `None`,
/// otherwise parsed from the TOML file at `path`.
///
/// Read and parse failures are mapped to human-readable `String` errors.
fn read_policy(path: Option<&Path>) -> Result<Policy, String> {
    if let Some(p) = path {
        let text = std::fs::read_to_string(p)
            .map_err(|e| format!("read policy file failed: {}: {e}", p.display()))?;
        Policy::from_toml_str(&text).map_err(|e| e.to_string())
    } else {
        Ok(Policy::default())
    }
}
/// Output style for the audit report JSON: pretty-printed (default) or
/// compact (selected by --report-json-compact).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ReportJsonFormat {
    Pretty,
    Compact,
}
/// Serializes `report` as JSON to `path`, pretty-printed or compact per
/// `format`.
///
/// Fix: the `BufWriter` used to be consumed by `serde_json::to_writer*` and
/// flushed implicitly on drop, which silently discards flush errors. We now
/// keep the writer, serialize through a mutable borrow, and surface any
/// buffered-write failure via `into_inner()` (which flushes).
fn write_json(path: &Path, report: &AuditReportV2, format: ReportJsonFormat) -> Result<(), String> {
    let f = std::fs::File::create(path)
        .map_err(|e| format!("create report file failed: {}: {e}", path.display()))?;
    let mut writer = BufWriter::new(f);
    match format {
        ReportJsonFormat::Pretty => serde_json::to_writer_pretty(&mut writer, report),
        ReportJsonFormat::Compact => serde_json::to_writer(&mut writer, report),
    }
    .map_err(|e| format!("write report json failed: {e}"))?;
    // into_inner flushes the buffer and reports the error drop would swallow.
    writer
        .into_inner()
        .map_err(|e| format!("write report json failed: {e}"))?;
    Ok(())
}
/// Counts the distinct RRDP notification URIs across the report's
/// publication points (points without a URI are ignored).
fn unique_rrdp_repos(report: &AuditReportV2) -> usize {
    use std::collections::HashSet;
    report
        .publication_points
        .iter()
        .filter_map(|pp| pp.rrdp_notification_uri.as_deref())
        .collect::<HashSet<&str>>()
        .len()
}
/// Prints a short human-readable run summary to stdout.
fn print_summary(report: &AuditReportV2) {
    let rrdp_repos = unique_rrdp_repos(report);
    // Warnings from the tree run plus all per-publication-point warnings.
    let pp_warnings: usize = report
        .publication_points
        .iter()
        .map(|pp| pp.warnings.len())
        .sum();
    let warnings_total = report.tree.warnings.len() + pp_warnings;
    println!("RPKI stage2 serial run summary");
    println!("validation_time={}", report.meta.validation_time_rfc3339_utc);
    println!(
        "publication_points_processed={} publication_points_failed={}",
        report.tree.instances_processed, report.tree.instances_failed
    );
    println!("rrdp_repos_unique={rrdp_repos}");
    println!("vrps={}", report.vrps.len());
    println!("aspas={}", report.aspas.len());
    println!("audit_publication_points={}", report.publication_points.len());
    println!("warnings_total={warnings_total}");
}
/// Validation outputs shared by the post-validation tasks (report building,
/// CCR construction, CIR export). Slice fields are `Arc<[T]>` so clones are
/// cheap reference-count bumps rather than deep copies.
#[derive(Clone, Debug, PartialEq, Eq)]
struct PostValidationShared {
    // Root CA discovery for the (single-TAL) run.
    discovery: crate::validation::from_tal::DiscoveredRootCaInstance,
    // All discoveries in multi-TAL runs; may be empty, see trust_anchors().
    discoveries: Arc<[crate::validation::from_tal::DiscoveredRootCaInstance]>,
    instances_processed: usize,
    instances_failed: usize,
    tree_warnings: Arc<[crate::report::Warning]>,
    vrps: Arc<[crate::validation::objects::Vrp]>,
    aspas: Arc<[crate::validation::objects::AspaAttestation]>,
    router_keys: Arc<[crate::validation::objects::RouterKeyPayload]>,
    publication_points: Arc<[crate::audit::PublicationPointAudit]>,
    downloads: Arc<[crate::audit::AuditDownloadEvent]>,
    download_stats: crate::audit::AuditDownloadStats,
    // Only collected when CIR export is enabled; empty otherwise.
    current_repo_objects: Arc<[crate::current_repo_index::CurrentRepoObject]>,
    // Present when the run produced a pre-accumulated CCR; see run_ccr_task.
    ccr_accumulator: Option<CcrAccumulator>,
}
impl PostValidationShared {
    /// Flattens a [`RunTreeFromTalAuditOutput`] (and its nested tree output)
    /// into the shared post-validation state, converting owned `Vec`s into
    /// `Arc` slices for cheap sharing across tasks.
    fn from_run_output(out: RunTreeFromTalAuditOutput) -> Self {
        let RunTreeFromTalAuditOutput {
            discovery,
            discoveries,
            tree,
            publication_points,
            downloads,
            download_stats,
            current_repo_objects,
            ccr_accumulator,
        } = out;
        let crate::validation::tree::TreeRunOutput {
            instances_processed,
            instances_failed,
            warnings,
            vrps,
            aspas,
            router_keys,
        } = tree;
        Self {
            discovery,
            discoveries: discoveries.into(),
            instances_processed,
            instances_failed,
            tree_warnings: warnings.into(),
            vrps: vrps.into(),
            aspas: aspas.into(),
            router_keys: router_keys.into(),
            publication_points: publication_points.into(),
            downloads: downloads.into(),
            download_stats,
            current_repo_objects: current_repo_objects.into(),
            ccr_accumulator,
        }
    }
    /// Trust anchors for this run: from `discoveries` when non-empty
    /// (multi-TAL), otherwise the single `discovery`'s trust anchor.
    fn trust_anchors(&self) -> Vec<crate::data_model::ta::TrustAnchor> {
        if self.discoveries.is_empty() {
            vec![self.discovery.trust_anchor.clone()]
        } else {
            self.discoveries
                .iter()
                .map(|item| item.trust_anchor.clone())
                .collect()
        }
    }
    /// Current repo objects as a slice, or `None` when none were collected
    /// (i.e. CIR collection was not enabled for the run).
    fn current_repo_objects(&self) -> Option<&[crate::current_repo_index::CurrentRepoObject]> {
        if self.current_repo_objects.is_empty() {
            None
        } else {
            Some(self.current_repo_objects.as_ref())
        }
    }
}
/// Assembles the [`AuditReportV2`] for this run from the shared
/// post-validation state, stamping the (UTC, RFC3339) validation time.
fn build_report(
    policy: &Policy,
    validation_time: time::OffsetDateTime,
    shared: &PostValidationShared,
) -> AuditReportV2 {
    use time::format_description::well_known::Rfc3339;
    let validation_time_rfc3339_utc = validation_time
        .to_offset(time::UtcOffset::UTC)
        .format(&Rfc3339)
        .expect("format validation_time");
    // Project VRPs and ASPAs into their report output shapes.
    let mut vrps = Vec::with_capacity(shared.vrps.len());
    for v in shared.vrps.iter() {
        vrps.push(VrpOutput {
            asn: v.asn,
            prefix: format_roa_ip_prefix(&v.prefix),
            max_length: v.max_length,
        });
    }
    let mut aspas = Vec::with_capacity(shared.aspas.len());
    for a in shared.aspas.iter() {
        aspas.push(AspaOutput {
            customer_as_id: a.customer_as_id,
            provider_as_ids: a.provider_as_ids.clone(),
        });
    }
    let tree = TreeSummary {
        instances_processed: shared.instances_processed,
        instances_failed: shared.instances_failed,
        warnings: shared
            .tree_warnings
            .iter()
            .map(AuditWarning::from)
            .collect(),
    };
    AuditReportV2 {
        format_version: 2,
        meta: AuditRunMeta {
            validation_time_rfc3339_utc,
        },
        policy: policy.clone(),
        tree,
        publication_points: shared.publication_points.to_vec(),
        vrps,
        aspas,
        downloads: shared.downloads.to_vec(),
        download_stats: shared.download_stats.clone(),
        repo_sync_stats: build_repo_sync_stats(shared.publication_points.as_ref()),
    }
}
/// Result of the report task: the built report plus stage timings.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ReportTaskOutput {
    report: AuditReportV2,
    // Milliseconds spent building the report in memory.
    report_build_ms: u64,
    // Milliseconds spent writing JSON; None when no output path was given.
    report_write_ms: Option<u64>,
}
/// Builds the audit report, optionally writes it as JSON to
/// `report_json_path`, and records per-stage timings in milliseconds.
fn run_report_task(
    policy: &Policy,
    validation_time: time::OffsetDateTime,
    shared: &PostValidationShared,
    report_json_path: Option<&Path>,
    report_json_format: ReportJsonFormat,
) -> Result<ReportTaskOutput, String> {
    let build_started = std::time::Instant::now();
    let report = build_report(policy, validation_time, shared);
    let report_build_ms = build_started.elapsed().as_millis() as u64;
    let mut report_write_ms = None;
    if let Some(path) = report_json_path {
        let write_started = std::time::Instant::now();
        write_json(path, &report, report_json_format)?;
        report_write_ms = Some(write_started.elapsed().as_millis() as u64);
    }
    Ok(ReportTaskOutput {
        report,
        report_build_ms,
        report_write_ms,
    })
}
/// Timings from the CCR task; all fields are None when no --ccr-out path
/// was given (the task is then a no-op).
#[derive(Clone, Debug, PartialEq, Eq)]
struct CcrTaskOutput {
    ccr_build_ms: Option<u64>,
    // Only populated on the non-accumulator build path; see run_ccr_task.
    ccr_build_breakdown: Option<CcrBuildBreakdown>,
    ccr_write_ms: Option<u64>,
}
/// Builds and writes the CCR when `ccr_out_path` is set; otherwise returns
/// an all-`None` [`CcrTaskOutput`].
///
/// Uses the run's `ccr_accumulator` when available (no breakdown in that
/// case); otherwise rebuilds from the store and trust anchors, which also
/// yields a build breakdown.
fn run_ccr_task(
    store: &RocksStore,
    shared: &PostValidationShared,
    ccr_out_path: Option<&Path>,
    produced_at: time::OffsetDateTime,
) -> Result<CcrTaskOutput, String> {
    let path = match ccr_out_path {
        Some(p) => p,
        None => {
            return Ok(CcrTaskOutput {
                ccr_build_ms: None,
                ccr_build_breakdown: None,
                ccr_write_ms: None,
            });
        }
    };
    let build_started = std::time::Instant::now();
    let (ccr, ccr_build_breakdown) = match shared.ccr_accumulator.as_ref() {
        Some(accumulator) => {
            let ccr = accumulator
                .finish(
                    produced_at,
                    shared.vrps.as_ref(),
                    shared.aspas.as_ref(),
                    shared.router_keys.as_ref(),
                )
                .map_err(|e| e.to_string())?;
            (ccr, None)
        }
        None => {
            let trust_anchors = shared.trust_anchors();
            let (ccr, breakdown) = build_ccr_from_run_with_breakdown(
                store,
                &trust_anchors,
                shared.vrps.as_ref(),
                shared.aspas.as_ref(),
                shared.router_keys.as_ref(),
                produced_at,
            )
            .map_err(|e| e.to_string())?;
            (ccr, Some(breakdown))
        }
    };
    let ccr_build_ms = Some(build_started.elapsed().as_millis() as u64);
    let write_started = std::time::Instant::now();
    write_ccr_file(path, &ccr).map_err(|e| e.to_string())?;
    let ccr_write_ms = Some(write_started.elapsed().as_millis() as u64);
    eprintln!("wrote CCR: {}", path.display());
    Ok(CcrTaskOutput {
        ccr_build_ms,
        ccr_build_breakdown,
        ccr_write_ms,
    })
}
/// Writes `stage_timing` as pretty JSON to `stage-timing.json` in the
/// directory containing the report JSON. A no-op when no report path was
/// given (or the path has no parent directory).
fn write_stage_timing(
    report_json_path: Option<&Path>,
    stage_timing: &RunStageTiming,
) -> Result<(), String> {
    let parent = match report_json_path.and_then(Path::parent) {
        Some(dir) => dir,
        None => return Ok(()),
    };
    let stage_timing_path = parent.join("stage-timing.json");
    let body = serde_json::to_vec_pretty(stage_timing).map_err(|e| e.to_string())?;
    std::fs::write(&stage_timing_path, body).map_err(|e| {
        format!(
            "write stage timing failed: {}: {e}",
            stage_timing_path.display()
        )
    })?;
    eprintln!("analysis: wrote {}", stage_timing_path.display());
    Ok(())
}
/// Resolves the TAL URIs to embed in the CIR export: explicit
/// `--cir-tal-uri` values win, falling back to the `--tal-url` inputs;
/// errors when neither source is available (e.g. --tal-path mode without
/// overrides).
fn resolve_cir_export_tal_uris(args: &CliArgs) -> Result<Vec<String>, String> {
    for candidate in [&args.cir_tal_uris, &args.tal_urls] {
        if !candidate.is_empty() {
            return Ok(candidate.clone());
        }
    }
    Err("CIR export requires TAL URI source(s)".to_string())
}
/// Aggregates per-publication-point repo sync durations into counts and
/// totals keyed by sync phase and by terminal state. Points without a
/// recorded duration contribute 0 ms.
fn build_repo_sync_stats(
    publication_points: &[crate::audit::PublicationPointAudit],
) -> AuditRepoSyncStats {
    let mut stats = AuditRepoSyncStats::default();
    stats.publication_points_total = publication_points.len() as u64;
    for pp in publication_points {
        let duration_ms = pp.repo_sync_duration_ms.unwrap_or(0);
        // Phase is optional; only bucket by phase when one was recorded.
        if let Some(phase) = &pp.repo_sync_phase {
            let phase_entry = stats.by_phase.entry(phase.clone()).or_default();
            phase_entry.count += 1;
            phase_entry.duration_ms_total += duration_ms;
        }
        // Every point has a terminal state, so this bucket is unconditional.
        let state_entry = stats
            .by_terminal_state
            .entry(pp.repo_terminal_state.clone())
            .or_default();
        state_entry.count += 1;
        state_entry.duration_ms_total += duration_ms;
    }
    stats
}
/// Dispatches an online validation run to the appropriate entry point based
/// on the CLI's TAL inputs:
///
/// - more than one TAL input → multi-TAL parallel phase-2 run;
/// - a single `--tal-url` → TAL-by-URL parallel phase-2 run;
/// - `--tal-path` + `--ta-path` → TAL-file + TA-DER parallel phase-2 run;
/// - `--tal-path` only → serial run from TAL bytes (timed variant when a
///   `TimingHandle` is supplied).
///
/// Any other combination is unreachable because `parse_args` enforces the
/// valid TAL-mode combinations. Errors from the underlying runs are
/// stringified.
fn run_online_validation_with_fetchers<H, R>(
    store: Arc<RocksStore>,
    policy: &Policy,
    args: &CliArgs,
    http: &H,
    rsync: &R,
    validation_time: time::OffsetDateTime,
    config: &TreeRunConfig,
    collect_current_repo_objects: bool,
    timing: Option<&TimingHandle>,
) -> Result<RunTreeFromTalAuditOutput, String>
where
    H: crate::sync::rrdp::Fetcher + Clone + 'static,
    R: crate::fetch::rsync::RsyncFetcher + Clone + 'static,
{
    // Multi-TAL mode takes precedence over the singular tal_url/tal_path fields.
    if args.tal_inputs.len() > 1 {
        return run_tree_from_multiple_tals_parallel_phase2_audit(
            store,
            policy,
            args.tal_inputs.clone(),
            http,
            rsync,
            validation_time,
            config,
            args.parallel_phase1_config.clone(),
            args.parallel_phase2_config.clone(),
            collect_current_repo_objects,
        )
        .map_err(|e| e.to_string());
    }
    match (
        args.tal_url.as_ref(),
        args.tal_path.as_ref(),
        args.ta_path.as_ref(),
    ) {
        // URL mode: fetch the TAL from the network.
        (Some(url), _, _) => run_tree_from_tal_url_parallel_phase2_audit(
            store,
            policy,
            url,
            http,
            rsync,
            validation_time,
            config,
            args.parallel_phase1_config.clone(),
            args.parallel_phase2_config.clone(),
            collect_current_repo_objects,
        )
        .map_err(|e| e.to_string()),
        // File mode with an explicit TA certificate: read both from disk.
        (None, Some(tal_path), Some(ta_path)) => {
            let tal_bytes = std::fs::read(tal_path)
                .map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?;
            let ta_der = std::fs::read(ta_path)
                .map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?;
            run_tree_from_tal_and_ta_der_parallel_phase2_audit(
                store,
                policy,
                &tal_bytes,
                &ta_der,
                None,
                http,
                rsync,
                validation_time,
                config,
                args.parallel_phase1_config.clone(),
                args.parallel_phase2_config.clone(),
                collect_current_repo_objects,
            )
            .map_err(|e| e.to_string())
        }
        // File mode without a TA: serial run from TAL bytes; parse_args only
        // allows this combination together with --disable-rrdp.
        (None, Some(tal_path), None) => {
            let tal_bytes = std::fs::read(tal_path)
                .map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?;
            let tal_uri = args.cir_tal_uri.clone();
            if let Some(t) = timing {
                crate::validation::run_tree_from_tal::run_tree_from_tal_bytes_serial_audit_with_timing(
                    store.as_ref(),
                    policy,
                    &tal_bytes,
                    tal_uri,
                    http,
                    rsync,
                    validation_time,
                    config,
                    t,
                )
                .map_err(|e| e.to_string())
            } else {
                crate::validation::run_tree_from_tal::run_tree_from_tal_bytes_serial_audit(
                    store.as_ref(),
                    policy,
                    &tal_bytes,
                    tal_uri,
                    http,
                    rsync,
                    validation_time,
                    config,
                )
                .map_err(|e| e.to_string())
            }
        }
        // parse_args guarantees exactly one TAL mode, so no other tuple occurs.
        _ => unreachable!("validated by parse_args"),
    }
}
/// CLI entry point: parse `argv`, run validation in the selected mode
/// (payload delta replay, payload replay, offline local-dir rsync, or online),
/// then produce the audit report, optional CCR/CIR outputs, stage timing, and
/// optional `--analyze` / `--profile-cpu` artifacts.
///
/// Returns `Err(String)` with a human-readable message on any failure.
pub fn run(argv: &[String]) -> Result<(), String> {
    let args = parse_args(argv)?;
    let mut policy = read_policy(args.policy_path.as_deref())?;
    // --disable-rrdp forces rsync-only synchronization regardless of the policy file.
    if args.disable_rrdp {
        policy.sync_preference = crate::policy::SyncPreference::RsyncOnly;
    }
    // Default the validation reference time to "now" unless overridden on the CLI.
    let validation_time = args
        .validation_time
        .unwrap_or_else(time::OffsetDateTime::now_utc);
    // Open the RocksDB store; raw-object and repo-bytes data may live in
    // separate external databases when the corresponding flags are set.
    let store = if args.raw_store_db.is_some() || args.repo_bytes_db.is_some() {
        Arc::new(
            RocksStore::open_with_external_stores(
                &args.db_path,
                args.raw_store_db.as_deref(),
                args.repo_bytes_db.as_deref(),
            )
            .map_err(|e| e.to_string())?,
        )
    } else {
        Arc::new(RocksStore::open(&args.db_path).map_err(|e| e.to_string())?)
    };
    let config = TreeRunConfig {
        max_depth: args.max_depth,
        max_instances: args.max_instances,
    };
    // Replay modes are mutually exclusive with online fetching; parse_args
    // has already validated the flag combinations.
    let replay_mode = args.payload_replay_archive.is_some();
    let delta_replay_mode = args.payload_base_archive.is_some();
    use time::format_description::well_known::Rfc3339;
    // --analyze: create target/live/analyze/<timestamp>/ and a TimingHandle
    // that collects counters/durations written out as timing.json at the end.
    let mut timing: Option<(std::path::PathBuf, TimingHandle)> = None;
    if args.analyze {
        let recorded_at_utc_rfc3339 = time::OffsetDateTime::now_utc()
            .to_offset(time::UtcOffset::UTC)
            .format(&Rfc3339)
            .map_err(|e| format!("format recorded_at_utc failed: {e}"))?;
        let validation_time_utc_rfc3339 = validation_time
            .to_offset(time::UtcOffset::UTC)
            .format(&Rfc3339)
            .map_err(|e| format!("format validation_time failed: {e}"))?;
        // Compact filesystem-safe timestamp used as the output directory name.
        let ts_compact = {
            let fmt = time::format_description::parse("[year][month][day]T[hour][minute][second]Z")
                .map_err(|e| format!("format description parse failed: {e}"))?;
            time::OffsetDateTime::now_utc()
                .format(&fmt)
                .map_err(|e| format!("format timestamp failed: {e}"))?
        };
        let out_dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("target")
            .join("live")
            .join("analyze")
            .join(ts_compact);
        std::fs::create_dir_all(&out_dir)
            .map_err(|e| format!("create analyze out dir failed: {}: {e}", out_dir.display()))?;
        let handle = TimingHandle::new(TimingMeta {
            recorded_at_utc_rfc3339,
            validation_time_utc_rfc3339,
            tal_url: None,
            db_path: None,
        });
        handle.set_meta(TimingMetaUpdate {
            tal_url: args.tal_url.as_deref(),
            db_path: Some(args.db_path.to_string_lossy().as_ref()),
        });
        timing = Some((out_dir, handle));
    }
    // CPU profiling is only meaningful together with --analyze (artifacts are
    // written into the analyze output directory).
    if args.profile_cpu && !args.analyze {
        return Err("--profile-cpu requires --analyze".to_string());
    }
    // Without the `profile` feature the pprof dependency is not compiled in.
    #[cfg(not(feature = "profile"))]
    if args.profile_cpu {
        return Err("CPU profiling requires building with: --features profile".to_string());
    }
    // Sampling profiler guard (100 Hz); dropped/consumed after validation below.
    #[cfg(feature = "profile")]
    let mut profiler_guard: Option<pprof::ProfilerGuard<'static>> = if args.profile_cpu {
        Some(
            pprof::ProfilerGuard::new(100)
                .map_err(|e| format!("pprof ProfilerGuard init failed: {e}"))?,
        )
    } else {
        None
    };
    let total_started = std::time::Instant::now();
    let validation_started = std::time::Instant::now();
    let collect_current_repo_objects = args.cir_enabled;
    // Mode dispatch: delta replay > replay > offline local-dir rsync > online.
    let out = if delta_replay_mode {
        // Payload delta replay: base archive + delta archive, each with a
        // locks file, validated at their respective reference times.
        let tal_path = args
            .tal_path
            .as_ref()
            .expect("validated by parse_args for delta replay mode");
        let ta_path = args
            .ta_path
            .as_ref()
            .expect("validated by parse_args for delta replay mode");
        let base_archive = args
            .payload_base_archive
            .as_ref()
            .expect("validated by parse_args for delta replay mode");
        let base_locks = args
            .payload_base_locks
            .as_ref()
            .expect("validated by parse_args for delta replay mode");
        let base_validation_time = args.payload_base_validation_time.unwrap_or(validation_time);
        let delta_archive = args
            .payload_delta_archive
            .as_ref()
            .expect("validated by parse_args for delta replay mode");
        let delta_locks = args
            .payload_delta_locks
            .as_ref()
            .expect("validated by parse_args for delta replay mode");
        let tal_bytes = std::fs::read(tal_path)
            .map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?;
        let ta_der = std::fs::read(ta_path)
            .map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?;
        if let Some((_, t)) = timing.as_ref() {
            run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit_with_timing(
                store.as_ref(),
                &policy,
                &tal_bytes,
                &ta_der,
                None,
                base_archive,
                base_locks,
                delta_archive,
                delta_locks,
                base_validation_time,
                validation_time,
                &config,
                t,
            )
            .map_err(|e| e.to_string())?
        } else {
            run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit(
                store.as_ref(),
                &policy,
                &tal_bytes,
                &ta_der,
                None,
                base_archive,
                base_locks,
                delta_archive,
                delta_locks,
                base_validation_time,
                validation_time,
                &config,
            )
            .map_err(|e| e.to_string())?
        }
    } else if replay_mode {
        // Payload replay: validate from a single captured archive + locks file.
        let tal_path = args
            .tal_path
            .as_ref()
            .expect("validated by parse_args for replay mode");
        let ta_path = args
            .ta_path
            .as_ref()
            .expect("validated by parse_args for replay mode");
        let archive_root = args
            .payload_replay_archive
            .as_ref()
            .expect("validated by parse_args for replay mode");
        let locks_path = args
            .payload_replay_locks
            .as_ref()
            .expect("validated by parse_args for replay mode");
        let tal_bytes = std::fs::read(tal_path)
            .map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?;
        let ta_der = std::fs::read(ta_path)
            .map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?;
        if let Some((_, t)) = timing.as_ref() {
            run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing(
                store.as_ref(),
                &policy,
                &tal_bytes,
                &ta_der,
                None,
                archive_root,
                locks_path,
                validation_time,
                &config,
                t,
            )
            .map_err(|e| e.to_string())?
        } else {
            run_tree_from_tal_and_ta_der_payload_replay_serial_audit(
                store.as_ref(),
                &policy,
                &tal_bytes,
                &ta_der,
                None,
                archive_root,
                locks_path,
                validation_time,
                &config,
            )
            .map_err(|e| e.to_string())?
        }
    } else if let Some(dir) = args.rsync_local_dir.as_ref() {
        // Offline-ish mode: rsync URIs are served from a local directory tree.
        let http = BlockingHttpFetcher::new(HttpFetcherConfig {
            timeout: std::time::Duration::from_secs(args.http_timeout_secs.max(1)),
            ..HttpFetcherConfig::default()
        })
        .map_err(|e| e.to_string())?;
        let rsync = LocalDirRsyncFetcher::new(dir);
        run_online_validation_with_fetchers(
            Arc::clone(&store),
            &policy,
            &args,
            &http,
            &rsync,
            validation_time,
            &config,
            collect_current_repo_objects,
            timing.as_ref().map(|(_, t)| t),
        )?
    } else {
        // Fully online mode: HTTP fetcher plus the system `rsync` binary.
        let http = BlockingHttpFetcher::new(HttpFetcherConfig {
            timeout: std::time::Duration::from_secs(args.http_timeout_secs.max(1)),
            ..HttpFetcherConfig::default()
        })
        .map_err(|e| e.to_string())?;
        let rsync = SystemRsyncFetcher::new(SystemRsyncConfig {
            rsync_bin: args
                .rsync_command
                .clone()
                .unwrap_or_else(|| PathBuf::from("rsync")),
            timeout: std::time::Duration::from_secs(args.rsync_timeout_secs.max(1)),
            mirror_root: args.rsync_mirror_root.clone(),
            ..SystemRsyncConfig::default()
        });
        run_online_validation_with_fetchers(
            Arc::clone(&store),
            &policy,
            &args,
            &http,
            &rsync,
            validation_time,
            &config,
            collect_current_repo_objects,
            timing.as_ref().map(|(_, t)| t),
        )?
    };
    let validation_ms = validation_started.elapsed().as_millis() as u64;
    // Consolidate the run output into the shared state used by the report,
    // CCR, and CIR stages below.
    let shared = PostValidationShared::from_run_output(out);
    if let Some((_out_dir, t)) = timing.as_ref() {
        t.record_count("instances_processed", shared.instances_processed as u64);
        t.record_count("instances_failed", shared.instances_failed as u64);
    }
    // Aggregate per-publication-point and per-download-kind timing/byte totals.
    let publication_points = shared.publication_points.len();
    let publication_point_repo_sync_ms_total: u64 = shared
        .publication_points
        .iter()
        .map(|pp| pp.repo_sync_duration_ms.unwrap_or(0))
        .sum();
    let download_event_count = shared.download_stats.events_total;
    let rrdp_download_ms_total: u64 = ["rrdp_notification", "rrdp_snapshot", "rrdp_delta"]
        .iter()
        .map(|key| {
            shared
                .download_stats
                .by_kind
                .get(*key)
                .map(|item| item.duration_ms_total)
                .unwrap_or(0)
        })
        .sum();
    let rsync_download_ms_total = shared
        .download_stats
        .by_kind
        .get("rsync")
        .map(|item| item.duration_ms_total)
        .unwrap_or(0);
    let repo_sync_ms_total = rrdp_download_ms_total + rsync_download_ms_total;
    let download_bytes_total: u64 = shared
        .download_stats
        .by_kind
        .values()
        .map(|item| item.bytes_total.unwrap_or(0))
        .sum();
    // Stop sampling now so report/CCR building is excluded from the profile.
    #[cfg(feature = "profile")]
    let profiler_report = if let Some(guard) = profiler_guard.take() {
        Some(
            guard
                .report()
                .build()
                .map_err(|e| format!("pprof report build failed: {e}"))?,
        )
    } else {
        None
    };
    let report_json_format = if args.report_json_compact {
        ReportJsonFormat::Compact
    } else {
        ReportJsonFormat::Pretty
    };
    let ccr_produced_at = time::OffsetDateTime::now_utc();
    // Build the audit report and the CCR concurrently; scoped threads allow
    // borrowing `policy`, `shared`, and `store` without 'static bounds.
    let (report_result, ccr_result) = std::thread::scope(|scope| {
        let report_handle = scope.spawn(|| {
            run_report_task(
                &policy,
                validation_time,
                &shared,
                args.report_json_path.as_deref(),
                report_json_format,
            )
        });
        let ccr_handle = scope.spawn(|| {
            run_ccr_task(
                store.as_ref(),
                &shared,
                args.ccr_out_path.as_deref(),
                ccr_produced_at,
            )
        });
        let report_result = report_handle
            .join()
            .map_err(|_| "report task panicked".to_string())
            .and_then(|result| result);
        let ccr_result = ccr_handle
            .join()
            .map_err(|_| "ccr task panicked".to_string())
            .and_then(|result| result);
        (report_result, ccr_result)
    });
    let report_output = report_result?;
    let ccr_output = ccr_result?;
    let report = report_output.report;
    let report_build_ms = report_output.report_build_ms;
    let report_write_ms = report_output.report_write_ms;
    let ccr_build_ms = ccr_output.ccr_build_ms;
    let ccr_build_breakdown = ccr_output.ccr_build_breakdown;
    let ccr_write_ms = ccr_output.ccr_write_ms;
    // Optional CIR export: one TAL URI binding per discovered trust anchor.
    let mut cir_build_cir_ms = None;
    let mut cir_write_cir_ms = None;
    let mut cir_total_ms = None;
    if args.cir_enabled {
        let cir_tal_uris = resolve_cir_export_tal_uris(&args)?;
        // Each discovery must pair with exactly one exported TAL URI.
        if cir_tal_uris.len() != shared.discoveries.len() {
            return Err(format!(
                "CIR export TAL URI count ({}) does not match discovery count ({})",
                cir_tal_uris.len(),
                shared.discoveries.len()
            ));
        }
        let cir_out_path = args
            .cir_out_path
            .as_deref()
            .expect("validated by parse_args for cir");
        let tal_bindings = shared
            .discoveries
            .iter()
            .zip(cir_tal_uris.iter())
            .map(|(discovery, tal_uri)| CirTalBinding {
                trust_anchor: &discovery.trust_anchor,
                tal_uri: tal_uri.as_str(),
            })
            .collect::<Vec<_>>();
        let summary = export_cir_from_run_multi(
            store.as_ref(),
            &tal_bindings,
            validation_time,
            shared.publication_points.as_ref(),
            cir_out_path,
            time::OffsetDateTime::now_utc().date(),
            shared.current_repo_objects(),
        )
        .map_err(|e| e.to_string())?;
        cir_build_cir_ms = Some(summary.timing.build_cir_ms);
        cir_write_cir_ms = Some(summary.timing.write_cir_ms);
        cir_total_ms = Some(summary.timing.total_ms);
        eprintln!(
            "wrote CIR: {} (objects={}, tals={}, build_cir_ms={}, write_cir_ms={}, total_ms={})",
            cir_out_path.display(),
            summary.object_count,
            summary.tal_count,
            summary.timing.build_cir_ms,
            summary.timing.write_cir_ms,
            summary.timing.total_ms
        );
    }
    // Persist the per-stage timing summary next to the report (when configured).
    let stage_timing = RunStageTiming {
        validation_ms,
        report_build_ms,
        report_write_ms,
        ccr_build_ms,
        ccr_build_breakdown,
        ccr_write_ms,
        cir_build_cir_ms,
        cir_write_cir_ms,
        cir_total_ms,
        total_ms: total_started.elapsed().as_millis() as u64,
        publication_points,
        repo_sync_ms_total,
        publication_point_repo_sync_ms_total,
        download_event_count,
        rrdp_download_ms_total,
        rsync_download_ms_total,
        download_bytes_total,
    };
    write_stage_timing(args.report_json_path.as_deref(), &stage_timing)?;
    // --analyze: record report-level counters and write timing.json.
    if let Some((out_dir, t)) = timing.as_ref() {
        t.record_count("vrps", report.vrps.len() as u64);
        t.record_count("aspas", report.aspas.len() as u64);
        t.record_count(
            "audit_publication_points",
            report.publication_points.len() as u64,
        );
        let timing_json_path = out_dir.join("timing.json");
        t.write_json(&timing_json_path, 20)?;
        eprintln!("analysis: wrote {}", timing_json_path.display());
    }
    // --profile-cpu: emit flamegraph.svg and a gzipped pprof protobuf profile
    // into the analyze output directory. `report` here shadows the audit
    // report with the pprof report inside this block only.
    #[cfg(feature = "profile")]
    if let (Some((out_dir, _)), Some(report)) = (timing.as_ref(), profiler_report) {
        let svg_path = out_dir.join("flamegraph.svg");
        let svg_file = std::fs::File::create(&svg_path)
            .map_err(|e| format!("create flamegraph failed: {}: {e}", svg_path.display()))?;
        report
            .flamegraph(svg_file)
            .map_err(|e| format!("write flamegraph failed: {e}"))?;
        eprintln!("analysis: wrote {}", svg_path.display());
        let pb_path = out_dir.join("pprof.pb.gz");
        let pprof_profile = report
            .pprof()
            .map_err(|e| format!("pprof export failed: {e}"))?;
        use pprof::protos::Message;
        let mut body = Vec::with_capacity(pprof_profile.encoded_len());
        pprof_profile
            .encode(&mut body)
            .map_err(|e| format!("pprof encode failed: {e}"))?;
        let gz = flate2::write::GzEncoder::new(
            std::fs::File::create(&pb_path)
                .map_err(|e| format!("create pprof.pb.gz failed: {}: {e}", pb_path.display()))?,
            flate2::Compression::default(),
        );
        let mut gz = gz;
        use std::io::Write;
        gz.write_all(&body)
            .map_err(|e| format!("write pprof.pb.gz failed: {e}"))?;
        gz.finish()
            .map_err(|e| format!("finish pprof.pb.gz failed: {e}"))?;
        eprintln!("analysis: wrote {}", pb_path.display());
    }
    print_summary(&report);
    Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_help_returns_usage() {
let argv = vec!["rpki".to_string(), "--help".to_string()];
let err = parse_args(&argv).unwrap_err();
assert!(err.contains("Usage:"), "{err}");
assert!(err.contains("--db"), "{err}");
assert!(err.contains("--rsync-mirror-root"), "{err}");
assert!(err.contains("--parallel-phase2-object-workers"), "{err}");
assert!(!err.contains("--parallel-phase1"), "{err}");
assert!(!err.contains("--parallel-phase2 "), "{err}");
}
#[test]
fn parse_rejects_unknown_argument() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/x.tal".to_string(),
"--nope".to_string(),
];
let err = parse_args(&argv).unwrap_err();
assert!(err.contains("unknown argument"), "{err}");
}
#[test]
fn parse_rejects_both_tal_url_and_tal_path() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/x.tal".to_string(),
"--tal-path".to_string(),
"x.tal".to_string(),
];
let err = parse_args(&argv).unwrap_err();
assert!(
err.contains("one-or-more --tal-url or one-or-more --tal-path/--ta-path pairs"),
"{err}"
);
}
#[test]
fn parse_rejects_invalid_max_depth() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/x.tal".to_string(),
"--max-depth".to_string(),
"nope".to_string(),
];
let err = parse_args(&argv).unwrap_err();
assert!(err.contains("invalid --max-depth"), "{err}");
}
#[test]
fn parse_accepts_ccr_out_path() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-path".to_string(),
"x.tal".to_string(),
"--ta-path".to_string(),
"x.cer".to_string(),
"--rsync-local-dir".to_string(),
"repo".to_string(),
"--ccr-out".to_string(),
"out/example.ccr".to_string(),
];
let args = parse_args(&argv).expect("parse args");
assert_eq!(
args.ccr_out_path.as_deref(),
Some(std::path::Path::new("out/example.ccr"))
);
}
#[test]
fn parse_accepts_report_json_compact_when_report_json_is_set() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/x.tal".to_string(),
"--report-json".to_string(),
"out/report.json".to_string(),
"--report-json-compact".to_string(),
];
let args = parse_args(&argv).expect("parse args");
assert_eq!(
args.report_json_path.as_deref(),
Some(std::path::Path::new("out/report.json"))
);
assert!(args.report_json_compact);
}
#[test]
fn parse_rejects_report_json_compact_without_report_json() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/x.tal".to_string(),
"--report-json-compact".to_string(),
];
let err = parse_args(&argv).expect_err("compact flag without report path should fail");
assert!(
err.contains("--report-json-compact requires --report-json"),
"{err}"
);
}
#[test]
fn parse_accepts_external_raw_store_db() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--raw-store-db".to_string(),
"raw-store.db".to_string(),
"--tal-url".to_string(),
"https://example.test/x.tal".to_string(),
];
let args = parse_args(&argv).expect("parse args");
assert_eq!(
args.raw_store_db.as_deref(),
Some(std::path::Path::new("raw-store.db"))
);
}
#[test]
fn parse_accepts_external_repo_bytes_db() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--repo-bytes-db".to_string(),
"repo-bytes.db".to_string(),
"--tal-url".to_string(),
"https://example.test/x.tal".to_string(),
];
let args = parse_args(&argv).expect("parse args");
assert_eq!(
args.repo_bytes_db.as_deref(),
Some(std::path::Path::new("repo-bytes.db"))
);
}
#[test]
fn parse_accepts_cir_enable_with_raw_store_backend() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--raw-store-db".to_string(),
"raw-store.db".to_string(),
"--tal-path".to_string(),
"x.tal".to_string(),
"--ta-path".to_string(),
"x.cer".to_string(),
"--rsync-local-dir".to_string(),
"repo".to_string(),
"--cir-enable".to_string(),
"--cir-out".to_string(),
"out/example.cir".to_string(),
"--cir-tal-uri".to_string(),
"https://example.test/root.tal".to_string(),
];
let args = parse_args(&argv).expect("parse args");
assert!(args.cir_enabled);
assert_eq!(
args.raw_store_db.as_deref(),
Some(std::path::Path::new("raw-store.db"))
);
assert_eq!(args.cir_static_root, None);
}
#[test]
fn parse_accepts_cir_enable_with_required_paths_and_tal_override() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-path".to_string(),
"x.tal".to_string(),
"--ta-path".to_string(),
"x.cer".to_string(),
"--rsync-local-dir".to_string(),
"repo".to_string(),
"--cir-enable".to_string(),
"--cir-out".to_string(),
"out/example.cir".to_string(),
"--cir-tal-uri".to_string(),
"https://example.test/root.tal".to_string(),
];
let args = parse_args(&argv).expect("parse args");
assert!(args.cir_enabled);
assert_eq!(
args.cir_out_path.as_deref(),
Some(std::path::Path::new("out/example.cir"))
);
assert_eq!(
args.cir_tal_uri.as_deref(),
Some("https://example.test/root.tal")
);
assert_eq!(
args.cir_tal_uris,
vec!["https://example.test/root.tal".to_string()]
);
}
#[test]
fn parse_rejects_deprecated_cir_static_root() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/root.tal".to_string(),
"--cir-enable".to_string(),
"--cir-out".to_string(),
"out/example.cir".to_string(),
"--cir-static-root".to_string(),
"out/static".to_string(),
];
let err = parse_args(&argv).expect_err("cir-static-root should be rejected");
assert!(err.contains("no longer supported"), "{err}");
}
#[test]
fn parse_accepts_default_parallel_config_and_phase2_overrides() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/root.tal".to_string(),
"--parallel-phase2-object-workers".to_string(),
"3".to_string(),
"--parallel-phase2-worker-queue-capacity".to_string(),
"17".to_string(),
];
let args = parse_args(&argv).expect("parse args");
assert_eq!(args.parallel_phase2_config.object_workers, 3);
assert_eq!(args.parallel_phase2_config.worker_queue_capacity, 17);
assert_eq!(args.parallel_phase1_config, ParallelPhase1Config::default());
}
#[test]
fn parse_rejects_removed_parallel_enable_flags() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/root.tal".to_string(),
"--parallel-phase1".to_string(),
];
let err = parse_args(&argv).expect_err("removed phase flag should fail");
assert!(err.contains("unknown argument: --parallel-phase1"), "{err}");
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/root.tal".to_string(),
"--parallel-phase2".to_string(),
];
let err = parse_args(&argv).expect_err("removed phase flag should fail");
assert!(err.contains("unknown argument: --parallel-phase2"), "{err}");
}
#[test]
fn parse_accepts_multi_tal_cir_overrides_in_file_mode() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-path".to_string(),
"apnic.tal".to_string(),
"--ta-path".to_string(),
"apnic.cer".to_string(),
"--tal-path".to_string(),
"arin.tal".to_string(),
"--ta-path".to_string(),
"arin.cer".to_string(),
"--rsync-local-dir".to_string(),
"repo".to_string(),
"--cir-enable".to_string(),
"--cir-out".to_string(),
"out/example.cir".to_string(),
"--cir-tal-uri".to_string(),
"https://example.test/apnic.tal".to_string(),
"--cir-tal-uri".to_string(),
"https://example.test/arin.tal".to_string(),
];
let args = parse_args(&argv).expect("parse args");
assert_eq!(
args.cir_tal_uris,
vec![
"https://example.test/apnic.tal".to_string(),
"https://example.test/arin.tal".to_string()
]
);
}
#[test]
fn parse_rejects_incomplete_or_invalid_cir_flags() {
let argv_missing = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/root.tal".to_string(),
"--cir-enable".to_string(),
];
let err = parse_args(&argv_missing).unwrap_err();
assert!(err.contains("--cir-enable requires --cir-out"), "{err}");
let argv_needs_enable = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/root.tal".to_string(),
"--cir-out".to_string(),
"out/example.cir".to_string(),
];
let err = parse_args(&argv_needs_enable).unwrap_err();
assert!(err.contains("require --cir-enable"), "{err}");
let argv_offline_missing_uri = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-path".to_string(),
"x.tal".to_string(),
"--ta-path".to_string(),
"x.cer".to_string(),
"--rsync-local-dir".to_string(),
"repo".to_string(),
"--cir-enable".to_string(),
"--cir-out".to_string(),
"out/example.cir".to_string(),
];
let err = parse_args(&argv_offline_missing_uri).unwrap_err();
assert!(err.contains("requires --cir-tal-uri"), "{err}");
}
#[test]
fn parse_rejects_invalid_validation_time() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/x.tal".to_string(),
"--validation-time".to_string(),
"not-a-time".to_string(),
];
let err = parse_args(&argv).unwrap_err();
assert!(err.contains("invalid --validation-time"), "{err}");
}
#[test]
fn parse_rejects_invalid_max_instances() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/x.tal".to_string(),
"--max-instances".to_string(),
"nope".to_string(),
];
let err = parse_args(&argv).unwrap_err();
assert!(err.contains("invalid --max-instances"), "{err}");
}
#[test]
fn parse_rejects_missing_value_for_db() {
let argv = vec!["rpki".to_string(), "--db".to_string()];
let err = parse_args(&argv).unwrap_err();
assert!(err.contains("--db requires a value"), "{err}");
}
#[test]
fn parse_rejects_missing_value_for_tal_url() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
];
let err = parse_args(&argv).unwrap_err();
assert!(err.contains("--tal-url requires a value"), "{err}");
}
#[test]
fn parse_rejects_missing_db() {
let argv = vec!["rpki".to_string(), "--tal-url".to_string(), "x".to_string()];
let err = parse_args(&argv).unwrap_err();
assert!(err.contains("--db is required"), "{err}");
}
#[test]
fn parse_rejects_missing_tal_mode() {
let argv = vec!["rpki".to_string(), "--db".to_string(), "db".to_string()];
let err = parse_args(&argv).unwrap_err();
assert!(
err.contains("--tal-url") || err.contains("--tal-path"),
"{err}"
);
}
#[test]
fn parse_accepts_tal_url_mode() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/x.tal".to_string(),
];
let args = parse_args(&argv).expect("parse");
assert_eq!(args.tal_url.as_deref(), Some("https://example.test/x.tal"));
assert_eq!(
args.tal_urls,
vec!["https://example.test/x.tal".to_string()]
);
assert!(args.tal_path.is_none());
assert!(args.ta_path.is_none());
assert_eq!(args.tal_inputs.len(), 1);
assert_eq!(args.tal_inputs[0].tal_id, "x");
assert_eq!(args.parallel_phase1_config, ParallelPhase1Config::default());
assert_eq!(args.parallel_phase2_config, ParallelPhase2Config::default());
}
#[test]
fn parse_accepts_multi_tal_without_parallel_flags() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/arin.tal".to_string(),
"--tal-url".to_string(),
"https://example.test/apnic.tal".to_string(),
"--tal-url".to_string(),
"https://example.test/ripe.tal".to_string(),
"--parallel-max-repo-sync-workers-global".to_string(),
"8".to_string(),
];
let args = parse_args(&argv).expect("parse");
assert_eq!(args.tal_urls.len(), 3);
assert_eq!(args.tal_inputs.len(), 3);
assert_eq!(args.tal_inputs[0].tal_id, "arin");
assert_eq!(args.tal_inputs[1].tal_id, "apnic");
assert_eq!(args.tal_inputs[2].tal_id, "ripe");
assert_eq!(args.parallel_phase1_config.max_repo_sync_workers_global, 8);
}
#[test]
fn parse_accepts_multi_tal_urls_by_default() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/arin.tal".to_string(),
"--tal-url".to_string(),
"https://example.test/apnic.tal".to_string(),
];
let args = parse_args(&argv).expect("parse");
assert_eq!(args.tal_urls.len(), 2);
assert_eq!(args.tal_inputs.len(), 2);
}
#[test]
fn parse_accepts_offline_mode_requires_ta() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-path".to_string(),
"a.tal".to_string(),
"--ta-path".to_string(),
"ta.cer".to_string(),
"--max-depth".to_string(),
"0".to_string(),
];
let args = parse_args(&argv).expect("parse");
assert_eq!(args.tal_paths, vec![PathBuf::from("a.tal")]);
assert_eq!(args.ta_paths, vec![PathBuf::from("ta.cer")]);
assert_eq!(args.tal_path.as_deref(), Some(Path::new("a.tal")));
assert_eq!(args.ta_path.as_deref(), Some(Path::new("ta.cer")));
assert_eq!(args.max_depth, Some(0));
}
#[test]
fn parse_accepts_multiple_tal_path_pairs_by_default() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-path".to_string(),
"apnic.tal".to_string(),
"--ta-path".to_string(),
"apnic-ta.cer".to_string(),
"--tal-path".to_string(),
"arin.tal".to_string(),
"--ta-path".to_string(),
"arin-ta.cer".to_string(),
];
let args = parse_args(&argv).expect("parse");
assert_eq!(args.tal_paths.len(), 2);
assert_eq!(args.ta_paths.len(), 2);
assert_eq!(args.tal_inputs.len(), 2);
}
#[test]
fn parse_rejects_mixed_tal_url_and_tal_path_modes() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/arin.tal".to_string(),
"--tal-path".to_string(),
"apnic.tal".to_string(),
"--ta-path".to_string(),
"apnic-ta.cer".to_string(),
];
let err = parse_args(&argv).unwrap_err();
assert!(err.contains("must specify either one-or-more --tal-url or one-or-more --tal-path/--ta-path pairs"), "{err}");
}
#[test]
fn parse_rejects_mismatched_tal_path_and_ta_path_counts() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-path".to_string(),
"apnic.tal".to_string(),
"--tal-path".to_string(),
"arin.tal".to_string(),
"--ta-path".to_string(),
"apnic-ta.cer".to_string(),
];
let err = parse_args(&argv).unwrap_err();
assert!(
err.contains("--tal-path and --ta-path counts must match"),
"{err}"
);
}
#[test]
fn parse_accepts_tal_path_without_ta_when_disable_rrdp_is_set() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-path".to_string(),
"a.tal".to_string(),
"--disable-rrdp".to_string(),
"--rsync-command".to_string(),
"/tmp/fake-rsync".to_string(),
];
let args = parse_args(&argv).expect("parse");
assert_eq!(args.tal_path.as_deref(), Some(Path::new("a.tal")));
assert!(args.ta_path.is_none());
assert!(args.disable_rrdp);
assert_eq!(
args.rsync_command.as_deref(),
Some(Path::new("/tmp/fake-rsync"))
);
}
#[test]
fn parse_accepts_multiple_tal_paths_without_ta_when_disable_rrdp() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-path".to_string(),
"a.tal".to_string(),
"--tal-path".to_string(),
"b.tal".to_string(),
"--disable-rrdp".to_string(),
"--rsync-command".to_string(),
"/tmp/fake-rsync".to_string(),
];
let args = parse_args(&argv).expect("parse");
assert_eq!(
args.tal_paths,
vec![PathBuf::from("a.tal"), PathBuf::from("b.tal")]
);
assert!(args.ta_paths.is_empty());
assert_eq!(args.tal_inputs.len(), 2);
assert!(args.disable_rrdp);
}
#[test]
fn parse_accepts_payload_delta_replay_mode_with_offline_tal_and_ta() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-path".to_string(),
"a.tal".to_string(),
"--ta-path".to_string(),
"ta.cer".to_string(),
"--payload-base-archive".to_string(),
"base-archive".to_string(),
"--payload-base-locks".to_string(),
"base-locks.json".to_string(),
"--payload-delta-archive".to_string(),
"delta-archive".to_string(),
"--payload-delta-locks".to_string(),
"delta-locks.json".to_string(),
];
let args = parse_args(&argv).expect("parse delta replay mode");
assert_eq!(
args.payload_base_archive.as_deref(),
Some(Path::new("base-archive"))
);
assert_eq!(
args.payload_base_locks.as_deref(),
Some(Path::new("base-locks.json"))
);
assert_eq!(
args.payload_delta_archive.as_deref(),
Some(Path::new("delta-archive"))
);
assert_eq!(
args.payload_delta_locks.as_deref(),
Some(Path::new("delta-locks.json"))
);
}
#[test]
fn parse_rejects_partial_payload_delta_arguments_and_mutual_exclusion() {
let argv_partial = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-path".to_string(),
"a.tal".to_string(),
"--ta-path".to_string(),
"ta.cer".to_string(),
"--payload-base-archive".to_string(),
"base-archive".to_string(),
];
let err = parse_args(&argv_partial).unwrap_err();
assert!(err.contains("must be provided together"), "{err}");
let argv_both = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-path".to_string(),
"a.tal".to_string(),
"--ta-path".to_string(),
"ta.cer".to_string(),
"--payload-replay-archive".to_string(),
"archive".to_string(),
"--payload-replay-locks".to_string(),
"locks.json".to_string(),
"--payload-base-archive".to_string(),
"base-archive".to_string(),
"--payload-base-locks".to_string(),
"base-locks.json".to_string(),
"--payload-delta-archive".to_string(),
"delta-archive".to_string(),
"--payload-delta-locks".to_string(),
"delta-locks.json".to_string(),
];
let err = parse_args(&argv_both).unwrap_err();
assert!(err.contains("mutually exclusive"), "{err}");
}
#[test]
fn parse_rejects_payload_delta_with_tal_url_or_rsync_local_dir() {
let argv_url = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/x.tal".to_string(),
"--payload-base-archive".to_string(),
"base-archive".to_string(),
"--payload-base-locks".to_string(),
"base-locks.json".to_string(),
"--payload-delta-archive".to_string(),
"delta-archive".to_string(),
"--payload-delta-locks".to_string(),
"delta-locks.json".to_string(),
];
let err = parse_args(&argv_url).unwrap_err();
assert!(err.contains("--tal-url is not supported"), "{err}");
let argv_rsync = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-path".to_string(),
"a.tal".to_string(),
"--ta-path".to_string(),
"ta.cer".to_string(),
"--payload-base-archive".to_string(),
"base-archive".to_string(),
"--payload-base-locks".to_string(),
"base-locks.json".to_string(),
"--payload-delta-archive".to_string(),
"delta-archive".to_string(),
"--payload-delta-locks".to_string(),
"delta-locks.json".to_string(),
"--rsync-local-dir".to_string(),
"repo".to_string(),
];
let err = parse_args(&argv_rsync).unwrap_err();
assert!(
err.contains("payload delta replay mode cannot be combined with --rsync-local-dir"),
"{err}"
);
}
#[test]
fn parse_accepts_payload_replay_mode_with_offline_tal_and_ta() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-path".to_string(),
"a.tal".to_string(),
"--ta-path".to_string(),
"ta.cer".to_string(),
"--payload-replay-archive".to_string(),
"archive".to_string(),
"--payload-replay-locks".to_string(),
"locks.json".to_string(),
];
let args = parse_args(&argv).expect("parse replay mode");
assert_eq!(
args.payload_replay_archive.as_deref(),
Some(Path::new("archive"))
);
assert_eq!(
args.payload_replay_locks.as_deref(),
Some(Path::new("locks.json"))
);
}
#[test]
fn parse_rejects_partial_payload_replay_arguments() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-path".to_string(),
"a.tal".to_string(),
"--ta-path".to_string(),
"ta.cer".to_string(),
"--payload-replay-archive".to_string(),
"archive".to_string(),
];
let err = parse_args(&argv).unwrap_err();
assert!(err.contains("must be provided together"), "{err}");
}
#[test]
fn parse_rejects_payload_replay_with_tal_url_or_rsync_local_dir() {
let argv_url = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/x.tal".to_string(),
"--payload-replay-archive".to_string(),
"archive".to_string(),
"--payload-replay-locks".to_string(),
"locks.json".to_string(),
];
let err = parse_args(&argv_url).unwrap_err();
assert!(err.contains("--tal-url is not supported"), "{err}");
let argv_rsync = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-path".to_string(),
"a.tal".to_string(),
"--ta-path".to_string(),
"ta.cer".to_string(),
"--payload-replay-archive".to_string(),
"archive".to_string(),
"--payload-replay-locks".to_string(),
"locks.json".to_string(),
"--rsync-local-dir".to_string(),
"repo".to_string(),
];
let err = parse_args(&argv_rsync).unwrap_err();
assert!(
err.contains("cannot be combined with --rsync-local-dir"),
"{err}"
);
}
#[test]
fn parse_accepts_validation_time_rfc3339() {
    // An RFC 3339 UTC timestamp for --validation-time must parse and be
    // carried through as Some(..) on the arguments.
    let argv: Vec<String> = [
        "rpki",
        "--db",
        "db",
        "--tal-url",
        "https://example.test/x.tal",
        "--validation-time",
        "2026-01-01T00:00:00Z",
    ]
    .iter()
    .map(|arg| arg.to_string())
    .collect();
    let parsed = parse_args(&argv).expect("parse");
    assert!(parsed.validation_time.is_some());
}
#[test]
fn parse_rejects_removed_revalidate_only_flag() {
    // --revalidate-only was removed; it must now fail as an unknown flag
    // rather than being silently ignored.
    let argv: Vec<String> = [
        "rpki",
        "--db",
        "db",
        "--tal-url",
        "https://example.test/x.tal",
        "--revalidate-only",
    ]
    .iter()
    .map(|arg| arg.to_string())
    .collect();
    let err = parse_args(&argv).unwrap_err();
    assert!(err.contains("unknown argument: --revalidate-only"), "{err}");
}
#[test]
fn read_policy_accepts_valid_toml() {
    // A minimal, well-formed policy TOML file must deserialize into a Policy
    // with the configured failure policy applied.
    let dir = tempfile::tempdir().expect("tmpdir");
    let policy_path = dir.path().join("policy.toml");
    let toml = "signed_object_failure_policy = \"drop_publication_point\"\n";
    std::fs::write(&policy_path, toml).expect("write policy");
    let policy = read_policy(Some(&policy_path)).expect("parse policy");
    assert_eq!(
        policy.signed_object_failure_policy,
        crate::policy::SignedObjectFailurePolicy::DropPublicationPoint
    );
}
#[test]
fn read_policy_reports_missing_file() {
    // Pointing at a nonexistent path must surface a read error message,
    // not a panic or a default policy.
    let dir = tempfile::tempdir().expect("tmpdir");
    let missing = dir.path().join("missing.toml");
    let err = read_policy(Some(&missing)).unwrap_err();
    assert!(err.contains("read policy file failed"), "{err}");
}
/// Builds a fully in-memory `PostValidationShared` for exercising the
/// report/CCR helpers without network or store access.
///
/// Root discovery uses the bundled APNIC TAL/TA fixtures; the tree output
/// carries one synthetic warning, one VRP, and one ASPA. Three publication
/// points are attached, two of which share an RRDP notification URI so
/// repo-deduplication logic has something to collapse.
fn synthetic_post_validation_shared() -> PostValidationShared {
    // Load a repo-relative fixture file.
    let fixture = |rel: &str| {
        std::fs::read(std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(rel))
    };
    let tal_bytes =
        fixture("tests/fixtures/tal/apnic-rfc7730-https.tal").expect("read tal fixture");
    let ta_der = fixture("tests/fixtures/ta/apnic-ta.cer").expect("read ta fixture");
    let discovery = crate::validation::from_tal::discover_root_ca_instance_from_tal_and_ta_der(
        &tal_bytes, &ta_der, None,
    )
    .expect("discover root");
    let tree = crate::validation::tree::TreeRunOutput {
        instances_processed: 1,
        instances_failed: 0,
        warnings: vec![
            crate::report::Warning::new("synthetic warning")
                .with_rfc_refs(&[crate::report::RfcRef("RFC 6487 §4.8.8.1")])
                .with_context("rsync://example.test/repo/pp/"),
        ],
        vrps: vec![crate::validation::objects::Vrp {
            asn: 64496,
            prefix: crate::data_model::roa::IpPrefix {
                afi: crate::data_model::roa::RoaAfi::Ipv4,
                prefix_len: 24,
                addr: [192, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
            },
            max_length: 24,
        }],
        aspas: vec![crate::validation::objects::AspaAttestation {
            customer_as_id: 64496,
            provider_as_ids: vec![64497, 64498],
        }],
        router_keys: Vec::new(),
    };
    // Build a "fresh" publication point pointing at the given notification URI.
    let publication_point = |notification_uri: &str| {
        let mut pp = crate::audit::PublicationPointAudit::default();
        pp.source = "fresh".to_string();
        pp.rrdp_notification_uri = Some(notification_uri.to_string());
        pp
    };
    let out = crate::validation::run_tree_from_tal::RunTreeFromTalAuditOutput {
        discovery: discovery.clone(),
        discoveries: vec![discovery],
        tree,
        publication_points: vec![
            publication_point("https://example.test/n1.xml"),
            publication_point("https://example.test/n1.xml"),
            publication_point("https://example.test/n2.xml"),
        ],
        downloads: Vec::new(),
        download_stats: crate::audit::AuditDownloadStats::default(),
        current_repo_objects: Vec::new(),
        ccr_accumulator: None,
    };
    PostValidationShared::from_run_output(out)
}
/// Builds a `CcrAccumulator` seeded with the APNIC fixture trust anchor and
/// exactly one synthetic manifest projection, for CCR-reuse test paths.
fn sample_cli_ccr_accumulator() -> CcrAccumulator {
    // Load a repo-relative fixture file.
    let fixture = |rel: &str| {
        std::fs::read(std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(rel))
    };
    let tal_bytes =
        fixture("tests/fixtures/tal/apnic-rfc7730-https.tal").expect("read tal fixture");
    let ta_der = fixture("tests/fixtures/ta/apnic-ta.cer").expect("read ta fixture");
    let discovery = crate::validation::from_tal::discover_root_ca_instance_from_tal_and_ta_der(
        &tal_bytes, &ta_der, None,
    )
    .expect("discover root");
    let mut accumulator = CcrAccumulator::new(vec![discovery.trust_anchor.clone()]);
    // One synthetic manifest projection with fixed digests/identifiers so the
    // resulting CCR carries exactly one manifest entry.
    let projection = crate::storage::VcirCcrManifestProjection {
        manifest_rsync_uri: "rsync://example.test/repo/current.mft".to_string(),
        manifest_sha256: vec![0x44; 32],
        manifest_size: 2048,
        manifest_ee_aki: vec![0x55; 20],
        manifest_number_be: vec![1],
        manifest_this_update: crate::storage::PackTime::from_utc_offset_datetime(
            time::OffsetDateTime::now_utc(),
        ),
        // DER-encoded SIA AccessDescription stub ("rsync" payload).
        manifest_sia_locations_der: vec![vec![
            0x30, 0x11, 0x06, 0x08, 0x2b, 0x06, 0x01, 0x05, 0x05, 0x07, 0x30, 0x05, 0x86, 0x05,
            b'r', b's', b'y', b'n', b'c',
        ]],
        subordinate_skis: vec![vec![0x33; 20]],
    };
    accumulator
        .append_manifest_projection(&projection)
        .expect("append manifest projection");
    accumulator
}
#[test]
fn build_report_and_helpers_work_on_synthetic_output() {
    // Two of the three synthetic publication points share an RRDP URI, so
    // only two unique RRDP repos should be counted; the single VRP and ASPA
    // must pass through to the report unchanged.
    let shared = synthetic_post_validation_shared();
    let policy = Policy::default();
    let now = time::OffsetDateTime::now_utc();
    let report = build_report(&policy, now, &shared);
    assert_eq!(unique_rrdp_repos(&report), 2);
    assert_eq!(report.vrps.len(), 1);
    assert_eq!(report.aspas.len(), 1);
    // Smoke-check: summary printing must not panic on a populated report.
    print_summary(&report);
}
#[test]
fn run_report_task_and_stage_timing_work() {
    // End-to-end over the report task: build a compact report on disk,
    // then serialize a stage-timing record next to it.
    let shared = synthetic_post_validation_shared();
    let policy = Policy::default();
    let now = time::OffsetDateTime::now_utc();
    let dir = tempfile::tempdir().expect("tmpdir");
    let report_path = dir.path().join("report.json");
    let report_output = run_report_task(
        &policy,
        now,
        &shared,
        Some(&report_path),
        ReportJsonFormat::Compact,
    )
    .expect("run report task");
    assert_eq!(report_output.report.vrps.len(), 1);
    assert_eq!(report_output.report.aspas.len(), 1);
    assert!(report_output.report_write_ms.is_some());
    // Compact JSON must be a single line.
    let report_json = std::fs::read_to_string(&report_path).expect("read report json");
    assert!(!report_json.contains('\n'), "{report_json}");
    // Distinct values per field so a serialization mix-up would be visible.
    let stage_timing = RunStageTiming {
        validation_ms: 1,
        report_build_ms: report_output.report_build_ms,
        report_write_ms: report_output.report_write_ms,
        ccr_build_ms: Some(2),
        ccr_build_breakdown: None,
        ccr_write_ms: Some(3),
        cir_build_cir_ms: Some(4),
        cir_write_cir_ms: Some(5),
        cir_total_ms: Some(6),
        total_ms: 7,
        publication_points: shared.publication_points.len(),
        repo_sync_ms_total: 8,
        publication_point_repo_sync_ms_total: 9,
        download_event_count: 10,
        rrdp_download_ms_total: 11,
        rsync_download_ms_total: 12,
        download_bytes_total: 13,
    };
    write_stage_timing(Some(&report_path), &stage_timing).expect("write stage timing");
    // The timing file is written as a sibling of the report.
    let timing_path = dir.path().join("stage-timing.json");
    let stage_timing_json = std::fs::read_to_string(&timing_path).expect("read timing");
    assert!(stage_timing_json.contains("\"validation_ms\""));
    assert!(stage_timing_json.contains("\"ccr_build_ms\""));
}
#[test]
fn run_ccr_task_uses_accumulator_when_phase2_output_contains_reuse_sources() {
    let mut shared = synthetic_post_validation_shared();
    shared.ccr_accumulator = Some(sample_cli_ccr_accumulator());
    // Mark one publication point as reused and one as failed so the CCR task
    // must take the accumulator path instead of rebuilding from fresh state.
    let mut points: Vec<_> = shared.publication_points.iter().cloned().collect();
    points[1].source = "vcir_current_instance".to_string();
    points[2].source = "failed_no_cache".to_string();
    shared.publication_points = points.into();
    let dir = tempfile::tempdir().expect("tmpdir");
    let ccr_path = dir.path().join("result.ccr");
    let store = RocksStore::open(&dir.path().join("db")).expect("open empty store");
    let now = time::OffsetDateTime::now_utc();
    let output =
        run_ccr_task(&store, &shared, Some(&ccr_path), now).expect("run ccr task");
    assert!(output.ccr_build_ms.is_some());
    assert!(output.ccr_build_breakdown.is_none());
    // The written CCR must decode and carry exactly the one accumulated
    // manifest projection.
    let der = std::fs::read(&ccr_path).expect("read ccr");
    let content_info = crate::ccr::decode_content_info(&der).expect("decode ccr");
    let manifest_count = content_info
        .content
        .mfts
        .as_ref()
        .map(|manifest_state| manifest_state.mis.len());
    assert_eq!(manifest_count, Some(1));
}
#[test]
fn write_json_writes_report() {
    // An otherwise-empty report: enough structure to verify both JSON
    // formats without depending on validation output.
    let report = AuditReportV2 {
        format_version: 2,
        meta: AuditRunMeta {
            validation_time_rfc3339_utc: "2026-01-01T00:00:00Z".to_string(),
        },
        policy: Policy::default(),
        tree: TreeSummary {
            instances_processed: 0,
            instances_failed: 0,
            warnings: Vec::new(),
        },
        publication_points: Vec::new(),
        vrps: Vec::new(),
        aspas: Vec::new(),
        downloads: Vec::new(),
        download_stats: crate::audit::AuditDownloadStats::default(),
        repo_sync_stats: crate::audit::AuditRepoSyncStats::default(),
    };
    let dir = tempfile::tempdir().expect("tmpdir");
    // Pretty output: key fields present and indented across lines.
    let pretty_path = dir.path().join("report-pretty.json");
    write_json(&pretty_path, &report, ReportJsonFormat::Pretty).expect("write pretty json");
    let pretty = std::fs::read_to_string(&pretty_path).expect("read pretty report");
    assert!(pretty.contains("\"format_version\""));
    assert!(pretty.contains("\"policy\""));
    assert!(pretty.contains("\n  \"format_version\""), "{pretty}");
    // Compact output: same fields, single line.
    let compact_path = dir.path().join("report-compact.json");
    write_json(&compact_path, &report, ReportJsonFormat::Compact).expect("write compact json");
    let compact = std::fs::read_to_string(&compact_path).expect("read compact report");
    assert!(compact.contains("\"format_version\""));
    assert!(compact.contains("\"policy\""));
    assert!(!compact.contains('\n'), "{compact}");
}
#[test]
fn build_repo_sync_stats_aggregates_phase_and_terminal_state() {
    // Builds one publication point with the given sync phase, duration, and
    // terminal state; all other fields stay at their defaults.
    let point = |phase: &str, duration_ms: u64, terminal_state: &str| {
        let mut pp = crate::audit::PublicationPointAudit::default();
        pp.repo_sync_phase = Some(phase.to_string());
        pp.repo_sync_duration_ms = Some(duration_ms);
        pp.repo_terminal_state = terminal_state.to_string();
        pp
    };
    let stats = build_repo_sync_stats(&[
        point("rrdp_ok", 10, "fresh"),
        point("rrdp_failed_rsync_failed", 20, "failed_no_cache"),
        point("rrdp_failed_rsync_failed", 30, "failed_no_cache"),
    ]);
    assert_eq!(stats.publication_points_total, 3);
    // Per-phase buckets: counts and summed durations.
    assert_eq!(stats.by_phase["rrdp_ok"].count, 1);
    assert_eq!(stats.by_phase["rrdp_ok"].duration_ms_total, 10);
    assert_eq!(stats.by_phase["rrdp_failed_rsync_failed"].count, 2);
    assert_eq!(
        stats.by_phase["rrdp_failed_rsync_failed"].duration_ms_total,
        50
    );
    // Per-terminal-state buckets mirror the same aggregation.
    assert_eq!(stats.by_terminal_state["fresh"].count, 1);
    assert_eq!(stats.by_terminal_state["failed_no_cache"].count, 2);
    assert_eq!(
        stats.by_terminal_state["failed_no_cache"].duration_ms_total,
        50
    );
}
}