rpki/src/cli.rs

2530 lines
92 KiB
Rust

mod output;
use crate::ccr::{
CcrAccumulator, CcrBuildBreakdown, build_ccr_from_run_with_breakdown, write_ccr_file,
};
use crate::cir::{CirTrustAnchorBinding, export_cir_from_run_multi};
use std::path::{Path, PathBuf};
use crate::analysis::timing::{TimingHandle, TimingMeta, TimingMetaUpdate};
use crate::audit::AuditRepoSyncStats;
#[cfg(test)]
use crate::audit::{
AspaOutput, AuditReportV2, AuditRunMeta, AuditWarning, TreeSummary, VrpOutput,
format_roa_ip_prefix,
};
use crate::fetch::http::{BlockingHttpFetcher, HttpFetcherConfig};
use crate::fetch::rsync::LocalDirRsyncFetcher;
use crate::fetch::rsync_system::{RsyncScopePolicy, SystemRsyncConfig, SystemRsyncFetcher};
use crate::memory_telemetry::{
MallocTrimProbe, MemoryTelemetryCheckpoint, MemoryTelemetrySummary, ObjectGraphMemoryMetric,
ObjectGraphMemorySection, ObjectGraphMemorySummary,
};
use crate::parallel::config::{ParallelPhase1Config, ParallelPhase2Config};
use crate::parallel::types::TalInputSpec;
use crate::policy::{Policy, StrictPolicy};
use crate::storage::{RocksStore, VcirStorageSummary};
use crate::validation::run_tree_from_tal::{
RunTreeFromTalAuditOutput, run_tree_from_multiple_tals_parallel_phase2_audit,
run_tree_from_multiple_tals_parallel_phase2_audit_with_timing,
run_tree_from_tal_and_ta_der_parallel_phase2_audit,
run_tree_from_tal_and_ta_der_parallel_phase2_audit_with_timing,
run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit,
run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit_with_timing,
run_tree_from_tal_and_ta_der_payload_replay_serial_audit,
run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing,
run_tree_from_tal_url_parallel_phase2_audit,
run_tree_from_tal_url_parallel_phase2_audit_with_timing,
};
use crate::validation::tree::TreeRunConfig;
#[cfg(test)]
use output::write_json;
use output::{
ReportJsonFormat, run_compare_view_task, write_report_json_from_shared, write_stage_timing,
};
use serde::Serialize;
use std::collections::HashMap;
use std::sync::Arc;
/// Per-run stage timing and counter summary, serialized with `serde::Serialize`.
///
/// Field declaration order is also the serialized field order.
/// NOTE(review): fields ending in `_ms` presumably hold wall-clock milliseconds (the
/// file's `record_memory_checkpoint` uses `as_millis()` for `elapsed_ms`); `Option`
/// fields are presumably `None` when the corresponding stage was not run — confirm at
/// the construction site, which is outside this view.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
struct RunStageTiming {
    /// Time spent in tree validation.
    validation_ms: u64,
    /// Mirrors the `--enable-roa-validation-cache` flag for this run.
    enable_roa_validation_cache: bool,
    /// Mirrors the `--enable-transport-request-prefetch` flag for this run.
    enable_transport_request_prefetch: bool,
    /// Time spent building the audit report.
    report_build_ms: u64,
    /// Time spent writing the report JSON, if one was written.
    report_write_ms: Option<u64>,
    /// Time spent building the CCR, if one was built.
    ccr_build_ms: Option<u64>,
    /// Detailed CCR build breakdown, if available.
    ccr_build_breakdown: Option<CcrBuildBreakdown>,
    /// Time spent writing the CCR file, if one was written.
    ccr_write_ms: Option<u64>,
    /// Time spent building the compare-view output, if requested.
    compare_view_build_ms: Option<u64>,
    /// Time spent writing the compare-view output, if requested.
    compare_view_write_ms: Option<u64>,
    /// Time spent building the CIR, if CIR export ran.
    cir_build_cir_ms: Option<u64>,
    /// Time spent writing the CIR, if CIR export ran.
    cir_write_cir_ms: Option<u64>,
    /// Total CIR export time, if CIR export ran.
    cir_total_ms: Option<u64>,
    /// Total run time.
    total_ms: u64,
    /// Number of publication points in the run output.
    publication_points: usize,
    /// Aggregate repository synchronisation time.
    repo_sync_ms_total: u64,
    /// Repository synchronisation time attributed to publication points.
    publication_point_repo_sync_ms_total: u64,
    /// Number of recorded download events.
    download_event_count: u64,
    /// Aggregate RRDP download time.
    rrdp_download_ms_total: u64,
    /// Aggregate rsync download time.
    rsync_download_ms_total: u64,
    /// Aggregate downloaded bytes.
    download_bytes_total: u64,
    /// ROA validation cache statistics for the run.
    roa_validation_cache: crate::validation::objects::RoaValidationCacheStats,
    /// Free-form named analysis counters.
    analysis_counts: HashMap<String, u64>,
    /// Time spent computing the VCIR storage summary, if it was computed.
    vcir_storage_summary_ms: Option<u64>,
    /// VCIR storage summary, if it was computed.
    vcir_storage: Option<VcirStorageSummary>,
    /// Memory telemetry summary, if collected.
    memory_telemetry: Option<MemoryTelemetrySummary>,
}
/// Appends a memory-telemetry checkpoint named `label` to `checkpoints`.
///
/// The checkpoint records the milliseconds elapsed since `total_started`, a process
/// memory snapshot tagged with `label`, and the RocksDB memory snapshot of `store`.
fn record_memory_checkpoint(
    checkpoints: &mut Vec<MemoryTelemetryCheckpoint>,
    label: &str,
    total_started: &std::time::Instant,
    store: &RocksStore,
) {
    let elapsed_ms = total_started.elapsed().as_millis() as u64;
    let process = crate::memory_telemetry::process_memory_snapshot(label);
    let rocksdb = store.memory_snapshot();
    let checkpoint = MemoryTelemetryCheckpoint {
        label: label.to_owned(),
        elapsed_ms,
        process,
        rocksdb,
    };
    checkpoints.push(checkpoint);
}
/// Reports whether the `RPKI_MEMORY_TRIM_PROBE` environment variable enables the probe.
///
/// Only the exact values `1`, `true`, `TRUE`, `yes` and `YES` enable it; any other
/// value, an unset variable, or a non-UTF-8 value yields `false`.
fn memory_trim_probe_enabled() -> bool {
    match std::env::var("RPKI_MEMORY_TRIM_PROBE") {
        Ok(raw) => ["1", "true", "TRUE", "yes", "YES"].contains(&raw.as_str()),
        Err(_) => false,
    }
}
/// Reports whether the `RPKI_VCIR_STORAGE_SUMMARY` environment variable requests a
/// VCIR storage summary.
///
/// Accepted truthy values are exactly `1`, `true`, `TRUE`, `yes` and `YES`; anything
/// else (including an unset or non-UTF-8 variable) means `false`.
fn vcir_storage_summary_enabled() -> bool {
    if let Ok(setting) = std::env::var("RPKI_VCIR_STORAGE_SUMMARY") {
        return matches!(setting.as_str(), "1" | "true" | "TRUE" | "yes" | "YES");
    }
    false
}
/// Command-line options as returned by `parse_args` after all cross-option
/// validation has passed.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CliArgs {
    /// Every `--tal-url` value, in command-line order.
    pub tal_urls: Vec<String>,
    /// Every `--tal-path` value, in command-line order.
    pub tal_paths: Vec<PathBuf>,
    /// Every `--ta-path` value, in command-line order; paired with `tal_paths` by position.
    pub ta_paths: Vec<PathBuf>,
    /// First entry of `tal_urls`, if any.
    pub tal_url: Option<String>,
    /// First entry of `tal_paths`, if any.
    pub tal_path: Option<PathBuf>,
    /// First entry of `ta_paths`, if any.
    pub ta_path: Option<PathBuf>,
    /// Phase 1 budgets from the `--parallel-max-*` options, else `ParallelPhase1Config::default()`.
    pub parallel_phase1_config: ParallelPhase1Config,
    /// Phase 2 settings from the `--parallel-phase2-*` options; `parse_args` rejects zero values.
    pub parallel_phase2_config: ParallelPhase2Config,
    /// One input per TAL: from `tal_urls` in URL mode; otherwise from `tal_paths`, paired
    /// positionally with `ta_paths` when the counts match, or alone when no TA paths were given.
    pub tal_inputs: Vec<TalInputSpec>,
    /// `--db`: RocksDB directory path (required).
    pub db_path: PathBuf,
    /// `--raw-store-db`: external raw-by-hash store DB path.
    pub raw_store_db: Option<PathBuf>,
    /// `--repo-bytes-db`: external repo object bytes DB path.
    pub repo_bytes_db: Option<PathBuf>,
    /// `--policy`: policy TOML path.
    pub policy_path: Option<PathBuf>,
    /// `--strict[=spec]`: strict policy selection parsed by `StrictPolicy::parse_cli_spec`.
    pub strict_policy: Option<StrictPolicy>,
    /// `--report-json`: destination of the full audit report JSON.
    pub report_json_path: Option<PathBuf>,
    /// `--report-json-compact`: write the report JSON without pretty-printing.
    pub report_json_compact: bool,
    /// `--skip-report-build`: never combined with `report_json_path` (rejected by `parse_args`).
    pub skip_report_build: bool,
    /// `--skip-vcir-persist`.
    pub skip_vcir_persist: bool,
    /// `--enable-roa-validation-cache`.
    pub enable_roa_validation_cache: bool,
    /// `--enable-transport-request-prefetch`.
    pub enable_transport_request_prefetch: bool,
    /// `--ccr-out`: CCR output path.
    pub ccr_out_path: Option<PathBuf>,
    /// `--vrps-csv-out`: always set together with `vaps_csv_out_path`.
    pub vrps_csv_out_path: Option<PathBuf>,
    /// `--vaps-csv-out`: always set together with `vrps_csv_out_path`.
    pub vaps_csv_out_path: Option<PathBuf>,
    /// `--compare-view-trust-anchor`: only accepted together with the compare-view CSV outputs.
    pub compare_view_trust_anchor: Option<String>,
    /// `--cir-enable`: when set, `cir_out_path` is guaranteed to be `Some`.
    pub cir_enabled: bool,
    /// `--cir-out`: CIR output path.
    pub cir_out_path: Option<PathBuf>,
    /// Always `None` when produced by `parse_args`, which rejects `--cir-static-root`.
    pub cir_static_root: Option<PathBuf>,
    /// Every `--cir-tal-uri` value; when non-empty its length equals the TAL input count.
    pub cir_tal_uris: Vec<String>,
    /// First entry of `cir_tal_uris`, if any.
    pub cir_tal_uri: Option<String>,
    /// `--payload-replay-archive`: set together with `payload_replay_locks`.
    pub payload_replay_archive: Option<PathBuf>,
    /// `--payload-replay-locks`: set together with `payload_replay_archive`.
    pub payload_replay_locks: Option<PathBuf>,
    /// `--payload-base-archive`: the four delta-replay paths are all set or all unset.
    pub payload_base_archive: Option<PathBuf>,
    /// `--payload-base-locks`.
    pub payload_base_locks: Option<PathBuf>,
    /// `--payload-base-validation-time` (RFC 3339).
    pub payload_base_validation_time: Option<time::OffsetDateTime>,
    /// `--payload-delta-archive`.
    pub payload_delta_archive: Option<PathBuf>,
    /// `--payload-delta-locks`.
    pub payload_delta_locks: Option<PathBuf>,
    /// `--memory-trim-after-validation`.
    pub memory_trim_after_validation: bool,
    /// `--rsync-local-dir`: not accepted together with either payload replay mode.
    pub rsync_local_dir: Option<PathBuf>,
    /// `--disable-rrdp`.
    pub disable_rrdp: bool,
    /// `--rsync-command`: rsync binary override.
    pub rsync_command: Option<PathBuf>,
    /// `--http-timeout-secs`; `parse_args` defaults this to 30.
    pub http_timeout_secs: u64,
    /// `--rsync-timeout-secs`; `parse_args` defaults this to 30.
    pub rsync_timeout_secs: u64,
    /// `--rsync-mirror-root`.
    pub rsync_mirror_root: Option<PathBuf>,
    /// `--rsync-scope`, else `RsyncScopePolicy::default()`.
    pub rsync_scope_policy: RsyncScopePolicy,
    /// `--max-depth`.
    pub max_depth: Option<usize>,
    /// `--max-instances`.
    pub max_instances: Option<usize>,
    /// `--validation-time` (RFC 3339).
    pub validation_time: Option<time::OffsetDateTime>,
    /// Set by `--analyze` or implied by `--analysis-out`.
    pub analyze: bool,
    /// `--analysis-out`.
    pub analysis_out_path: Option<PathBuf>,
    /// `--profile-cpu`.
    pub profile_cpu: bool,
}
/// Builds the `--help` text listing every supported option.
///
/// The defaults quoted here for `--http-timeout-secs` and `--rsync-timeout-secs` are the
/// initial values used by `parse_args` (30 seconds each); keep the two in sync.
fn usage() -> String {
    let bin = "rpki";
    format!(
        "\
Usage:
{bin} --db <path> --tal-url <url> [--tal-url <url> ...] [options]
{bin} --db <path> --tal-path <path> --ta-path <path> [--tal-path <path> --ta-path <path> ...] [options]
Options:
--db <path> RocksDB directory path (required)
--raw-store-db <path> External raw-by-hash store DB path (optional)
--repo-bytes-db <path> External repo object bytes DB path (optional)
--policy <path> Policy TOML path (optional)
--strict [policies] Enable strict policies (default all; comma list: name,cms-der,signed-attrs; none disables)
--report-json <path> Write full audit report as JSON (optional)
--report-json-compact Write report JSON without pretty-printing (requires --report-json)
--skip-report-build Skip full audit report construction when --report-json is not requested
--skip-vcir-persist Skip VCIR persistence/projection building for compare-only runs
--enable-roa-validation-cache
Reuse accepted ROA validation outputs from previous VCIR records (default: off)
--enable-transport-request-prefetch
Experimental: prefetch previous run transport repo requests before tree traversal
--ccr-out <path> Write CCR DER ContentInfo to this path (optional)
--vrps-csv-out <path> Write VRP compare-view CSV directly from validation output (optional; requires --vaps-csv-out)
--vaps-csv-out <path> Write VAP compare-view CSV directly from validation output (optional; requires --vrps-csv-out)
--compare-view-trust-anchor <name>
Trust-anchor label used by direct compare-view CSV output (default: unknown)
--cir-enable Export CIR after the run completes
--cir-out <path> Write CIR DER to this path (requires --cir-enable)
--cir-static-root <path> No longer supported (rejected); CIR export no longer exports object pools
--cir-tal-uri <url> Override TAL URI for CIR export (repeatable in multi-TAL mode)
--payload-replay-archive <path> Use local payload replay archive root (offline replay mode)
--payload-replay-locks <path> Use local payload replay locks.json (offline replay mode)
--payload-base-archive <path> Use local base payload archive root (offline delta replay)
--payload-base-locks <path> Use local base locks.json (offline delta replay)
--payload-base-validation-time <rfc3339> Validation time for the base bootstrap inside offline delta replay
--payload-delta-archive <path> Use local delta payload archive root (offline delta replay)
--payload-delta-locks <path> Use local locks-delta.json (offline delta replay)
--memory-trim-after-validation Call malloc_trim(0) after validation/report memory checkpoints (Linux glibc only; default off)
--tal-url <url> TAL URL (repeatable; URL mode)
--tal-path <path> TAL file path (repeatable; file mode)
--ta-path <path> TA certificate DER file path (repeatable in file mode; pairs with --tal-path by position)
--parallel-max-repo-sync-workers-global <n>
Phase 1 global repo sync worker budget (default: 4)
--parallel-max-inflight-snapshot-bytes-global <n>
Phase 1 inflight snapshot byte budget (default: 512MiB)
--parallel-max-pending-repo-results <n>
Phase 1 pending repo result budget (default: 1024)
--parallel-phase2-object-workers <n>
Phase 2 object worker count (default: 8)
--parallel-phase2-worker-queue-capacity <n>
Phase 2 per-worker object queue capacity (default: 256)
--parallel-phase2-ready-batch-size <n>
Phase 2 ready publication points processed per scheduler turn (default: 256)
--parallel-phase2-ready-batch-wall-time-budget-ms <n>
Phase 2 ready staging wall-time budget per scheduler turn (default: 100)
--parallel-phase2-result-drain-batch-size <n>
Phase 2 object results drained per scheduler turn (default: 2048)
--parallel-phase2-finalize-batch-size <n>
Legacy Phase 2 scheduler finalize budget; dedicated finalize worker ignores it (default: 256)
--parallel-phase2-finalize-batch-wall-time-budget-ms <n>
Legacy Phase 2 scheduler finalize time budget; dedicated finalize worker ignores it (default: 100)
--parallel-phase2-finalize-queue-capacity <n>
Phase 2 dedicated finalize worker queue capacity (default: 32768)
--rsync-local-dir <path> Use LocalDirRsyncFetcher rooted at this directory (offline tests)
--disable-rrdp Disable RRDP and synchronize only via rsync
--rsync-command <path> Use this rsync command instead of the default rsync binary
--http-timeout-secs <n> HTTP fetch timeout seconds (default: 30)
--rsync-timeout-secs <n> rsync I/O timeout seconds (default: 30)
--rsync-mirror-root <path> Persist rsync mirrors under this directory (default: disabled)
--rsync-scope <policy> rsync scope policy: host, publication-point, or module-root (default: module-root)
--max-depth <n> Max CA instance depth (0 = root only)
--max-instances <n> Max number of CA instances to process
--validation-time <rfc3339> Validation time in RFC3339 (default: now UTC)
--analyze Write timing analysis JSON under target/live/analyze/<timestamp>/
--analysis-out <path> Write timing analysis JSON under this directory (implies --analyze)
--profile-cpu (Requires build feature 'profile') Write CPU flamegraph under analyze dir
--help Show this help
"
    )
}
pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
let mut tal_urls: Vec<String> = Vec::new();
let mut tal_paths: Vec<PathBuf> = Vec::new();
let mut ta_paths: Vec<PathBuf> = Vec::new();
let mut parallel_phase1_cfg = ParallelPhase1Config::default();
let mut parallel_phase2_cfg = ParallelPhase2Config::default();
let mut db_path: Option<PathBuf> = None;
let mut raw_store_db: Option<PathBuf> = None;
let mut repo_bytes_db: Option<PathBuf> = None;
let mut policy_path: Option<PathBuf> = None;
let mut strict_policy: Option<StrictPolicy> = None;
let mut report_json_path: Option<PathBuf> = None;
let mut report_json_compact: bool = false;
let mut skip_report_build: bool = false;
let mut skip_vcir_persist: bool = false;
let mut enable_roa_validation_cache: bool = false;
let mut enable_transport_request_prefetch: bool = false;
let mut ccr_out_path: Option<PathBuf> = None;
let mut vrps_csv_out_path: Option<PathBuf> = None;
let mut vaps_csv_out_path: Option<PathBuf> = None;
let mut compare_view_trust_anchor: Option<String> = None;
let mut cir_enabled: bool = false;
let mut cir_out_path: Option<PathBuf> = None;
let mut cir_static_root: Option<PathBuf> = None;
let mut cir_tal_uris: Vec<String> = Vec::new();
let mut cir_tal_uri: Option<String> = None;
let mut payload_replay_archive: Option<PathBuf> = None;
let mut payload_replay_locks: Option<PathBuf> = None;
let mut payload_base_archive: Option<PathBuf> = None;
let mut payload_base_locks: Option<PathBuf> = None;
let mut payload_base_validation_time: Option<time::OffsetDateTime> = None;
let mut payload_delta_archive: Option<PathBuf> = None;
let mut payload_delta_locks: Option<PathBuf> = None;
let mut memory_trim_after_validation = false;
let mut rsync_local_dir: Option<PathBuf> = None;
let mut disable_rrdp: bool = false;
let mut rsync_command: Option<PathBuf> = None;
let mut http_timeout_secs: u64 = 30;
let mut rsync_timeout_secs: u64 = 30;
let mut rsync_mirror_root: Option<PathBuf> = None;
let mut rsync_scope_policy = RsyncScopePolicy::default();
let mut max_depth: Option<usize> = None;
let mut max_instances: Option<usize> = None;
let mut validation_time: Option<time::OffsetDateTime> = None;
let mut analyze: bool = false;
let mut analysis_out_path: Option<PathBuf> = None;
let mut profile_cpu: bool = false;
let mut i = 1usize;
while i < argv.len() {
let arg = argv[i].as_str();
match arg {
"--help" | "-h" => return Err(usage()),
"--tal-url" => {
i += 1;
let v = argv.get(i).ok_or("--tal-url requires a value")?;
tal_urls.push(v.clone());
}
"--tal-path" => {
i += 1;
let v = argv.get(i).ok_or("--tal-path requires a value")?;
tal_paths.push(PathBuf::from(v));
}
"--ta-path" => {
i += 1;
let v = argv.get(i).ok_or("--ta-path requires a value")?;
ta_paths.push(PathBuf::from(v));
}
"--parallel-max-repo-sync-workers-global" => {
i += 1;
let v = argv
.get(i)
.ok_or("--parallel-max-repo-sync-workers-global requires a value")?;
parallel_phase1_cfg.max_repo_sync_workers_global = v
.parse::<usize>()
.map_err(|_| format!("invalid --parallel-max-repo-sync-workers-global: {v}"))?;
}
"--parallel-max-inflight-snapshot-bytes-global" => {
i += 1;
let v = argv
.get(i)
.ok_or("--parallel-max-inflight-snapshot-bytes-global requires a value")?;
parallel_phase1_cfg.max_inflight_snapshot_bytes_global =
v.parse::<usize>().map_err(|_| {
format!("invalid --parallel-max-inflight-snapshot-bytes-global: {v}")
})?;
}
"--parallel-max-pending-repo-results" => {
i += 1;
let v = argv
.get(i)
.ok_or("--parallel-max-pending-repo-results requires a value")?;
parallel_phase1_cfg.max_pending_repo_results = v
.parse::<usize>()
.map_err(|_| format!("invalid --parallel-max-pending-repo-results: {v}"))?;
}
"--parallel-phase2-object-workers" => {
i += 1;
let v = argv
.get(i)
.ok_or("--parallel-phase2-object-workers requires a value")?;
parallel_phase2_cfg.object_workers = v
.parse::<usize>()
.map_err(|_| format!("invalid --parallel-phase2-object-workers: {v}"))?;
}
"--parallel-phase2-worker-queue-capacity" => {
i += 1;
let v = argv
.get(i)
.ok_or("--parallel-phase2-worker-queue-capacity requires a value")?;
parallel_phase2_cfg.worker_queue_capacity = v
.parse::<usize>()
.map_err(|_| format!("invalid --parallel-phase2-worker-queue-capacity: {v}"))?;
}
"--parallel-phase2-ready-batch-size" => {
i += 1;
let v = argv
.get(i)
.ok_or("--parallel-phase2-ready-batch-size requires a value")?;
parallel_phase2_cfg.ready_batch_size = v
.parse::<usize>()
.map_err(|_| format!("invalid --parallel-phase2-ready-batch-size: {v}"))?;
}
"--parallel-phase2-ready-batch-wall-time-budget-ms" => {
i += 1;
let v = argv
.get(i)
.ok_or("--parallel-phase2-ready-batch-wall-time-budget-ms requires a value")?;
parallel_phase2_cfg.ready_batch_wall_time_budget_ms =
v.parse::<u64>().map_err(|_| {
format!("invalid --parallel-phase2-ready-batch-wall-time-budget-ms: {v}")
})?;
}
"--parallel-phase2-result-drain-batch-size" => {
i += 1;
let v = argv
.get(i)
.ok_or("--parallel-phase2-result-drain-batch-size requires a value")?;
parallel_phase2_cfg.object_result_drain_batch_size =
v.parse::<usize>().map_err(|_| {
format!("invalid --parallel-phase2-result-drain-batch-size: {v}")
})?;
}
"--parallel-phase2-finalize-batch-size" => {
i += 1;
let v = argv
.get(i)
.ok_or("--parallel-phase2-finalize-batch-size requires a value")?;
parallel_phase2_cfg.publication_point_finalize_batch_size = v
.parse::<usize>()
.map_err(|_| format!("invalid --parallel-phase2-finalize-batch-size: {v}"))?;
}
"--parallel-phase2-finalize-batch-wall-time-budget-ms" => {
i += 1;
let v = argv.get(i).ok_or(
"--parallel-phase2-finalize-batch-wall-time-budget-ms requires a value",
)?;
parallel_phase2_cfg.publication_point_finalize_wall_time_budget_ms =
v.parse::<u64>().map_err(|_| {
format!("invalid --parallel-phase2-finalize-batch-wall-time-budget-ms: {v}")
})?;
}
"--parallel-phase2-finalize-queue-capacity" => {
i += 1;
let v = argv
.get(i)
.ok_or("--parallel-phase2-finalize-queue-capacity requires a value")?;
parallel_phase2_cfg.publication_point_finalize_queue_capacity =
v.parse::<usize>().map_err(|_| {
format!("invalid --parallel-phase2-finalize-queue-capacity: {v}")
})?;
}
"--db" => {
i += 1;
let v = argv.get(i).ok_or("--db requires a value")?;
db_path = Some(PathBuf::from(v));
}
"--raw-store-db" => {
i += 1;
let v = argv.get(i).ok_or("--raw-store-db requires a value")?;
raw_store_db = Some(PathBuf::from(v));
}
"--repo-bytes-db" => {
i += 1;
let v = argv.get(i).ok_or("--repo-bytes-db requires a value")?;
repo_bytes_db = Some(PathBuf::from(v));
}
"--policy" => {
i += 1;
let v = argv.get(i).ok_or("--policy requires a value")?;
policy_path = Some(PathBuf::from(v));
}
"--strict" => {
let next = argv.get(i + 1).map(String::as_str);
let spec = next.filter(|v| !v.starts_with("--"));
if spec.is_some() {
i += 1;
}
strict_policy = Some(StrictPolicy::parse_cli_spec(spec)?);
}
_ if arg.starts_with("--strict=") => {
let spec = arg.strip_prefix("--strict=").expect("prefix checked");
strict_policy = Some(StrictPolicy::parse_cli_spec(Some(spec))?);
}
"--report-json" => {
i += 1;
let v = argv.get(i).ok_or("--report-json requires a value")?;
report_json_path = Some(PathBuf::from(v));
}
"--report-json-compact" => {
report_json_compact = true;
}
"--skip-report-build" => {
skip_report_build = true;
}
"--skip-vcir-persist" => {
skip_vcir_persist = true;
}
"--enable-roa-validation-cache" => {
enable_roa_validation_cache = true;
}
"--enable-transport-request-prefetch" => {
enable_transport_request_prefetch = true;
}
"--ccr-out" => {
i += 1;
let v = argv.get(i).ok_or("--ccr-out requires a value")?;
ccr_out_path = Some(PathBuf::from(v));
}
"--vrps-csv-out" => {
i += 1;
let v = argv.get(i).ok_or("--vrps-csv-out requires a value")?;
vrps_csv_out_path = Some(PathBuf::from(v));
}
"--vaps-csv-out" => {
i += 1;
let v = argv.get(i).ok_or("--vaps-csv-out requires a value")?;
vaps_csv_out_path = Some(PathBuf::from(v));
}
"--compare-view-trust-anchor" => {
i += 1;
let v = argv
.get(i)
.ok_or("--compare-view-trust-anchor requires a value")?;
compare_view_trust_anchor = Some(v.clone());
}
"--cir-enable" => {
cir_enabled = true;
}
"--cir-out" => {
i += 1;
let v = argv.get(i).ok_or("--cir-out requires a value")?;
cir_out_path = Some(PathBuf::from(v));
}
"--cir-static-root" => {
i += 1;
let v = argv.get(i).ok_or("--cir-static-root requires a value")?;
cir_static_root = Some(PathBuf::from(v));
}
"--cir-tal-uri" => {
i += 1;
let v = argv.get(i).ok_or("--cir-tal-uri requires a value")?;
cir_tal_uris.push(v.clone());
cir_tal_uri = cir_tal_uris.first().cloned();
}
"--payload-replay-archive" => {
i += 1;
let v = argv
.get(i)
.ok_or("--payload-replay-archive requires a value")?;
payload_replay_archive = Some(PathBuf::from(v));
}
"--payload-replay-locks" => {
i += 1;
let v = argv
.get(i)
.ok_or("--payload-replay-locks requires a value")?;
payload_replay_locks = Some(PathBuf::from(v));
}
"--payload-base-archive" => {
i += 1;
let v = argv
.get(i)
.ok_or("--payload-base-archive requires a value")?;
payload_base_archive = Some(PathBuf::from(v));
}
"--payload-base-locks" => {
i += 1;
let v = argv.get(i).ok_or("--payload-base-locks requires a value")?;
payload_base_locks = Some(PathBuf::from(v));
}
"--payload-base-validation-time" => {
i += 1;
let v = argv
.get(i)
.ok_or("--payload-base-validation-time requires a value")?;
use time::format_description::well_known::Rfc3339;
let t = time::OffsetDateTime::parse(v, &Rfc3339).map_err(|e| {
format!("invalid --payload-base-validation-time (RFC3339 expected): {e}")
})?;
payload_base_validation_time = Some(t);
}
"--payload-delta-archive" => {
i += 1;
let v = argv
.get(i)
.ok_or("--payload-delta-archive requires a value")?;
payload_delta_archive = Some(PathBuf::from(v));
}
"--payload-delta-locks" => {
i += 1;
let v = argv
.get(i)
.ok_or("--payload-delta-locks requires a value")?;
payload_delta_locks = Some(PathBuf::from(v));
}
"--memory-trim-after-validation" => {
memory_trim_after_validation = true;
}
"--rsync-local-dir" => {
i += 1;
let v = argv.get(i).ok_or("--rsync-local-dir requires a value")?;
rsync_local_dir = Some(PathBuf::from(v));
}
"--disable-rrdp" => {
disable_rrdp = true;
}
"--rsync-command" => {
i += 1;
let v = argv.get(i).ok_or("--rsync-command requires a value")?;
rsync_command = Some(PathBuf::from(v));
}
"--http-timeout-secs" => {
i += 1;
let v = argv.get(i).ok_or("--http-timeout-secs requires a value")?;
http_timeout_secs = v
.parse::<u64>()
.map_err(|_| format!("invalid --http-timeout-secs: {v}"))?;
}
"--rsync-timeout-secs" => {
i += 1;
let v = argv.get(i).ok_or("--rsync-timeout-secs requires a value")?;
rsync_timeout_secs = v
.parse::<u64>()
.map_err(|_| format!("invalid --rsync-timeout-secs: {v}"))?;
}
"--rsync-mirror-root" => {
i += 1;
let v = argv.get(i).ok_or("--rsync-mirror-root requires a value")?;
rsync_mirror_root = Some(PathBuf::from(v));
}
"--rsync-scope" => {
i += 1;
let v = argv.get(i).ok_or("--rsync-scope requires a value")?;
rsync_scope_policy = RsyncScopePolicy::parse_cli_value(v)?;
}
"--max-depth" => {
i += 1;
let v = argv.get(i).ok_or("--max-depth requires a value")?;
max_depth = Some(
v.parse::<usize>()
.map_err(|_| format!("invalid --max-depth: {v}"))?,
);
}
"--max-instances" => {
i += 1;
let v = argv.get(i).ok_or("--max-instances requires a value")?;
max_instances = Some(
v.parse::<usize>()
.map_err(|_| format!("invalid --max-instances: {v}"))?,
);
}
"--validation-time" => {
i += 1;
let v = argv.get(i).ok_or("--validation-time requires a value")?;
use time::format_description::well_known::Rfc3339;
let t = time::OffsetDateTime::parse(v, &Rfc3339)
.map_err(|e| format!("invalid --validation-time (RFC3339 expected): {e}"))?;
validation_time = Some(t);
}
"--analyze" => {
analyze = true;
}
"--analysis-out" => {
i += 1;
let v = argv.get(i).ok_or("--analysis-out requires a value")?;
analyze = true;
analysis_out_path = Some(PathBuf::from(v));
}
"--profile-cpu" => {
profile_cpu = true;
}
_ => return Err(format!("unknown argument: {arg}\n\n{}", usage())),
}
i += 1;
}
let db_path = db_path.ok_or_else(|| format!("--db is required\n\n{}", usage()))?;
let tal_mode_count = (!tal_urls.is_empty()) as u8 + (!tal_paths.is_empty()) as u8;
if tal_mode_count != 1 {
return Err(format!(
"must specify either one-or-more --tal-url or one-or-more --tal-path/--ta-path pairs\n\n{}",
usage()
));
}
if parallel_phase2_cfg.object_workers == 0 {
return Err(format!(
"--parallel-phase2-object-workers must be > 0\n\n{}",
usage()
));
}
if parallel_phase2_cfg.worker_queue_capacity == 0 {
return Err(format!(
"--parallel-phase2-worker-queue-capacity must be > 0\n\n{}",
usage()
));
}
if parallel_phase2_cfg.ready_batch_size == 0 {
return Err(format!(
"--parallel-phase2-ready-batch-size must be > 0\n\n{}",
usage()
));
}
if parallel_phase2_cfg.ready_batch_wall_time_budget_ms == 0 {
return Err(format!(
"--parallel-phase2-ready-batch-wall-time-budget-ms must be > 0\n\n{}",
usage()
));
}
if parallel_phase2_cfg.object_result_drain_batch_size == 0 {
return Err(format!(
"--parallel-phase2-result-drain-batch-size must be > 0\n\n{}",
usage()
));
}
if parallel_phase2_cfg.publication_point_finalize_batch_size == 0 {
return Err(format!(
"--parallel-phase2-finalize-batch-size must be > 0\n\n{}",
usage()
));
}
if parallel_phase2_cfg.publication_point_finalize_wall_time_budget_ms == 0 {
return Err(format!(
"--parallel-phase2-finalize-batch-wall-time-budget-ms must be > 0\n\n{}",
usage()
));
}
if parallel_phase2_cfg.publication_point_finalize_queue_capacity == 0 {
return Err(format!(
"--parallel-phase2-finalize-queue-capacity must be > 0\n\n{}",
usage()
));
}
if !tal_urls.is_empty() && !ta_paths.is_empty() {
return Err(format!(
"--ta-path cannot be used with --tal-url mode\n\n{}",
usage()
));
}
if !tal_paths.is_empty() {
if !ta_paths.is_empty() {
if ta_paths.len() != tal_paths.len() {
return Err(format!(
"--tal-path and --ta-path counts must match in file mode\n\n{}",
usage()
));
}
} else if ta_paths.is_empty() && !disable_rrdp {
return Err(format!(
"--tal-path requires --ta-path unless --disable-rrdp is set\n\n{}",
usage()
));
}
}
let tal_url = tal_urls.first().cloned();
let tal_path = tal_paths.first().cloned();
let ta_path = ta_paths.first().cloned();
if cir_enabled && cir_out_path.is_none() {
return Err(format!("--cir-enable requires --cir-out\n\n{}", usage()));
}
if report_json_compact && report_json_path.is_none() {
return Err(format!(
"--report-json-compact requires --report-json\n\n{}",
usage()
));
}
if skip_report_build && report_json_path.is_some() {
return Err(format!(
"--skip-report-build cannot be combined with --report-json\n\n{}",
usage()
));
}
if vrps_csv_out_path.is_some() != vaps_csv_out_path.is_some() {
return Err(format!(
"--vrps-csv-out and --vaps-csv-out must be provided together\n\n{}",
usage()
));
}
if compare_view_trust_anchor.is_some() && vrps_csv_out_path.is_none() {
return Err(format!(
"--compare-view-trust-anchor requires --vrps-csv-out/--vaps-csv-out\n\n{}",
usage()
));
}
if cir_static_root.is_some() {
return Err(format!(
"--cir-static-root is no longer supported; CIR export now writes only .cir files\n\n{}",
usage()
));
}
if !cir_enabled && (cir_out_path.is_some() || !cir_tal_uris.is_empty()) {
return Err(format!(
"--cir-out/--cir-tal-uri require --cir-enable\n\n{}",
usage()
));
}
if cir_enabled && !cir_tal_uris.is_empty() {
let expected = if !tal_paths.is_empty() {
tal_paths.len()
} else {
tal_urls.len()
};
if cir_tal_uris.len() != expected {
return Err(format!(
"--cir-tal-uri count must match TAL input count when provided\n\n{}",
usage()
));
}
}
if cir_enabled && !tal_paths.is_empty() && cir_tal_uris.is_empty() {
return Err(format!(
"CIR export in --tal-path mode requires --cir-tal-uri for each TAL\n\n{}",
usage()
));
}
let replay_mode_count =
payload_replay_archive.is_some() as u8 + payload_replay_locks.is_some() as u8;
if replay_mode_count == 1 {
return Err(format!(
"--payload-replay-archive and --payload-replay-locks must be provided together
{}",
usage()
));
}
let delta_mode_count = payload_base_archive.is_some() as u8
+ payload_base_locks.is_some() as u8
+ payload_delta_archive.is_some() as u8
+ payload_delta_locks.is_some() as u8;
if delta_mode_count > 0 && delta_mode_count < 4 {
return Err(format!(
"--payload-base-archive, --payload-base-locks, --payload-delta-archive and --payload-delta-locks must be provided together
{}",
usage()
));
}
if replay_mode_count == 2 && delta_mode_count == 4 {
return Err(format!(
"snapshot replay mode and delta replay mode are mutually exclusive
{}",
usage()
));
}
if replay_mode_count == 2 {
if tal_url.is_some() {
return Err(format!(
"payload replay mode requires --tal-path and --ta-path; --tal-url is not supported
{}",
usage()
));
}
if tal_path.is_none() || ta_path.is_none() {
return Err(format!(
"payload replay mode requires --tal-path and --ta-path
{}",
usage()
));
}
if rsync_local_dir.is_some() {
return Err(format!(
"payload replay mode cannot be combined with --rsync-local-dir
{}",
usage()
));
}
}
if delta_mode_count == 4 {
if tal_url.is_some() {
return Err(format!(
"payload delta replay mode requires --tal-path and --ta-path; --tal-url is not supported
{}",
usage()
));
}
if tal_path.is_none() || ta_path.is_none() {
return Err(format!(
"payload delta replay mode requires --tal-path and --ta-path
{}",
usage()
));
}
if rsync_local_dir.is_some() {
return Err(format!(
"payload delta replay mode cannot be combined with --rsync-local-dir
{}",
usage()
));
}
}
let mut tal_inputs = Vec::new();
if !tal_urls.is_empty() {
tal_inputs.extend(tal_urls.iter().cloned().map(TalInputSpec::from_url));
} else if !tal_paths.is_empty() {
if ta_paths.len() == tal_paths.len() {
tal_inputs.extend(tal_paths.iter().cloned().zip(ta_paths.iter().cloned()).map(
|(tal_path, ta_path)| TalInputSpec::from_file_path_with_ta(tal_path, ta_path),
));
} else {
tal_inputs.extend(tal_paths.iter().cloned().map(TalInputSpec::from_file_path));
}
}
Ok(CliArgs {
tal_urls,
tal_paths,
ta_paths,
tal_url,
tal_path,
ta_path,
parallel_phase1_config: parallel_phase1_cfg,
parallel_phase2_config: parallel_phase2_cfg,
tal_inputs,
db_path,
raw_store_db,
repo_bytes_db,
policy_path,
strict_policy,
report_json_path,
report_json_compact,
skip_report_build,
skip_vcir_persist,
enable_roa_validation_cache,
enable_transport_request_prefetch,
ccr_out_path,
vrps_csv_out_path,
vaps_csv_out_path,
compare_view_trust_anchor,
cir_enabled,
cir_out_path,
cir_static_root,
cir_tal_uris,
cir_tal_uri,
payload_replay_archive,
payload_replay_locks,
payload_base_archive,
payload_base_locks,
payload_base_validation_time,
payload_delta_archive,
payload_delta_locks,
memory_trim_after_validation,
rsync_local_dir,
disable_rrdp,
rsync_command,
http_timeout_secs,
rsync_timeout_secs,
rsync_mirror_root,
rsync_scope_policy,
max_depth,
max_instances,
validation_time,
analyze,
analysis_out_path,
profile_cpu,
})
}
/// Loads the validation policy.
///
/// Without a path this is `Policy::default()`. With a path, the file is read as UTF-8
/// text and parsed by `Policy::from_toml_str`.
///
/// # Errors
///
/// Returns `read policy file failed: <path>: <io error>` if the file cannot be read,
/// or the parser's error text if the TOML is rejected.
fn read_policy(path: Option<&Path>) -> Result<Policy, String> {
    if let Some(policy_file) = path {
        let contents = std::fs::read_to_string(policy_file).map_err(|err| {
            format!("read policy file failed: {}: {err}", policy_file.display())
        })?;
        return Policy::from_toml_str(&contents).map_err(|err| err.to_string());
    }
    Ok(Policy::default())
}
/// Counts the distinct RRDP notification URIs among `publication_points`.
///
/// Publication points without an RRDP notification URI are not counted.
fn unique_rrdp_repos_from_publication_points(
    publication_points: &[crate::audit::PublicationPointAudit],
) -> usize {
    publication_points
        .iter()
        .filter_map(|point| point.rrdp_notification_uri.as_deref())
        .collect::<std::collections::HashSet<&str>>()
        .len()
}
/// Test-only: counts distinct RRDP notification URIs across the publication points
/// of `report`.
#[cfg(test)]
fn unique_rrdp_repos(report: &AuditReportV2) -> usize {
    let points: &[crate::audit::PublicationPointAudit] = &report.publication_points;
    unique_rrdp_repos_from_publication_points(points)
}
/// Test-only: prints the run summary for a full audit report to stdout.
///
/// `warnings_total` counts tree-level warnings plus every publication point's warnings.
#[cfg(test)]
fn print_summary(report: &AuditReportV2) {
    let rrdp_repos = unique_rrdp_repos(report);
    let point_warnings: usize = report
        .publication_points
        .iter()
        .map(|pp| pp.warnings.len())
        .sum();
    let warnings_total = report.tree.warnings.len() + point_warnings;
    let processed = report.tree.instances_processed;
    let failed = report.tree.instances_failed;

    println!("RPKI stage2 serial run summary");
    println!("validation_time={}", report.meta.validation_time_rfc3339_utc);
    println!("publication_points_processed={processed} publication_points_failed={failed}");
    println!("rrdp_repos_unique={rrdp_repos}");
    println!("vrps={}", report.vrps.len());
    println!("aspas={}", report.aspas.len());
    println!("audit_publication_points={}", report.publication_points.len());
    println!("warnings_total={warnings_total}");
}
fn print_summary_from_shared(validation_time: time::OffsetDateTime, shared: &PostValidationShared) {
use time::format_description::well_known::Rfc3339;
let validation_time_rfc3339_utc = validation_time
.to_offset(time::UtcOffset::UTC)
.format(&Rfc3339)
.expect("format validation_time");
let rrdp_repos = unique_rrdp_repos_from_publication_points(shared.publication_points.as_ref());
println!("RPKI stage2 serial run summary");
println!("validation_time={validation_time_rfc3339_utc}");
println!(
"publication_points_processed={} publication_points_failed={}",
shared.instances_processed, shared.instances_failed
);
println!("rrdp_repos_unique={rrdp_repos}");
println!("vrps={}", shared.vrps.len());
println!("aspas={}", shared.aspas.len());
println!(
"audit_publication_points={}",
shared.publication_points.len()
);
println!(
"warnings_total={}",
shared.tree_warnings.len()
+ shared
.publication_points
.iter()
.map(|pp| pp.warnings.len())
.sum::<usize>()
);
}
/// Validation results shared by the post-validation stages, built from a
/// `RunTreeFromTalAuditOutput` by `PostValidationShared::from_run_output`.
///
/// The collections are stored as `Arc<[T]>`, so cloning this struct bumps reference
/// counts instead of copying the collections.
#[derive(Clone, Debug, PartialEq, Eq)]
struct PostValidationShared {
    /// Root CA discovery; `trust_anchors` falls back to it when `discoveries` is empty.
    discovery: crate::validation::from_tal::DiscoveredRootCaInstance,
    /// All root CA discoveries of the run (may be empty).
    discoveries: Arc<[crate::validation::from_tal::DiscoveredRootCaInstance]>,
    /// TAL inputs reported as successful by the run output.
    successful_tal_inputs: Arc<[TalInputSpec]>,
    /// `TreeRunOutput::instances_processed`.
    instances_processed: usize,
    /// `TreeRunOutput::instances_failed`.
    instances_failed: usize,
    /// Tree-level warnings (`TreeRunOutput::warnings`).
    tree_warnings: Arc<[crate::report::Warning]>,
    /// Validated ROA payloads from the tree run.
    vrps: Arc<[crate::validation::objects::Vrp]>,
    /// ASPA attestations from the tree run.
    aspas: Arc<[crate::validation::objects::AspaAttestation]>,
    /// Router key payloads from the tree run.
    router_keys: Arc<[crate::validation::objects::RouterKeyPayload]>,
    /// Per-publication-point audit records.
    publication_points: Arc<[crate::audit::PublicationPointAudit]>,
    /// ROA validation cache statistics.
    roa_cache_stats: crate::validation::objects::RoaValidationCacheStats,
    /// Download events recorded during the run.
    downloads: Arc<[crate::audit::AuditDownloadEvent]>,
    /// Aggregated download statistics.
    download_stats: crate::audit::AuditDownloadStats,
    /// Current repository object index entries from the run output.
    current_repo_objects: Arc<[crate::current_repo_index::CurrentRepoObject]>,
    /// CCR accumulator carried over from the run output, if present.
    ccr_accumulator: Option<CcrAccumulator>,
}
impl PostValidationShared {
    /// Converts a validation run's output into shared post-validation state.
    ///
    /// Both the run output and its nested tree output are destructured
    /// exhaustively (no `..`), so a new upstream field forces this conversion to
    /// be revisited. Collection fields are converted into `Arc<[T]>`.
    fn from_run_output(out: RunTreeFromTalAuditOutput) -> Self {
        let RunTreeFromTalAuditOutput {
            discovery,
            discoveries,
            successful_tal_inputs,
            tree:
                crate::validation::tree::TreeRunOutput {
                    instances_processed,
                    instances_failed,
                    warnings: tree_warnings,
                    vrps,
                    aspas,
                    router_keys,
                },
            publication_points,
            roa_cache_stats,
            downloads,
            download_stats,
            current_repo_objects,
            ccr_accumulator,
        } = out;

        Self {
            discovery,
            instances_processed,
            instances_failed,
            roa_cache_stats,
            download_stats,
            ccr_accumulator,
            discoveries: discoveries.into(),
            successful_tal_inputs: successful_tal_inputs.into(),
            tree_warnings: tree_warnings.into(),
            vrps: vrps.into(),
            aspas: aspas.into(),
            router_keys: router_keys.into(),
            publication_points: publication_points.into(),
            downloads: downloads.into(),
            current_repo_objects: current_repo_objects.into(),
        }
    }

    /// Returns owned copies of the trust anchors the run validated against.
    ///
    /// Uses every entry of `discoveries` when that list is non-empty. Otherwise
    /// falls back to the single primary `discovery`.
    fn trust_anchors(&self) -> Vec<crate::data_model::ta::TrustAnchor> {
        if !self.discoveries.is_empty() {
            return self
                .discoveries
                .iter()
                .map(|root| root.trust_anchor.clone())
                .collect();
        }
        vec![self.discovery.trust_anchor.clone()]
    }
}
/// Accumulates size estimates for one named section of the post-validation
/// object graph. `finish` turns it into an [`ObjectGraphMemorySection`].
#[derive(Default)]
struct ObjectGraphSectionBuilder {
    /// Section name copied into the finished section.
    name: String,
    /// Number of items recorded through `items`.
    item_count: u64,
    /// Inline (`size_of`) bytes of those items.
    shallow_bytes: u64,
    /// Heap bytes attributed to the section: string sizes/capacities, vector
    /// capacities, and explicit `heap_bytes` additions.
    heap_bytes: u64,
    /// Number of strings recorded.
    string_count: u64,
    /// Sum of recorded string lengths.
    string_bytes: u64,
    /// Sum of recorded string capacities (length for borrowed `&str` inputs).
    string_capacity_bytes: u64,
    /// Number of vectors recorded.
    vec_count: u64,
    /// Bytes occupied by vector elements in use (`len * element_size`).
    vec_heap_bytes: u64,
    /// Bytes reserved by vectors (`capacity * element_size`).
    vec_capacity_bytes: u64,
    /// Named per-section metrics, in the order they were added.
    details: Vec<ObjectGraphMemoryMetric>,
}
impl ObjectGraphSectionBuilder {
    /// Starts an empty section called `name`; every counter begins at zero.
    fn new(name: impl Into<String>) -> Self {
        let name: String = name.into();
        Self {
            name,
            ..Default::default()
        }
    }

    /// Records `count` inline items of `item_size` bytes each (shallow size only).
    fn items(&mut self, count: usize, item_size: usize) {
        let count = count as u64;
        self.item_count += count;
        self.shallow_bytes += count * item_size as u64;
    }

    /// Adds `value` bytes directly to the section's heap total.
    fn heap_bytes(&mut self, value: usize) {
        let extra = value as u64;
        self.heap_bytes += extra;
    }

    /// Records a borrowed string whose allocation capacity is unknown, so its
    /// length stands in for the capacity.
    fn string(&mut self, value: &str) {
        let len = value.len() as u64;
        self.string_count += 1;
        self.string_bytes += len;
        self.string_capacity_bytes += len;
        self.heap_bytes += len;
    }

    /// Records an owned `String`: its length as payload and its capacity as heap.
    fn owned_string(&mut self, value: &String) {
        let capacity = value.capacity() as u64;
        self.string_count += 1;
        self.string_bytes += value.len() as u64;
        self.string_capacity_bytes += capacity;
        self.heap_bytes += capacity;
    }

    /// Records the string when present; `None` contributes nothing.
    fn optional_string(&mut self, value: Option<&String>) {
        if let Some(text) = value {
            self.owned_string(text);
        }
    }

    /// Records one vector's element buffer. `len` elements count as used payload;
    /// `capacity` elements count as reserved and heap bytes.
    fn vec_header_with_capacity(&mut self, len: usize, capacity: usize, element_size: usize) {
        let used = (len * element_size) as u64;
        let reserved = (capacity * element_size) as u64;
        self.vec_count += 1;
        self.vec_heap_bytes += used;
        self.vec_capacity_bytes += reserved;
        self.heap_bytes += reserved;
    }

    /// Records a byte vector's buffer.
    fn byte_vec_owned(&mut self, value: &Vec<u8>) {
        let element_size = std::mem::size_of::<u8>();
        self.vec_header_with_capacity(value.len(), value.capacity(), element_size);
    }

    /// Records a vector of strings: the vector buffer, then each string it owns.
    fn string_vec_owned(&mut self, values: &Vec<String>) {
        let element_size = std::mem::size_of::<String>();
        self.vec_header_with_capacity(values.len(), values.capacity(), element_size);
        values.iter().for_each(|text| self.owned_string(text));
    }

    /// Appends a named detail metric; metrics keep their insertion order.
    fn metric(&mut self, name: impl Into<String>, value: u64) {
        let name: String = name.into();
        self.details.push(ObjectGraphMemoryMetric { name, value });
    }

    /// Consumes the builder. The section's `estimated_bytes` is shallow plus heap bytes.
    fn finish(self) -> ObjectGraphMemorySection {
        let Self {
            name,
            item_count,
            shallow_bytes,
            heap_bytes,
            string_count,
            string_bytes,
            string_capacity_bytes,
            vec_count,
            vec_heap_bytes,
            vec_capacity_bytes,
            details,
        } = self;
        ObjectGraphMemorySection {
            name,
            item_count,
            shallow_bytes,
            heap_bytes,
            estimated_bytes: shallow_bytes + heap_bytes,
            string_count,
            string_bytes,
            string_capacity_bytes,
            vec_count,
            vec_heap_bytes,
            vec_capacity_bytes,
            details,
        }
    }
}
/// Estimates the Rust object-graph size of the post-validation shared state.
///
/// Produces one [`ObjectGraphMemorySection`] per major collection, in this order:
/// publication points, VRPs, ASPAs, router keys, tree warnings, downloads,
/// current repository objects, trust anchors/TAL inputs, and the CCR accumulator.
/// Each section's `estimated_bytes` is summed into `total_estimated_bytes`, and
/// the summary is labelled `after_validation`.
fn estimate_shared_object_graph(shared: &PostValidationShared) -> ObjectGraphMemorySummary {
    // A single `vec!` literal rather than `Vec::new()` followed by repeated
    // `push` calls (clippy `vec_init_then_push`); section order is unchanged.
    let sections = vec![
        estimate_publication_points_graph(shared.publication_points.as_ref()),
        estimate_vrps_graph(shared.vrps.as_ref()),
        estimate_aspas_graph(shared.aspas.as_ref()),
        estimate_router_keys_graph(shared.router_keys.as_ref()),
        estimate_warnings_graph("tree_warnings", shared.tree_warnings.as_ref()),
        estimate_downloads_graph(shared.downloads.as_ref()),
        estimate_current_repo_objects_graph(shared.current_repo_objects.as_ref()),
        estimate_trust_anchor_graph(shared),
        estimate_ccr_accumulator_graph(shared.ccr_accumulator.as_ref()),
    ];
    let total_estimated_bytes: u64 = sections
        .iter()
        .map(|section| section.estimated_bytes)
        .sum();
    ObjectGraphMemorySummary {
        captured_at_label: "after_validation".to_string(),
        total_estimated_bytes,
        sections,
        notes: vec![
            "Estimated bytes are Rust object graph approximations based on struct sizes and owned String/Vec payload lengths.".to_string(),
            "The estimate intentionally excludes allocator metadata, fragmentation, freed-but-retained arenas, RocksDB C++ heap, and transient worker allocations.".to_string(),
            "Large RSS minus this estimate points to allocator retention or structures not yet modeled by this telemetry.".to_string(),
        ],
    }
}
/// Estimates memory held by the per-publication-point audit records.
///
/// Covers each record's inline size, its owned string fields, optional
/// discovery provenance, warnings (with RFC refs and context), and object audit
/// entries. Detail metrics report how many of each sub-item were seen.
fn estimate_publication_points_graph(
    publication_points: &[crate::audit::PublicationPointAudit],
) -> ObjectGraphMemorySection {
    let point_total = publication_points.len();
    let mut section = ObjectGraphSectionBuilder::new("publication_points");
    section.items(
        point_total,
        std::mem::size_of::<crate::audit::PublicationPointAudit>(),
    );
    section.metric("publication_point_count", point_total as u64);

    let mut entries_seen = 0u64;
    let mut warnings_seen = 0u64;
    let mut provenance_seen = 0u64;
    let mut details_seen = 0u64;

    for pp in publication_points {
        // Always-present string fields.
        for text in [
            &pp.rsync_base_uri,
            &pp.manifest_rsync_uri,
            &pp.publication_point_rsync_uri,
            &pp.source,
            &pp.repo_terminal_state,
            &pp.this_update_rfc3339_utc,
            &pp.next_update_rfc3339_utc,
            &pp.verified_at_rfc3339_utc,
        ] {
            section.owned_string(text);
        }
        // Optional string fields.
        for text in [
            pp.rrdp_notification_uri.as_ref(),
            pp.repo_sync_source.as_ref(),
            pp.repo_sync_phase.as_ref(),
            pp.repo_sync_error.as_ref(),
        ] {
            section.optional_string(text);
        }

        if let Some(origin) = &pp.discovered_from {
            provenance_seen += 1;
            section.heap_bytes(std::mem::size_of::<crate::audit::DiscoveredFrom>());
            section.owned_string(&origin.parent_manifest_rsync_uri);
            section.owned_string(&origin.child_ca_certificate_rsync_uri);
            section.owned_string(&origin.child_ca_certificate_sha256_hex);
        }

        let warnings = &pp.warnings;
        warnings_seen += warnings.len() as u64;
        section.vec_header_with_capacity(
            warnings.len(),
            warnings.capacity(),
            std::mem::size_of::<crate::audit::AuditWarning>(),
        );
        for warning in warnings {
            section.owned_string(&warning.message);
            section.string_vec_owned(&warning.rfc_refs);
            section.optional_string(warning.context.as_ref());
        }

        let entries = &pp.objects;
        entries_seen += entries.len() as u64;
        section.vec_header_with_capacity(
            entries.len(),
            entries.capacity(),
            std::mem::size_of::<crate::audit::ObjectAuditEntry>(),
        );
        for entry in entries {
            section.owned_string(&entry.rsync_uri);
            section.owned_string(&entry.sha256_hex);
            if let Some(detail) = &entry.detail {
                details_seen += 1;
                section.owned_string(detail);
            }
        }
    }

    section.metric("object_audit_entry_count", entries_seen);
    section.metric("publication_point_warning_count", warnings_seen);
    section.metric("publication_point_discovered_from_count", provenance_seen);
    section.metric("object_detail_count", details_seen);
    section.finish()
}
/// Estimates memory for the validated ROA payloads.
///
/// VRPs are counted by inline size only; no owned heap data is walked for them.
fn estimate_vrps_graph(vrps: &[crate::validation::objects::Vrp]) -> ObjectGraphMemorySection {
    let vrp_total = vrps.len();
    let mut section = ObjectGraphSectionBuilder::new("vrps");
    section.items(vrp_total, std::mem::size_of::<crate::validation::objects::Vrp>());
    section.metric("vrp_count", vrp_total as u64);
    section.finish()
}
/// Estimates memory for ASPA attestations: their inline size plus each
/// provider-AS list's buffer, sized as `u32` elements.
fn estimate_aspas_graph(
    aspas: &[crate::validation::objects::AspaAttestation],
) -> ObjectGraphMemorySection {
    let mut section = ObjectGraphSectionBuilder::new("aspas");
    section.items(
        aspas.len(),
        std::mem::size_of::<crate::validation::objects::AspaAttestation>(),
    );

    let provider_size = std::mem::size_of::<u32>();
    let mut provider_total = 0u64;
    for providers in aspas.iter().map(|aspa| &aspa.provider_as_ids) {
        provider_total += providers.len() as u64;
        section.vec_header_with_capacity(providers.len(), providers.capacity(), provider_size);
    }

    section.metric("aspa_count", aspas.len() as u64);
    section.metric("provider_asn_count", provider_total);
    section.finish()
}
/// Estimates memory for router key payloads: inline size, the SKI and SPKI byte
/// buffers, and the three provenance strings of each key.
fn estimate_router_keys_graph(
    router_keys: &[crate::validation::objects::RouterKeyPayload],
) -> ObjectGraphMemorySection {
    let mut section = ObjectGraphSectionBuilder::new("router_keys");
    section.items(
        router_keys.len(),
        std::mem::size_of::<crate::validation::objects::RouterKeyPayload>(),
    );
    for key in router_keys {
        for bytes in [&key.ski, &key.spki_der] {
            section.byte_vec_owned(bytes);
        }
        for text in [
            &key.source_object_uri,
            &key.source_object_hash,
            &key.source_ee_cert_hash,
        ] {
            section.owned_string(text);
        }
    }
    section.metric("router_key_count", router_keys.len() as u64);
    section.finish()
}
/// Estimates memory for a list of validation warnings under section `name`.
///
/// Each warning contributes its message, optional context, and its RFC-reference
/// vector buffer. Any heap data owned by the individual RFC refs is not walked.
fn estimate_warnings_graph(
    name: &str,
    warnings: &[crate::report::Warning],
) -> ObjectGraphMemorySection {
    let mut section = ObjectGraphSectionBuilder::new(name);
    section.items(warnings.len(), std::mem::size_of::<crate::report::Warning>());
    let rfc_ref_size = std::mem::size_of::<crate::report::RfcRef>();
    for warning in warnings {
        let refs = &warning.rfc_refs;
        section.owned_string(&warning.message);
        section.vec_header_with_capacity(refs.len(), refs.capacity(), rfc_ref_size);
        section.optional_string(warning.context.as_ref());
    }
    section.metric("warning_count", warnings.len() as u64);
    section.finish()
}
/// Estimates memory for recorded download events.
///
/// Counts each event's URI and timestamp strings and its optional error message.
/// Metrics report how many events carry an error, a byte count, and object stats.
fn estimate_downloads_graph(
    downloads: &[crate::audit::AuditDownloadEvent],
) -> ObjectGraphMemorySection {
    let mut section = ObjectGraphSectionBuilder::new("downloads");
    section.items(
        downloads.len(),
        std::mem::size_of::<crate::audit::AuditDownloadEvent>(),
    );

    let mut failed = 0u64;
    let mut with_bytes = 0u64;
    let mut with_objects = 0u64;
    for event in downloads {
        for text in [
            &event.uri,
            &event.started_at_rfc3339_utc,
            &event.finished_at_rfc3339_utc,
        ] {
            section.owned_string(text);
        }
        with_bytes += u64::from(event.bytes.is_some());
        with_objects += u64::from(event.objects.is_some());
        if let Some(message) = &event.error {
            failed += 1;
            section.owned_string(message);
        }
    }

    section.metric("download_event_count", downloads.len() as u64);
    section.metric("download_error_count", failed);
    section.metric("download_bytes_field_count", with_bytes);
    section.metric("download_objects_stat_count", with_objects);
    section.finish()
}
/// Estimates memory for current-repository object records: inline size, URI,
/// hash and source strings, plus the optional object-type string.
fn estimate_current_repo_objects_graph(
    objects: &[crate::current_repo_index::CurrentRepoObject],
) -> ObjectGraphMemorySection {
    let mut section = ObjectGraphSectionBuilder::new("current_repo_objects");
    section.items(
        objects.len(),
        std::mem::size_of::<crate::current_repo_index::CurrentRepoObject>(),
    );

    let mut typed = 0u64;
    for object in objects {
        for text in [
            &object.rsync_uri,
            &object.current_hash_hex,
            &object.repository_source,
        ] {
            section.owned_string(text);
        }
        if let Some(kind) = &object.object_type {
            typed += 1;
            section.owned_string(kind);
        }
    }

    section.metric("current_repo_object_count", objects.len() as u64);
    section.metric("current_repo_object_type_count", typed);
    section.finish()
}
/// Estimates memory for the root discoveries and TAL input specs held by the
/// shared state.
///
/// The primary `discovery` is counted as one item alongside every entry of
/// `discoveries`, then each successful TAL input is added.
fn estimate_trust_anchor_graph(shared: &PostValidationShared) -> ObjectGraphMemorySection {
    let root_size = std::mem::size_of::<crate::validation::from_tal::DiscoveredRootCaInstance>();
    let mut section = ObjectGraphSectionBuilder::new("trust_anchors_and_tal_inputs");

    section.items(1, root_size);
    estimate_discovered_root(&mut section, &shared.discovery);

    section.items(shared.discoveries.len(), root_size);
    shared
        .discoveries
        .iter()
        .for_each(|root| estimate_discovered_root(&mut section, root));

    section.items(
        shared.successful_tal_inputs.len(),
        std::mem::size_of::<TalInputSpec>(),
    );
    shared
        .successful_tal_inputs
        .iter()
        .for_each(|input| estimate_tal_input(&mut section, input));

    section.metric("discoveries_count", shared.discoveries.len() as u64);
    section.metric(
        "successful_tal_inputs_count",
        shared.successful_tal_inputs.len() as u64,
    );
    section.finish()
}
/// Adds one discovered root's heap data to `builder`: its optional TAL URL, its
/// trust anchor, and the CA instance's URI strings.
fn estimate_discovered_root(
    builder: &mut ObjectGraphSectionBuilder,
    discovery: &crate::validation::from_tal::DiscoveredRootCaInstance,
) {
    let ca = &discovery.ca_instance;
    builder.optional_string(discovery.tal_url.as_ref());
    estimate_trust_anchor(builder, &discovery.trust_anchor);
    for text in [
        &ca.rsync_base_uri,
        &ca.manifest_rsync_uri,
        &ca.publication_point_rsync_uri,
    ] {
        builder.owned_string(text);
    }
    builder.optional_string(ca.rrdp_notification_uri.as_ref());
}
/// Adds a trust anchor's heap data to `builder`.
///
/// Covers the raw TAL bytes, TAL comments, TA URI list (each URI's text), SPKI
/// DER, TA certificate DER, and the optional resolved TA URI.
fn estimate_trust_anchor(
    builder: &mut ObjectGraphSectionBuilder,
    trust_anchor: &crate::data_model::ta::TrustAnchor,
) {
    let tal = &trust_anchor.tal;
    builder.byte_vec_owned(&tal.raw);
    builder.string_vec_owned(&tal.comments);

    let ta_uris = &tal.ta_uris;
    builder.vec_header_with_capacity(ta_uris.len(), ta_uris.capacity(), std::mem::size_of::<url::Url>());
    ta_uris.iter().for_each(|uri| builder.string(uri.as_str()));

    builder.byte_vec_owned(&tal.subject_public_key_info_der);
    builder.byte_vec_owned(&trust_anchor.ta_certificate.raw_der);
    if let Some(resolved) = trust_anchor.resolved_ta_uri.as_ref() {
        builder.string(resolved.as_str());
    }
}
/// Adds a TAL input spec's heap data to `builder`: its id strings plus whatever
/// its source variant owns (URL text, raw TAL/TA bytes, or file path text).
fn estimate_tal_input(builder: &mut ObjectGraphSectionBuilder, tal_input: &TalInputSpec) {
    use crate::parallel::types::TalSource;

    builder.owned_string(&tal_input.tal_id);
    builder.owned_string(&tal_input.rir_id);
    match &tal_input.source {
        TalSource::Url(url) => builder.owned_string(url),
        TalSource::DerBytes {
            tal_url,
            tal_bytes,
            ta_der,
        } => {
            builder.owned_string(tal_url);
            for bytes in [tal_bytes, ta_der] {
                builder.byte_vec_owned(bytes);
            }
        }
        TalSource::FilePath(path) => builder.string(&path.to_string_lossy()),
        TalSource::FilePathWithTa { tal_path, ta_path } => {
            // Paths are measured through their lossy UTF-8 text length.
            builder.string(&tal_path.to_string_lossy());
            builder.string(&ta_path.to_string_lossy());
        }
    }
}
/// Builds the `ccr_accumulator` memory section.
///
/// With an accumulator, counts its inline size, adds its self-reported heap
/// estimate, and copies its memory statistics into detail metrics. Without one,
/// only a zero `manifest_count` metric is emitted.
fn estimate_ccr_accumulator_graph(
    accumulator: Option<&CcrAccumulator>,
) -> ObjectGraphMemorySection {
    let mut section = ObjectGraphSectionBuilder::new("ccr_accumulator");
    match accumulator {
        None => section.metric("manifest_count", 0),
        Some(acc) => {
            section.items(1, std::mem::size_of::<CcrAccumulator>());
            let stats = acc.memory_stats();
            section.heap_bytes(stats.estimated_heap_bytes as usize);
            for (name, value) in [
                ("trust_anchor_count", stats.trust_anchor_count),
                ("manifest_count", stats.manifest_count),
                ("string_bytes", stats.string_bytes),
                ("string_capacity_bytes", stats.string_capacity_bytes),
                ("vec_payload_bytes", stats.vec_payload_bytes),
                ("vec_capacity_bytes", stats.vec_capacity_bytes),
                ("locations_der_count", stats.locations_der_count),
                ("subordinate_ski_count", stats.subordinate_ski_count),
                ("btree_key_capacity_bytes", stats.btree_key_capacity_bytes),
                ("btree_entry_shallow_bytes", stats.btree_entry_shallow_bytes),
            ] {
                section.metric(name, value);
            }
        }
    }
    section.finish()
}
/// Test helper: materializes a full [`AuditReportV2`] (format version 2) from the
/// shared post-validation state.
///
/// VRP prefixes are rendered with `format_roa_ip_prefix`. Publication points and
/// downloads are cloned out of their shared slices. Repo-sync statistics are
/// recomputed from the publication points.
///
/// # Panics
/// Panics if `validation_time` (converted to UTC) cannot be formatted as RFC 3339.
#[cfg(test)]
fn build_report(
    policy: &Policy,
    validation_time: time::OffsetDateTime,
    shared: &PostValidationShared,
) -> AuditReportV2 {
    use time::format_description::well_known::Rfc3339;

    let utc_time = validation_time.to_offset(time::UtcOffset::UTC);
    let validation_time_rfc3339_utc = utc_time.format(&Rfc3339).expect("format validation_time");

    let vrps: Vec<VrpOutput> = shared
        .vrps
        .iter()
        .map(|vrp| VrpOutput {
            asn: vrp.asn,
            prefix: format_roa_ip_prefix(&vrp.prefix),
            max_length: vrp.max_length,
        })
        .collect();
    let aspas: Vec<AspaOutput> = shared
        .aspas
        .iter()
        .map(|aspa| AspaOutput {
            customer_as_id: aspa.customer_as_id,
            provider_as_ids: aspa.provider_as_ids.clone(),
        })
        .collect();
    let repo_sync_stats = build_repo_sync_stats(shared.publication_points.as_ref());
    let tree = TreeSummary {
        instances_processed: shared.instances_processed,
        instances_failed: shared.instances_failed,
        warnings: shared.tree_warnings.iter().map(AuditWarning::from).collect(),
    };

    AuditReportV2 {
        format_version: 2,
        meta: AuditRunMeta {
            validation_time_rfc3339_utc,
        },
        policy: policy.clone(),
        tree,
        publication_points: shared.publication_points.iter().cloned().collect(),
        vrps,
        aspas,
        downloads: shared.downloads.iter().cloned().collect(),
        download_stats: shared.download_stats.clone(),
        repo_sync_stats,
    }
}
/// Timings produced by the report-writing task.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ReportTaskOutput {
    /// Milliseconds spent building the report; `0` when no report was requested.
    report_build_ms: u64,
    /// Milliseconds spent writing the report, or `None` when nothing was written.
    report_write_ms: Option<u64>,
}
impl ReportTaskOutput {
    /// Result used when no report output was requested: zero build time and no write.
    fn skipped() -> Self {
        ReportTaskOutput {
            report_write_ms: None,
            report_build_ms: 0,
        }
    }
}
/// Writes the JSON audit report when `report_json_path` is set, returning its
/// build and write timings; otherwise returns [`ReportTaskOutput::skipped`].
///
/// # Errors
/// Propagates any error from `write_report_json_from_shared`.
fn run_report_task(
    policy: &Policy,
    validation_time: time::OffsetDateTime,
    shared: &PostValidationShared,
    report_json_path: Option<&Path>,
    report_json_format: ReportJsonFormat,
) -> Result<ReportTaskOutput, String> {
    match report_json_path {
        None => Ok(ReportTaskOutput::skipped()),
        Some(path) => {
            let timing = write_report_json_from_shared(
                path,
                policy,
                validation_time,
                shared,
                report_json_format,
            )?;
            let report_write_ms = Some(timing.write_ms);
            Ok(ReportTaskOutput {
                report_build_ms: timing.build_ms,
                report_write_ms,
            })
        }
    }
}
/// Timings and optional build breakdown produced by the CCR task. All fields are
/// `None` when no CCR output path was configured.
#[derive(Clone, Debug, PartialEq, Eq)]
struct CcrTaskOutput {
    /// Milliseconds spent building the CCR.
    ccr_build_ms: Option<u64>,
    /// Detailed build breakdown. Only present when the CCR was rebuilt from the
    /// store because no CCR accumulator was available.
    ccr_build_breakdown: Option<CcrBuildBreakdown>,
    /// Milliseconds spent writing the CCR file.
    ccr_write_ms: Option<u64>,
}
/// Builds and writes the CCR file when `ccr_out_path` is set.
///
/// If the run kept a CCR accumulator, the CCR is finished from it. In that case
/// no build breakdown is reported. Otherwise the CCR is rebuilt from `store`
/// and the run's trust anchors, and the builder's breakdown is returned. Build
/// and write durations are measured separately. A `wrote CCR: <path>` line is
/// printed to stderr on success.
///
/// # Errors
/// Returns the stringified error from building or writing the CCR.
fn run_ccr_task(
    store: &RocksStore,
    shared: &PostValidationShared,
    ccr_out_path: Option<&Path>,
    produced_at: time::OffsetDateTime,
) -> Result<CcrTaskOutput, String> {
    let path = match ccr_out_path {
        Some(path) => path,
        None => {
            return Ok(CcrTaskOutput {
                ccr_build_ms: None,
                ccr_build_breakdown: None,
                ccr_write_ms: None,
            });
        }
    };

    let build_started = std::time::Instant::now();
    let (ccr, ccr_build_breakdown) = match shared.ccr_accumulator.as_ref() {
        Some(accumulator) => {
            let ccr = accumulator
                .finish(
                    produced_at,
                    shared.vrps.as_ref(),
                    shared.aspas.as_ref(),
                    shared.router_keys.as_ref(),
                )
                .map_err(|e| e.to_string())?;
            (ccr, None)
        }
        None => {
            let trust_anchors = shared.trust_anchors();
            let (ccr, breakdown) = build_ccr_from_run_with_breakdown(
                store,
                &trust_anchors,
                shared.vrps.as_ref(),
                shared.aspas.as_ref(),
                shared.router_keys.as_ref(),
                produced_at,
            )
            .map_err(|e| e.to_string())?;
            (ccr, Some(breakdown))
        }
    };
    let build_ms = build_started.elapsed().as_millis() as u64;

    let write_started = std::time::Instant::now();
    write_ccr_file(path, &ccr).map_err(|e| e.to_string())?;
    let write_ms = write_started.elapsed().as_millis() as u64;
    eprintln!("wrote CCR: {}", path.display());

    Ok(CcrTaskOutput {
        ccr_build_ms: Some(build_ms),
        ccr_build_breakdown,
        ccr_write_ms: Some(write_ms),
    })
}
/// Picks the TAL URIs to embed in a CIR export.
///
/// Explicit `cir_tal_uris` win; otherwise the `tal_urls` list is used.
///
/// # Errors
/// Returns an error when both lists are empty.
fn resolve_cir_export_tal_uris(args: &CliArgs) -> Result<Vec<String>, String> {
    let chosen = if !args.cir_tal_uris.is_empty() {
        &args.cir_tal_uris
    } else if !args.tal_urls.is_empty() {
        &args.tal_urls
    } else {
        return Err("CIR export requires TAL URI source(s)".to_string());
    };
    Ok(chosen.clone())
}
/// Aligns the CIR TAL URI list with the TAL inputs that actually succeeded.
///
/// The list is returned unchanged unless all of these hold:
/// - there are successful TAL inputs,
/// - its length differs from the number of discoveries,
/// - its length equals the number of original TAL inputs, so it can be indexed
///   by input position.
///
/// In that case each successful input is located in `args.tal_inputs` and the
/// URI at the same position is selected.
///
/// # Errors
/// Returns an error if a successful TAL input is absent from the original list.
fn effective_cir_tal_uris_for_discoveries(
    args: &CliArgs,
    shared: &PostValidationShared,
    cir_tal_uris: Vec<String>,
) -> Result<Vec<String>, String> {
    let uri_count = cir_tal_uris.len();
    let needs_remap = !shared.successful_tal_inputs.is_empty()
        && uri_count != shared.discoveries.len()
        && uri_count == args.tal_inputs.len();
    if !needs_remap {
        return Ok(cir_tal_uris);
    }

    shared
        .successful_tal_inputs
        .iter()
        .map(|successful| {
            args.tal_inputs
                .iter()
                .position(|candidate| candidate == successful)
                .map(|input_index| cir_tal_uris[input_index].clone())
                .ok_or_else(|| {
                    format!(
                        "successful TAL '{}' was not found in original TAL input list",
                        successful.tal_id
                    )
                })
        })
        .collect()
}
/// Aggregates repository-sync counts and durations across publication points.
///
/// Every point is tallied under its terminal state. Points with a sync phase are
/// also tallied under that phase. A missing sync duration counts as 0 ms.
fn build_repo_sync_stats(
    publication_points: &[crate::audit::PublicationPointAudit],
) -> AuditRepoSyncStats {
    let mut stats = AuditRepoSyncStats {
        publication_points_total: publication_points.len() as u64,
        ..AuditRepoSyncStats::default()
    };
    for pp in publication_points {
        let elapsed_ms = pp.repo_sync_duration_ms.unwrap_or(0);

        if let Some(phase) = &pp.repo_sync_phase {
            let bucket = stats.by_phase.entry(phase.clone()).or_default();
            bucket.count += 1;
            bucket.duration_ms_total += elapsed_ms;
        }

        let bucket = stats
            .by_terminal_state
            .entry(pp.repo_terminal_state.clone())
            .or_default();
        bucket.count += 1;
        bucket.duration_ms_total += elapsed_ms;
    }
    stats
}
/// Runs online validation with the supplied HTTP and rsync fetchers. The entry
/// point is chosen from the CLI arguments.
///
/// Dispatch order:
/// 1. More than one `args.tal_inputs` entry: multi-TAL parallel phase-2 audit.
/// 2. `args.tal_url` set: parallel phase-2 audit starting from the TAL URL.
/// 3. `args.tal_path` and `args.ta_path` set: both files are read, then a
///    parallel phase-2 audit runs from the TAL bytes and TA certificate DER.
/// 4. Only `args.tal_path` set: the TAL file is read and a serial audit runs
///    from the TAL bytes, passing `args.cir_tal_uri` through.
///
/// When `timing` is `Some`, the `_with_timing` variant of each entry point is
/// called with the handle.
///
/// # Errors
/// Returns a message if the TAL or TA file cannot be read, or if the selected
/// validation run fails (its error is converted with `to_string`).
///
/// # Panics
/// Reaches `unreachable!` for any other combination of `tal_url`/`tal_path`/
/// `ta_path`. Those combinations are expected to be rejected by `parse_args`.
fn run_online_validation_with_fetchers<H, R>(
    store: Arc<RocksStore>,
    policy: &Policy,
    args: &CliArgs,
    http: &H,
    rsync: &R,
    validation_time: time::OffsetDateTime,
    config: &TreeRunConfig,
    collect_current_repo_objects: bool,
    timing: Option<&TimingHandle>,
) -> Result<RunTreeFromTalAuditOutput, String>
where
    H: crate::sync::rrdp::Fetcher + Clone + 'static,
    R: crate::fetch::rsync::RsyncFetcher + Clone + 'static,
{
    // Several TAL inputs always take the multi-TAL parallel path, regardless of
    // tal_url / tal_path / ta_path.
    if args.tal_inputs.len() > 1 {
        return if let Some(t) = timing {
            run_tree_from_multiple_tals_parallel_phase2_audit_with_timing(
                store,
                policy,
                args.tal_inputs.clone(),
                http,
                rsync,
                validation_time,
                config,
                args.parallel_phase1_config.clone(),
                args.parallel_phase2_config.clone(),
                collect_current_repo_objects,
                t,
            )
        } else {
            run_tree_from_multiple_tals_parallel_phase2_audit(
                store,
                policy,
                args.tal_inputs.clone(),
                http,
                rsync,
                validation_time,
                config,
                args.parallel_phase1_config.clone(),
                args.parallel_phase2_config.clone(),
                collect_current_repo_objects,
            )
        }
        .map_err(|e| e.to_string());
    }
    match (
        args.tal_url.as_ref(),
        args.tal_path.as_ref(),
        args.ta_path.as_ref(),
    ) {
        // A TAL URL takes precedence over any local TAL/TA paths.
        (Some(url), _, _) => if let Some(t) = timing {
            run_tree_from_tal_url_parallel_phase2_audit_with_timing(
                store,
                policy,
                url,
                http,
                rsync,
                validation_time,
                config,
                args.parallel_phase1_config.clone(),
                args.parallel_phase2_config.clone(),
                collect_current_repo_objects,
                t,
            )
        } else {
            run_tree_from_tal_url_parallel_phase2_audit(
                store,
                policy,
                url,
                http,
                rsync,
                validation_time,
                config,
                args.parallel_phase1_config.clone(),
                args.parallel_phase2_config.clone(),
                collect_current_repo_objects,
            )
        }
        .map_err(|e| e.to_string()),
        // Local TAL plus TA certificate: both files are read up front.
        (None, Some(tal_path), Some(ta_path)) => {
            let tal_bytes = std::fs::read(tal_path)
                .map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?;
            let ta_der = std::fs::read(ta_path)
                .map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?;
            if let Some(t) = timing {
                run_tree_from_tal_and_ta_der_parallel_phase2_audit_with_timing(
                    store,
                    policy,
                    &tal_bytes,
                    &ta_der,
                    None,
                    http,
                    rsync,
                    validation_time,
                    config,
                    args.parallel_phase1_config.clone(),
                    args.parallel_phase2_config.clone(),
                    collect_current_repo_objects,
                    t,
                )
            } else {
                run_tree_from_tal_and_ta_der_parallel_phase2_audit(
                    store,
                    policy,
                    &tal_bytes,
                    &ta_der,
                    None,
                    http,
                    rsync,
                    validation_time,
                    config,
                    args.parallel_phase1_config.clone(),
                    args.parallel_phase2_config.clone(),
                    collect_current_repo_objects,
                )
            }
            .map_err(|e| e.to_string())
        }
        // TAL file only: serial audit from the TAL bytes.
        // NOTE(review): this path passes neither `collect_current_repo_objects`
        // nor the parallel phase configs to the runner. Confirm this is intended.
        (None, Some(tal_path), None) => {
            let tal_bytes = std::fs::read(tal_path)
                .map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?;
            let tal_uri = args.cir_tal_uri.clone();
            if let Some(t) = timing {
                crate::validation::run_tree_from_tal::run_tree_from_tal_bytes_serial_audit_with_timing(
                    store.as_ref(),
                    policy,
                    &tal_bytes,
                    tal_uri,
                    http,
                    rsync,
                    validation_time,
                    config,
                    t,
                )
                .map_err(|e| e.to_string())
            } else {
                crate::validation::run_tree_from_tal::run_tree_from_tal_bytes_serial_audit(
                    store.as_ref(),
                    policy,
                    &tal_bytes,
                    tal_uri,
                    http,
                    rsync,
                    validation_time,
                    config,
                )
                .map_err(|e| e.to_string())
            }
        }
        // Remaining combinations (e.g. no TAL source at all) are expected to be
        // rejected earlier by argument parsing.
        _ => unreachable!("validated by parse_args"),
    }
}
pub fn run(argv: &[String]) -> Result<(), String> {
let args = parse_args(argv)?;
let mut policy = read_policy(args.policy_path.as_deref())?;
if let Some(strict_policy) = args.strict_policy {
policy.strict = strict_policy;
}
if args.disable_rrdp {
policy.sync_preference = crate::policy::SyncPreference::RsyncOnly;
}
let validation_time = args
.validation_time
.unwrap_or_else(time::OffsetDateTime::now_utc);
let store = if args.raw_store_db.is_some() || args.repo_bytes_db.is_some() {
Arc::new(
RocksStore::open_with_external_stores(
&args.db_path,
args.raw_store_db.as_deref(),
args.repo_bytes_db.as_deref(),
)
.map_err(|e| e.to_string())?,
)
} else {
Arc::new(RocksStore::open(&args.db_path).map_err(|e| e.to_string())?)
};
let config = TreeRunConfig {
max_depth: args.max_depth,
max_instances: args.max_instances,
compact_audit: args.skip_report_build
&& args.report_json_path.is_none()
&& !args.cir_enabled,
persist_vcir: !args.skip_vcir_persist,
build_ccr_accumulator: args.ccr_out_path.is_some(),
enable_roa_validation_cache: args.enable_roa_validation_cache,
enable_transport_request_prefetch: args.enable_transport_request_prefetch,
};
let replay_mode = args.payload_replay_archive.is_some();
let delta_replay_mode = args.payload_base_archive.is_some();
use time::format_description::well_known::Rfc3339;
let mut timing: Option<(std::path::PathBuf, TimingHandle)> = None;
if args.analyze {
let recorded_at_utc_rfc3339 = time::OffsetDateTime::now_utc()
.to_offset(time::UtcOffset::UTC)
.format(&Rfc3339)
.map_err(|e| format!("format recorded_at_utc failed: {e}"))?;
let validation_time_utc_rfc3339 = validation_time
.to_offset(time::UtcOffset::UTC)
.format(&Rfc3339)
.map_err(|e| format!("format validation_time failed: {e}"))?;
let ts_compact = {
let fmt = time::format_description::parse("[year][month][day]T[hour][minute][second]Z")
.map_err(|e| format!("format description parse failed: {e}"))?;
time::OffsetDateTime::now_utc()
.format(&fmt)
.map_err(|e| format!("format timestamp failed: {e}"))?
};
let out_dir = args.analysis_out_path.clone().unwrap_or_else(|| {
std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("target")
.join("live")
.join("analyze")
.join(ts_compact)
});
std::fs::create_dir_all(&out_dir)
.map_err(|e| format!("create analyze out dir failed: {}: {e}", out_dir.display()))?;
let handle = TimingHandle::new(TimingMeta {
recorded_at_utc_rfc3339,
validation_time_utc_rfc3339,
tal_url: None,
db_path: None,
});
handle.set_meta(TimingMetaUpdate {
tal_url: args.tal_url.as_deref(),
db_path: Some(args.db_path.to_string_lossy().as_ref()),
});
timing = Some((out_dir, handle));
}
if args.profile_cpu && !args.analyze {
return Err("--profile-cpu requires --analyze".to_string());
}
#[cfg(not(feature = "profile"))]
if args.profile_cpu {
return Err("CPU profiling requires building with: --features profile".to_string());
}
#[cfg(feature = "profile")]
let mut profiler_guard: Option<pprof::ProfilerGuard<'static>> = if args.profile_cpu {
Some(
pprof::ProfilerGuard::new(100)
.map_err(|e| format!("pprof ProfilerGuard init failed: {e}"))?,
)
} else {
None
};
let total_started = std::time::Instant::now();
let mut memory_checkpoints: Vec<MemoryTelemetryCheckpoint> = Vec::new();
let mut malloc_trim_probes: Vec<MallocTrimProbe> = Vec::new();
let enable_memory_trim_probe = memory_trim_probe_enabled() || args.memory_trim_after_validation;
record_memory_checkpoint(
&mut memory_checkpoints,
"after_store_open",
&total_started,
store.as_ref(),
);
let validation_started = std::time::Instant::now();
let collect_current_repo_objects = false;
let out = if delta_replay_mode {
let tal_path = args
.tal_path
.as_ref()
.expect("validated by parse_args for delta replay mode");
let ta_path = args
.ta_path
.as_ref()
.expect("validated by parse_args for delta replay mode");
let base_archive = args
.payload_base_archive
.as_ref()
.expect("validated by parse_args for delta replay mode");
let base_locks = args
.payload_base_locks
.as_ref()
.expect("validated by parse_args for delta replay mode");
let base_validation_time = args.payload_base_validation_time.unwrap_or(validation_time);
let delta_archive = args
.payload_delta_archive
.as_ref()
.expect("validated by parse_args for delta replay mode");
let delta_locks = args
.payload_delta_locks
.as_ref()
.expect("validated by parse_args for delta replay mode");
let tal_bytes = std::fs::read(tal_path)
.map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?;
let ta_der = std::fs::read(ta_path)
.map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?;
if let Some((_, t)) = timing.as_ref() {
run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit_with_timing(
store.as_ref(),
&policy,
&tal_bytes,
&ta_der,
None,
base_archive,
base_locks,
delta_archive,
delta_locks,
base_validation_time,
validation_time,
&config,
t,
)
.map_err(|e| e.to_string())?
} else {
run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit(
store.as_ref(),
&policy,
&tal_bytes,
&ta_der,
None,
base_archive,
base_locks,
delta_archive,
delta_locks,
base_validation_time,
validation_time,
&config,
)
.map_err(|e| e.to_string())?
}
} else if replay_mode {
let tal_path = args
.tal_path
.as_ref()
.expect("validated by parse_args for replay mode");
let ta_path = args
.ta_path
.as_ref()
.expect("validated by parse_args for replay mode");
let archive_root = args
.payload_replay_archive
.as_ref()
.expect("validated by parse_args for replay mode");
let locks_path = args
.payload_replay_locks
.as_ref()
.expect("validated by parse_args for replay mode");
let tal_bytes = std::fs::read(tal_path)
.map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?;
let ta_der = std::fs::read(ta_path)
.map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?;
if let Some((_, t)) = timing.as_ref() {
run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing(
store.as_ref(),
&policy,
&tal_bytes,
&ta_der,
None,
archive_root,
locks_path,
validation_time,
&config,
t,
)
.map_err(|e| e.to_string())?
} else {
run_tree_from_tal_and_ta_der_payload_replay_serial_audit(
store.as_ref(),
&policy,
&tal_bytes,
&ta_der,
None,
archive_root,
locks_path,
validation_time,
&config,
)
.map_err(|e| e.to_string())?
}
} else if let Some(dir) = args.rsync_local_dir.as_ref() {
let http = BlockingHttpFetcher::new(HttpFetcherConfig {
timeout: std::time::Duration::from_secs(args.http_timeout_secs.max(1)),
..HttpFetcherConfig::default()
})
.map_err(|e| e.to_string())?;
let rsync = LocalDirRsyncFetcher::new(dir);
run_online_validation_with_fetchers(
Arc::clone(&store),
&policy,
&args,
&http,
&rsync,
validation_time,
&config,
collect_current_repo_objects,
timing.as_ref().map(|(_, t)| t),
)?
} else {
let http = BlockingHttpFetcher::new(HttpFetcherConfig {
timeout: std::time::Duration::from_secs(args.http_timeout_secs.max(1)),
..HttpFetcherConfig::default()
})
.map_err(|e| e.to_string())?;
let rsync = SystemRsyncFetcher::new(SystemRsyncConfig {
rsync_bin: args
.rsync_command
.clone()
.unwrap_or_else(|| PathBuf::from("rsync")),
timeout: std::time::Duration::from_secs(args.rsync_timeout_secs.max(1)),
mirror_root: args.rsync_mirror_root.clone(),
scope_policy: args.rsync_scope_policy,
..SystemRsyncConfig::default()
});
run_online_validation_with_fetchers(
Arc::clone(&store),
&policy,
&args,
&http,
&rsync,
validation_time,
&config,
collect_current_repo_objects,
timing.as_ref().map(|(_, t)| t),
)?
};
let validation_ms = validation_started.elapsed().as_millis() as u64;
let shared = PostValidationShared::from_run_output(out);
let vcir_storage_summary_enabled = vcir_storage_summary_enabled();
let vcir_storage_summary_started = std::time::Instant::now();
let vcir_storage = if config.persist_vcir && vcir_storage_summary_enabled {
Some(
store
.summarize_vcir_storage()
.map_err(|e| format!("summarize VCIR storage failed: {e}"))?,
)
} else {
None
};
let vcir_storage_summary_ms = (config.persist_vcir && vcir_storage_summary_enabled)
.then(|| vcir_storage_summary_started.elapsed().as_millis() as u64);
record_memory_checkpoint(
&mut memory_checkpoints,
"after_validation",
&total_started,
store.as_ref(),
);
if enable_memory_trim_probe {
malloc_trim_probes.push(crate::memory_telemetry::malloc_trim_probe());
record_memory_checkpoint(
&mut memory_checkpoints,
"after_validation_malloc_trim",
&total_started,
store.as_ref(),
);
}
if let Some((_out_dir, t)) = timing.as_ref() {
t.record_count("instances_processed", shared.instances_processed as u64);
t.record_count("instances_failed", shared.instances_failed as u64);
}
let publication_points = shared.publication_points.len();
let publication_point_repo_sync_ms_total: u64 = shared
.publication_points
.iter()
.map(|pp| pp.repo_sync_duration_ms.unwrap_or(0))
.sum();
let download_event_count = shared.download_stats.events_total;
let rrdp_download_ms_total: u64 = ["rrdp_notification", "rrdp_snapshot", "rrdp_delta"]
.iter()
.map(|key| {
shared
.download_stats
.by_kind
.get(*key)
.map(|item| item.duration_ms_total)
.unwrap_or(0)
})
.sum();
let rsync_download_ms_total = shared
.download_stats
.by_kind
.get("rsync")
.map(|item| item.duration_ms_total)
.unwrap_or(0);
let repo_sync_ms_total = rrdp_download_ms_total + rsync_download_ms_total;
let download_bytes_total: u64 = shared
.download_stats
.by_kind
.values()
.map(|item| item.bytes_total.unwrap_or(0))
.sum();
#[cfg(feature = "profile")]
let profiler_report = if let Some(guard) = profiler_guard.take() {
Some(
guard
.report()
.build()
.map_err(|e| format!("pprof report build failed: {e}"))?,
)
} else {
None
};
let report_json_format = if args.report_json_compact {
ReportJsonFormat::Compact
} else {
ReportJsonFormat::Pretty
};
let ccr_produced_at = time::OffsetDateTime::now_utc();
let (report_result, ccr_result) = if args.skip_report_build {
(
Ok(ReportTaskOutput::skipped()),
run_ccr_task(
store.as_ref(),
&shared,
args.ccr_out_path.as_deref(),
ccr_produced_at,
),
)
} else {
std::thread::scope(|scope| {
let report_handle = scope.spawn(|| {
run_report_task(
&policy,
validation_time,
&shared,
args.report_json_path.as_deref(),
report_json_format,
)
});
let ccr_handle = scope.spawn(|| {
run_ccr_task(
store.as_ref(),
&shared,
args.ccr_out_path.as_deref(),
ccr_produced_at,
)
});
let report_result = report_handle
.join()
.map_err(|_| "report task panicked".to_string())
.and_then(|result| result);
let ccr_result = ccr_handle
.join()
.map_err(|_| "ccr task panicked".to_string())
.and_then(|result| result);
(report_result, ccr_result)
})
};
let report_output = report_result?;
let ccr_output = ccr_result?;
record_memory_checkpoint(
&mut memory_checkpoints,
"after_report_and_ccr",
&total_started,
store.as_ref(),
);
if enable_memory_trim_probe {
malloc_trim_probes.push(crate::memory_telemetry::malloc_trim_probe());
record_memory_checkpoint(
&mut memory_checkpoints,
"after_report_and_ccr_malloc_trim",
&total_started,
store.as_ref(),
);
}
let report_build_ms = report_output.report_build_ms;
let report_write_ms = report_output.report_write_ms;
let ccr_build_ms = ccr_output.ccr_build_ms;
let ccr_build_breakdown = ccr_output.ccr_build_breakdown;
let ccr_write_ms = ccr_output.ccr_write_ms;
let compare_view_trust_anchor = args
.compare_view_trust_anchor
.as_deref()
.unwrap_or("unknown");
let compare_view_output = run_compare_view_task(
&shared,
args.vrps_csv_out_path.as_deref(),
args.vaps_csv_out_path.as_deref(),
compare_view_trust_anchor,
)?;
let compare_view_build_ms = compare_view_output.build_ms;
let compare_view_write_ms = compare_view_output.write_ms;
record_memory_checkpoint(
&mut memory_checkpoints,
"after_compare_view",
&total_started,
store.as_ref(),
);
let mut cir_build_cir_ms = None;
let mut cir_write_cir_ms = None;
let mut cir_total_ms = None;
if args.cir_enabled {
let cir_tal_uris = effective_cir_tal_uris_for_discoveries(
&args,
&shared,
resolve_cir_export_tal_uris(&args)?,
)?;
if cir_tal_uris.len() != shared.discoveries.len() {
return Err(format!(
"CIR export TAL URI count ({}) does not match discovery count ({})",
cir_tal_uris.len(),
shared.discoveries.len()
));
}
let cir_out_path = args
.cir_out_path
.as_deref()
.expect("validated by parse_args for cir");
let tal_bindings = shared
.discoveries
.iter()
.zip(cir_tal_uris.iter())
.map(|(discovery, tal_uri)| CirTrustAnchorBinding {
trust_anchor: &discovery.trust_anchor,
tal_uri: tal_uri.as_str(),
})
.collect::<Vec<_>>();
let summary = export_cir_from_run_multi(
store.as_ref(),
&tal_bindings,
validation_time,
shared.publication_points.as_ref(),
cir_out_path,
time::OffsetDateTime::now_utc().date(),
None,
)
.map_err(|e| e.to_string())?;
cir_build_cir_ms = Some(summary.timing.build_cir_ms);
cir_write_cir_ms = Some(summary.timing.write_cir_ms);
cir_total_ms = Some(summary.timing.total_ms);
eprintln!(
"wrote CIR: {} (objects={}, trust_anchors={}, build_cir_ms={}, write_cir_ms={}, total_ms={})",
cir_out_path.display(),
summary.object_count,
summary.trust_anchor_count,
summary.timing.build_cir_ms,
summary.timing.write_cir_ms,
summary.timing.total_ms
);
record_memory_checkpoint(
&mut memory_checkpoints,
"after_cir",
&total_started,
store.as_ref(),
);
}
record_memory_checkpoint(
&mut memory_checkpoints,
"before_stage_timing",
&total_started,
store.as_ref(),
);
let stage_timing = RunStageTiming {
validation_ms,
enable_roa_validation_cache: args.enable_roa_validation_cache,
enable_transport_request_prefetch: args.enable_transport_request_prefetch,
report_build_ms,
report_write_ms,
ccr_build_ms,
ccr_build_breakdown,
ccr_write_ms,
compare_view_build_ms,
compare_view_write_ms,
cir_build_cir_ms,
cir_write_cir_ms,
cir_total_ms,
total_ms: total_started.elapsed().as_millis() as u64,
publication_points,
repo_sync_ms_total,
publication_point_repo_sync_ms_total,
download_event_count,
rrdp_download_ms_total,
rsync_download_ms_total,
download_bytes_total,
roa_validation_cache: shared.roa_cache_stats.clone(),
analysis_counts: timing
.as_ref()
.map(|(_, handle)| handle.counts_snapshot())
.unwrap_or_default(),
vcir_storage_summary_ms,
vcir_storage,
memory_telemetry: Some(MemoryTelemetrySummary {
checkpoints: memory_checkpoints,
object_graph: Some(estimate_shared_object_graph(&shared)),
malloc_trim_probes,
}),
};
let stage_timing_anchor_path = args
.report_json_path
.as_deref()
.or(args.ccr_out_path.as_deref())
.or(args.vrps_csv_out_path.as_deref());
write_stage_timing(stage_timing_anchor_path, &stage_timing)?;
if let Some((out_dir, t)) = timing.as_ref() {
t.record_count("vrps", shared.vrps.len() as u64);
t.record_count("aspas", shared.aspas.len() as u64);
t.record_count(
"audit_publication_points",
shared.publication_points.len() as u64,
);
let timing_json_path = out_dir.join("timing.json");
t.write_json(&timing_json_path, 20)?;
eprintln!("analysis: wrote {}", timing_json_path.display());
}
#[cfg(feature = "profile")]
if let (Some((out_dir, _)), Some(report)) = (timing.as_ref(), profiler_report) {
let svg_path = out_dir.join("flamegraph.svg");
let svg_file = std::fs::File::create(&svg_path)
.map_err(|e| format!("create flamegraph failed: {}: {e}", svg_path.display()))?;
report
.flamegraph(svg_file)
.map_err(|e| format!("write flamegraph failed: {e}"))?;
eprintln!("analysis: wrote {}", svg_path.display());
let pb_path = out_dir.join("pprof.pb.gz");
let pprof_profile = report
.pprof()
.map_err(|e| format!("pprof export failed: {e}"))?;
use pprof::protos::Message;
let mut body = Vec::with_capacity(pprof_profile.encoded_len());
pprof_profile
.encode(&mut body)
.map_err(|e| format!("pprof encode failed: {e}"))?;
let gz = flate2::write::GzEncoder::new(
std::fs::File::create(&pb_path)
.map_err(|e| format!("create pprof.pb.gz failed: {}: {e}", pb_path.display()))?,
flate2::Compression::default(),
);
let mut gz = gz;
use std::io::Write;
gz.write_all(&body)
.map_err(|e| format!("write pprof.pb.gz failed: {e}"))?;
gz.finish()
.map_err(|e| format!("finish pprof.pb.gz failed: {e}"))?;
eprintln!("analysis: wrote {}", pb_path.display());
}
print_summary_from_shared(validation_time, &shared);
Ok(())
}
// Unit tests for this CLI module. They are compiled only for test builds
// (`#[cfg(test)]`). The source lives in a separate file, selected by the
// explicit `#[path]` attribute; Rust resolves that path relative to the
// directory containing this source file.
#[cfg(test)]
#[path = "cli/tests.rs"]
mod tests;